Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2024-03-06 23:05:52
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.1770 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Wed Mar  6 23:05:52 2024 rev:79 rq:1155502 version:2024.2.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    
2024-02-16 21:42:51.006770687 +0100
+++ 
/work/SRC/openSUSE:Factory/.python-distributed.new.1770/python-distributed.changes
  2024-03-06 23:06:23.713369868 +0100
@@ -1,0 +2,9 @@
+Tue Mar  5 21:29:43 UTC 2024 - Ben Greiner <c...@bnavigator.de>
+
+- Update to 2024.2.1
+  * Allow silencing dask.DataFrame deprecation warning
+  * More robust distributed scheduler for rare key collisions
+  * More robust adaptive scaling on large clusters
+- Drop distributed-ignore-daskdepr.patch
+
+-------------------------------------------------------------------

Old:
----
  distributed-2024.2.0-gh.tar.gz
  distributed-ignore-daskdepr.patch

New:
----
  distributed-2024.2.1-gh.tar.gz


++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.xdnFuB/_old  2024-03-06 23:06:24.357393237 +0100
+++ /var/tmp/diff_new_pack.xdnFuB/_new  2024-03-06 23:06:24.357393237 +0100
@@ -47,7 +47,7 @@
 
 Name:           python-distributed%{psuffix}
 # ===> Note: python-dask MUST be updated in sync with python-distributed! <===
-Version:        2024.2.0
+Version:        2024.2.1
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause
@@ -55,8 +55,6 @@
 # SourceRepository: https://github.com/dask/distributed
 Source:         https://github.com/dask/distributed/archive/refs/tags/%{version}.tar.gz#/distributed-%{version}-gh.tar.gz
 Source99:       python-distributed-rpmlintrc
-# PATCH-FIX-UPSTREAM distributed-ignore-daskdepr.patch gh#dask/distributed#8504
-Patch0:         distributed-ignore-daskdepr.patch
 # PATCH-FIX-OPENSUSE distributed-ignore-off.patch -- ignore that we can't 
probe addresses on obs, c...@bnavigator.de
 Patch3:         distributed-ignore-offline.patch
 # PATCH-FIX-OPENSUSE distributed-ignore-thread-leaks.patch -- ignore leaking 
threads on obs, c...@bnavigator.de

++++++ distributed-2024.2.0-gh.tar.gz -> distributed-2024.2.1-gh.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/.github/workflows/ci-pre-commit.yml 
new/distributed-2024.2.1/.github/workflows/ci-pre-commit.yml
--- old/distributed-2024.2.0/.github/workflows/ci-pre-commit.yml        
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/.github/workflows/ci-pre-commit.yml        
2024-02-23 19:11:48.000000000 +0100
@@ -15,4 +15,4 @@
       - uses: actions/setup-python@v5
         with:
           python-version: '3.9'
-      - uses: pre-commit/action@v3.0.0
+      - uses: pre-commit/action@v3.0.1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/.github/workflows/tests.yaml 
new/distributed-2024.2.1/.github/workflows/tests.yaml
--- old/distributed-2024.2.0/.github/workflows/tests.yaml       2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/.github/workflows/tests.yaml       2024-02-23 
19:11:48.000000000 +0100
@@ -154,7 +154,7 @@
           # Increase this value to reset cache if
           # continuous_integration/environment-${{ matrix.environment }}.yaml 
has not
           # changed. See also same variable in .pre-commit-config.yaml
-          CACHE_NUMBER: 0
+          CACHE_NUMBER: 1
         id: cache
 
       - name: Update environment
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/.pre-commit-config.yaml 
new/distributed-2024.2.1/.pre-commit-config.yaml
--- old/distributed-2024.2.0/.pre-commit-config.yaml    2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/.pre-commit-config.yaml    2024-02-23 
19:11:48.000000000 +0100
@@ -69,4 +69,4 @@
 
 # Increase this value to clear the cache on GitHub actions if nothing else in 
this file
 # has changed. See also same variable in .github/workflows/test.yaml
-# CACHE_NUMBER: 0
+# CACHE_NUMBER: 1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/continuous_integration/environment-3.12.yaml 
new/distributed-2024.2.1/continuous_integration/environment-3.12.yaml
--- old/distributed-2024.2.0/continuous_integration/environment-3.12.yaml       
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/continuous_integration/environment-3.12.yaml       
2024-02-23 19:11:48.000000000 +0100
@@ -15,8 +15,8 @@
   - filesystem-spec  # overridden by git tip below
   - gilknocker
   - h5py
-  - ipykernel <6.22.0  # https://github.com/dask/distributed/issues/7688
-  - ipywidgets <8.0.5  # https://github.com/dask/distributed/issues/7688
+  - ipykernel
+  - ipywidgets
   - jinja2
   - locket
   - msgpack-python
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/continuous_integration/gpuci/axis.yaml 
new/distributed-2024.2.1/continuous_integration/gpuci/axis.yaml
--- old/distributed-2024.2.0/continuous_integration/gpuci/axis.yaml     
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/continuous_integration/gpuci/axis.yaml     
2024-02-23 19:11:48.000000000 +0100
@@ -9,6 +9,6 @@
 - ubuntu20.04
 
 RAPIDS_VER:
-- "24.02"
+- "24.04"
 
 excludes:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/continuous_integration/scripts/test_report.py 
new/distributed-2024.2.1/continuous_integration/scripts/test_report.py
--- old/distributed-2024.2.0/continuous_integration/scripts/test_report.py      
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/continuous_integration/scripts/test_report.py      
2024-02-23 19:11:48.000000000 +0100
@@ -498,9 +498,9 @@
     )
     overall = {name: grouped.get_group(name) for name in grouped.groups}
 
-    # Get all of the workflow run timestamps that we wound up with, which we 
can use
+    # Get all the workflow run timestamps that we wound up with, which we can 
use
     # below to align the different groups.
-    times = set()
+    times: set = set()
     for df in overall.values():
         times.update(df.date.unique())
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/client.py 
new/distributed-2024.2.1/distributed/client.py
--- old/distributed-2024.2.0/distributed/client.py      2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/client.py      2024-02-23 
19:11:48.000000000 +0100
@@ -41,6 +41,7 @@
     ensure_dict,
     format_bytes,
     funcname,
+    parse_bytes,
     parse_timedelta,
     shorten_traceback,
     typename,
@@ -1576,7 +1577,7 @@
 
                 breakout = False
                 for msg in msgs:
-                    logger.debug("Client receives message %s", msg)
+                    logger.debug("Client %s receives message %s", self.id, msg)
 
                     if "status" in msg and "error" in msg["status"]:
                         typ, exc, tb = clean_exception(**msg)
@@ -3162,7 +3163,9 @@
             header, frames = serialize(ToPickle(dsk), on_error="raise")
 
             pickled_size = sum(map(nbytes, [header] + frames))
-            if pickled_size > 10_000_000:
+            if pickled_size > parse_bytes(
+                dask.config.get("distributed.admin.large-graph-warning-threshold")
+            ):
                 warnings.warn(
                     f"Sending large graph of size 
{format_bytes(pickled_size)}.\n"
                     "This may cause some slowdown.\n"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/comm/tcp.py 
new/distributed-2024.2.1/distributed/comm/tcp.py
--- old/distributed-2024.2.0/distributed/comm/tcp.py    2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/comm/tcp.py    2024-02-23 
19:11:48.000000000 +0100
@@ -44,14 +44,14 @@
 logger = logging.getLogger(__name__)
 
 
-# Workaround for OpenSSL 1.0.2.
-# Can drop with OpenSSL 1.1.1 used by Python 3.10+.
-# ref: https://bugs.python.org/issue42853
-if sys.version_info < (3, 10):
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
-else:
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1
+# We must not load more than this into a buffer at a time
+# It's currently unclear why that is
+# see
+# - https://github.com/dask/distributed/pull/5854
+# - https://bugs.python.org/issue42853
+# - https://github.com/dask/distributed/pull/8507
 
+C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
 MAX_BUFFER_SIZE = MEMORY_LIMIT / 2
 
 
@@ -286,8 +286,8 @@
                         2,
                         range(
                             0,
-                            each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
-                            OPENSSL_MAX_CHUNKSIZE,
+                            each_frame_nbytes + C_INT_MAX,
+                            C_INT_MAX,
                         ),
                     ):
                         chunk = each_frame[i:j]
@@ -360,7 +360,7 @@
 
     for i, j in sliding_window(
         2,
-        range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
+        range(0, n + C_INT_MAX, C_INT_MAX),
     ):
         chunk = buf[i:j]
         actual = await stream.read_into(chunk)  # type: ignore[arg-type]
@@ -432,7 +432,8 @@
     A TLS-specific version of TCP.
     """
 
-    max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size)
+    # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
+    max_shard_size = min(C_INT_MAX, TCP.max_shard_size)
 
     def _read_extra(self):
         TCP._read_extra(self)
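
For context, the rewritten read/write loops above slice each buffer into chunks
of at most C_INT_MAX bytes. Below is a minimal sketch of that chunking pattern,
assuming toolz's sliding_window helper (the diff shows tcp.py already using a
sliding_window over the same range); it is an illustration, not the actual read
path:

    import ctypes

    from tlz import sliding_window

    # Largest positive C int; buffers are processed in slices no bigger than this.
    C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1

    def iter_chunks(buf: memoryview):
        """Yield consecutive slices of ``buf``, each at most C_INT_MAX bytes."""
        n = len(buf)
        for i, j in sliding_window(2, range(0, n + C_INT_MAX, C_INT_MAX)):
            yield buf[i:j]
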
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/deploy/tests/test_adaptive.py 
new/distributed-2024.2.1/distributed/deploy/tests/test_adaptive.py
--- old/distributed-2024.2.0/distributed/deploy/tests/test_adaptive.py  
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/deploy/tests/test_adaptive.py  
2024-02-23 19:11:48.000000000 +0100
@@ -203,7 +203,7 @@
         assert len(adapt.log) == 1
 
         # Scale up when there is plenty of available work
-        futures = client.map(slowinc, range(1000), delay=0.100)
+        futures = client.map(slowinc, range(2, 1002), delay=0.100)
         while len(adapt.log) == 1:
             await asyncio.sleep(0.01)
         assert len(adapt.log) == 2
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/distributed-schema.yaml 
new/distributed-2024.2.1/distributed/distributed-schema.yaml
--- old/distributed-2024.2.0/distributed/distributed-schema.yaml        
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/distributed-schema.yaml        
2024-02-23 19:11:48.000000000 +0100
@@ -1103,6 +1103,12 @@
         description: |
           Options for logs, event loops, and so on
         properties:
+          large-graph-warning-threshold:
+            type: string
+            description: |
+              Threshold in bytes for when a warning is raised about a large
+              submitted task graph.
+              Default is 10MB.
           tick:
             type: object
             description: |
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/distributed.yaml 
new/distributed-2024.2.1/distributed/distributed.yaml
--- old/distributed-2024.2.0/distributed/distributed.yaml       2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/distributed.yaml       2024-02-23 
19:11:48.000000000 +0100
@@ -323,6 +323,7 @@
   ##################
 
   admin:
+    large-graph-warning-threshold: 10MB  # Threshold for warning on large graph
     tick:
       interval: 20ms  # time between event loop health checks
       limit: 3s       # time allowed before triggering a warning
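
The distributed.yaml and distributed-schema.yaml hunks above introduce the
distributed.admin.large-graph-warning-threshold option (default 10MB), which the
client.py change reads via parse_bytes. A minimal usage sketch, mirroring the
updated test_warn_when_submitting_large_values further down; the local client is
for illustration only:

    import dask
    from dask.distributed import Client

    # Raise the threshold to 1GB so a ~10MB graph no longer triggers the
    # "Sending large graph of size ..." warning.
    with dask.config.set({"distributed.admin.large-graph-warning-threshold": "1GB"}):
        with Client(processes=False) as client:
            future = client.submit(len, b"0" * 10_000_000)  # large inlined payload
            assert future.result() == 10_000_000
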
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/protocol/tests/test_cupy.py 
new/distributed-2024.2.1/distributed/protocol/tests/test_cupy.py
--- old/distributed-2024.2.0/distributed/protocol/tests/test_cupy.py    
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/protocol/tests/test_cupy.py    
2024-02-23 19:11:48.000000000 +0100
@@ -17,7 +17,7 @@
 @pytest.mark.parametrize("order", ["C", "F"])
 @pytest.mark.parametrize("serializers", [("cuda",), ("dask",), ("pickle",)])
 def test_serialize_cupy(shape, dtype, order, serializers):
-    x = cupy.arange(numpy.product(shape), dtype=dtype)
+    x = cupy.arange(numpy.prod(shape), dtype=dtype)
     x = cupy.ndarray(shape, dtype=x.dtype, memptr=x.data, order=order)
     header, frames = serialize(x, serializers=serializers)
     y = deserialize(header, frames, deserializers=serializers)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/protocol/tests/test_numba.py 
new/distributed-2024.2.1/distributed/protocol/tests/test_numba.py
--- old/distributed-2024.2.0/distributed/protocol/tests/test_numba.py   
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/protocol/tests/test_numba.py   
2024-02-23 19:11:48.000000000 +0100
@@ -20,7 +20,7 @@
     if not cuda.is_available():
         pytest.skip("CUDA is not available")
 
-    ary = np.arange(np.product(shape), dtype=dtype)
+    ary = np.arange(np.prod(shape), dtype=dtype)
     ary = np.ndarray(shape, dtype=ary.dtype, buffer=ary.data, order=order)
     x = cuda.to_device(ary)
     header, frames = serialize(x, serializers=serializers)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/protocol/tests/test_protocol.py 
new/distributed-2024.2.1/distributed/protocol/tests/test_protocol.py
--- old/distributed-2024.2.0/distributed/protocol/tests/test_protocol.py        
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/protocol/tests/test_protocol.py        
2024-02-23 19:11:48.000000000 +0100
@@ -208,3 +208,30 @@
     assert L[0].count(b"__Pickled__") == 1
     assert L[0].count(b"__Serialized__") == 1
     assert loads(L) == {np.int64(1): {2: "a"}, 3: ("b", "c"), 4: "d"}
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("typ", [bytes, str, "ext"])
+def test_large_payload(typ):
+    """See also: test_core.py::test_large_payload"""
+    critical_size = 2**31 + 1  # >2 GiB
+    if typ == bytes:
+        large_payload = critical_size * b"0"
+        expected = large_payload
+    elif typ == str:
+        large_payload = critical_size * "0"
+        expected = large_payload
+    # Testing array and map dtypes is practically not possible since we'd have
+    # to create an actual list or dict object of critical size (i.e. not the
+    # content but the container itself). These are so large that msgpack is
+    # running forever
+    # elif typ == "array":
+    #     large_payload = [b"0"] * critical_size
+    #     expected = tuple(large_payload)
+    # elif typ == "map":
+    #     large_payload = {x: b"0" for x in range(critical_size)}
+    #     expected = large_payload
+    elif typ == "ext":
+        large_payload = msgpack.ExtType(1, b"0" * critical_size)
+        expected = large_payload
+    assert loads(dumps(large_payload)) == expected
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/protocol/utils.py 
new/distributed-2024.2.1/distributed/protocol/utils.py
--- old/distributed-2024.2.0/distributed/protocol/utils.py      2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/protocol/utils.py      2024-02-23 
19:11:48.000000000 +0100
@@ -12,9 +12,7 @@
 BIG_BYTES_SHARD_SIZE = 
dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
 
 
-msgpack_opts = {
-    ("max_%s_len" % x): 2**31 - 1 for x in ["str", "bin", "array", "map", 
"ext"]
-}
+msgpack_opts = {}
 msgpack_opts["strict_map_key"] = False
 msgpack_opts["raw"] = False
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/scheduler.py 
new/distributed-2024.2.1/distributed/scheduler.py
--- old/distributed-2024.2.0/distributed/scheduler.py   2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/scheduler.py   2024-02-23 
19:11:48.000000000 +0100
@@ -52,7 +52,9 @@
 from tornado.ioloop import IOLoop
 
 import dask
-from dask.core import get_deps, validate_key
+import dask.utils
+from dask.base import TokenizationError, normalize_token, tokenize
+from dask.core import get_deps, iskey, validate_key
 from dask.typing import Key, no_default
 from dask.utils import (
     ensure_dict,
@@ -4721,6 +4723,7 @@
                 stimulus_id=stimulus_id or f"update-graph-{start}",
             )
         except RuntimeError as e:
+            logger.error(str(e))
             err = error_message(e)
             for key in keys:
                 self.report(
@@ -4729,7 +4732,10 @@
                         "key": key,
                         "exception": err["exception"],
                         "traceback": err["traceback"],
-                    }
+                    },
+                    # This informs all clients in who_wants plus the current 
client
+                    # (which may not have been added to who_wants yet)
+                    client=client,
                 )
         end = time()
         self.digest_metric("update-graph-duration", end - start)
@@ -4747,6 +4753,7 @@
         stack = list(keys)
         touched_keys = set()
         touched_tasks = []
+        tgs_with_bad_run_spec = set()
         while stack:
             k = stack.pop()
             if k in touched_keys:
@@ -4755,8 +4762,73 @@
             if ts is None:
                 ts = self.new_task(k, dsk.get(k), "released", computation=computation)
                 new_tasks.append(ts)
-            elif not ts.run_spec:
+            # It is possible to create the TaskState object before its runspec 
is known
+            # to the scheduler. For instance, this is possible when using a 
Variable:
+            # `f = c.submit(foo); await Variable().set(f)` since the Variable 
uses a
+            # different comm channel, so the `client_desires_key` message 
could arrive
+            # before `update_graph`.
+            # There are also anti-pattern processes possible;
+            # see for example test_scatter_creates_ts
+            elif ts.run_spec is None:
                 ts.run_spec = dsk.get(k)
+            # run_spec in the submitted graph may be None. This happens
+            # when an already persisted future is part of the graph
+            elif k in dsk:
+                # If both tokens are non-deterministic, skip comparison
+                try:
+                    tok_lhs = tokenize(ts.run_spec, ensure_deterministic=True)
+                except TokenizationError:
+                    tok_lhs = ""
+                try:
+                    tok_rhs = tokenize(dsk[k], ensure_deterministic=True)
+                except TokenizationError:
+                    tok_rhs = ""
+
+                # Additionally check dependency names. This should only be 
necessary
+                # if run_specs can't be tokenized deterministically.
+                deps_lhs = {dts.key for dts in ts.dependencies}
+                deps_rhs = dependencies[k]
+
+                # FIXME It would be a really healthy idea to change this to a 
hard
+                # failure. However, this is not possible at the moment because 
of
+                # https://github.com/dask/dask/issues/9888
+                if tok_lhs != tok_rhs or deps_lhs != deps_rhs:
+                    # Retain old run_spec and dependencies; rerun them if 
necessary.
+                    # This sweeps the issue of collision under the carpet as 
long as the
+                    # old and new task produce the same output - such as in
+                    # dask/dask#9888.
+                    dependencies[k] = deps_lhs
+
+                    if ts.group not in tgs_with_bad_run_spec:
+                        tgs_with_bad_run_spec.add(ts.group)
+                        logger.warning(
+                            f"Detected different `run_spec` for key {ts.key!r} 
between "
+                            "two consecutive calls to `update_graph`. "
+                            "This can cause failures and deadlocks down the 
line. "
+                            "Please ensure unique key names. "
+                            "If you are using a standard dask collections, 
consider "
+                            "releasing all the data before resubmitting 
another "
+                            "computation. More details and help can be found 
at "
+                            "https://github.com/dask/dask/issues/9888. "
+                            + textwrap.dedent(
+                                f"""
+                                Debugging information
+                                ---------------------
+                                old task state: {ts.state}
+                                old run_spec: {ts.run_spec!r}
+                                new run_spec: {dsk[k]!r}
+                                old token: {normalize_token(ts.run_spec)!r}
+                                new token: {normalize_token(dsk[k])!r}
+                                old dependencies: {deps_lhs}
+                                new dependencies: {deps_rhs}
+                                """
+                            )
+                        )
+                    else:
+                        logger.debug(
+                            f"Detected different `run_spec` for key {ts.key!r} 
between "
+                            "two consecutive calls to `update_graph`."
+                        )
 
             if ts.run_spec:
                 runnable.append(ts)
@@ -5538,28 +5610,28 @@
                 tasks: dict = self.tasks
                 ts = tasks.get(msg_key)
 
-        client_comms: dict = self.client_comms
-        if ts is None:
+        if ts is None and client is None:
             # Notify all clients
-            client_keys = list(client_comms)
-        elif client:
-            # Notify clients interested in key
-            client_keys = [cs.client_key for cs in ts.who_wants or ()]
+            client_keys = list(self.client_comms)
+        elif ts is None:
+            client_keys = [client]
         else:
             # Notify clients interested in key (including `client`)
+            # Note that, if report() was called by update_graph(), `client` 
won't be in
+            # ts.who_wants yet.
             client_keys = [
                 cs.client_key for cs in ts.who_wants or () if cs.client_key != client
             ]
-            client_keys.append(client)
+            if client is not None:
+                client_keys.append(client)
 
-        k: str
         for k in client_keys:
-            c = client_comms.get(k)
+            c = self.client_comms.get(k)
             if c is None:
                 continue
             try:
                 c.send(msg)
-                # logger.debug("Scheduler sends message to client %s", msg)
+                # logger.debug("Scheduler sends message to client %s: %s", k, 
msg)
             except CommClosedError:
                 if self.status == Status.running:
                     logger.critical(
@@ -7019,12 +7091,11 @@
             If neither ``workers`` nor ``names`` are provided, we call
             ``workers_to_close`` which finds a good set.
         close_workers: bool (defaults to False)
-            Whether or not to actually close the worker explicitly from here.
-            Otherwise we expect some external job scheduler to finish off the
-            worker.
+            Whether to actually close the worker explicitly from here.
+            Otherwise, we expect some external job scheduler to finish off the 
worker.
         remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us.
+            Whether to remove the worker metadata immediately or else wait for 
the
+            worker to contact us.
 
             If close_workers=False and remove=False, this method just flushes 
the tasks
             in memory out of the workers and then returns.
@@ -8724,9 +8795,17 @@
     dsk2 = {}
     fut_deps = {}
     for k, v in dsk.items():
-        dsk2[k], futs = unpack_remotedata(v, byte_keys=True)
+        v, futs = unpack_remotedata(v, byte_keys=True)
         if futs:
             fut_deps[k] = futs
+
+        # Remove aliases {x: x}.
+        # FIXME: This is an artifact generated by unpack_remotedata when using 
persisted
+        # collections. There should be a better way to achieve that tasks are 
not self
+        # referencing themselves.
+        if not iskey(v) or v != k:
+            dsk2[k] = v
+
     dsk = dsk2
 
     # - Add in deps for any tasks that depend on futures
@@ -8736,14 +8815,8 @@
     # Remove any self-dependencies (happens on test_publish_bag() and others)
     for k, v in dependencies.items():
         deps = set(v)
-        if k in deps:
-            deps.remove(k)
+        deps.discard(k)
         dependencies[k] = deps
 
-    # Remove aliases
-    for k in list(dsk):
-        if dsk[k] is k:
-            del dsk[k]
     dsk = valmap(_normalize_task, dsk)
-
     return dsk, dependencies, annotations_by_type
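
The run_spec comparison added above is what emits the new "Detected different
`run_spec` for key ..." warning. A minimal sketch of the situation it guards
against, modeled on the resubmission tests added to test_scheduler.py further
down (the in-process cluster and key name are illustrative):

    from dask.distributed import Client

    def inc(x):
        return x + 1

    c1 = Client(processes=False)       # illustrative in-process cluster
    c2 = Client(c1.cluster)            # second client on the same scheduler
    x1 = c1.submit(inc, 1, key="x")    # defines key "x" as inc(1)
    x2 = c2.submit(inc, 2, key="x")    # same key, different run_spec
    # When the scheduler sees the colliding definition, it logs the warning
    # above (once per task group) and keeps the first run_spec, so both
    # futures resolve to inc(1) == 2.
    assert x1.result() == 2
    assert x2.result() == 2
    c2.close()
    c1.close()
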
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_core.py 
new/distributed-2024.2.1/distributed/shuffle/_core.py
--- old/distributed-2024.2.0/distributed/shuffle/_core.py       2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_core.py       2024-02-23 
19:11:48.000000000 +0100
@@ -442,6 +442,9 @@
             participating_workers=set(worker_for.values()),
         )
 
+    def validate_data(self, data: Any) -> None:
+        """Validate payload data before shuffling"""
+
     @abc.abstractmethod
     def create_run_on_worker(
         self,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_disk.py 
new/distributed-2024.2.1/distributed/shuffle/_disk.py
--- old/distributed-2024.2.0/distributed/shuffle/_disk.py       2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_disk.py       2024-02-23 
19:11:48.000000000 +0100
@@ -12,6 +12,7 @@
 
 from distributed.metrics import context_meter, thread_time
 from distributed.shuffle._buffer import ShardsBuffer
+from distributed.shuffle._exceptions import DataUnavailable
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._pickle import pickle_bytelist
 from distributed.utils import Deadline, empty_context, log_errors, nbytes
@@ -201,13 +202,13 @@
                 context_meter.digest_metric("p2p-disk-read", 1, "count")
                 context_meter.digest_metric("p2p-disk-read", size, "bytes")
         except FileNotFoundError:
-            raise KeyError(id)
+            raise DataUnavailable(id)
 
         if data:
             self.bytes_read += size
             return data
         else:
-            raise KeyError(id)
+            raise DataUnavailable(id)
 
     async def close(self) -> None:
         await super().close()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/shuffle/_exceptions.py 
new/distributed-2024.2.1/distributed/shuffle/_exceptions.py
--- old/distributed-2024.2.0/distributed/shuffle/_exceptions.py 2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_exceptions.py 2024-02-23 
19:11:48.000000000 +0100
@@ -3,3 +3,7 @@
 
 class ShuffleClosedError(RuntimeError):
     pass
+
+
+class DataUnavailable(Exception):
+    """Raised when data is not available in the buffer"""
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_memory.py 
new/distributed-2024.2.1/distributed/shuffle/_memory.py
--- old/distributed-2024.2.0/distributed/shuffle/_memory.py     2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_memory.py     2024-02-23 
19:11:48.000000000 +0100
@@ -6,6 +6,7 @@
 from dask.sizeof import sizeof
 
 from distributed.shuffle._buffer import ShardsBuffer
+from distributed.shuffle._exceptions import DataUnavailable
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.utils import log_errors
 
@@ -30,7 +31,10 @@
         if not self._inputs_done:
             raise RuntimeError("Tried to read from file before done.")
 
-        shards = self._shards.pop(id)  # Raises KeyError
+        try:
+            shards = self._shards.pop(id)  # Raises KeyError
+        except KeyError:
+            raise DataUnavailable(id)
         self.bytes_read += sum(map(sizeof, shards))
         # Don't keep the serialized and the deserialized shards
         # in memory at the same time
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_shuffle.py 
new/distributed-2024.2.1/distributed/shuffle/_shuffle.py
--- old/distributed-2024.2.0/distributed/shuffle/_shuffle.py    2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_shuffle.py    2024-02-23 
19:11:48.000000000 +0100
@@ -48,6 +48,7 @@
     handle_transfer_errors,
     handle_unpack_errors,
 )
+from distributed.shuffle._exceptions import DataUnavailable
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
 from distributed.sizeof import sizeof
@@ -527,7 +528,7 @@
         try:
             data = self._read_from_disk((partition_id,))
             return convert_shards(data, self.meta)
-        except KeyError:
+        except DataUnavailable:
             return self.meta.copy()
 
     def _get_assigned_worker(self, id: int) -> str:
@@ -554,6 +555,10 @@
     def pick_worker(self, partition: int, workers: Sequence[str]) -> str:
         return _get_worker_for_range_sharding(self.npartitions, partition, 
workers)
 
+    def validate_data(self, data: pd.DataFrame) -> None:
+        if set(data.columns) != set(self.meta.columns):
+            raise ValueError(f"Expected {self.meta.columns=} to match 
{data.columns=}.")
+
     def create_run_on_worker(
         self,
         run_id: int,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/shuffle/_worker_plugin.py 
new/distributed-2024.2.1/distributed/shuffle/_worker_plugin.py
--- old/distributed-2024.2.0/distributed/shuffle/_worker_plugin.py      
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/_worker_plugin.py      
2024-02-23 19:11:48.000000000 +0100
@@ -341,6 +341,7 @@
         spec: ShuffleSpec,
         **kwargs: Any,
     ) -> int:
+        spec.validate_data(data)
         shuffle_run = self.get_or_create_shuffle(spec)
         return shuffle_run.add_partition(
             data=data,
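
The new validate_data hook gives a shuffle spec a chance to reject malformed
input before any partitions are transferred; the DataFrame spec above checks the
incoming columns against its meta. A minimal sketch of the user-visible effect,
modeled on test_wrong_meta_provided further down in this diff (local cluster for
illustration only):

    import dask
    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client

    @dask.delayed
    def data_gen():
        return pd.DataFrame({"a": range(10)})   # real partitions only contain "a"

    client = Client(processes=False)
    # Declared meta claims columns "a" and "b", which the data does not match.
    ddf = dd.from_delayed(
        [data_gen()] * 2, meta=[("a", int), ("b", int)], verify_meta=False
    )
    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
        # Now fails promptly: a RuntimeError ("shuffling ... failed") caused by
        # the ValueError from validate_data, instead of failing later in a less
        # obvious way.
        ddf.shuffle(on="a").compute()
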
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/shuffle/tests/test_disk_buffer.py 
new/distributed-2024.2.1/distributed/shuffle/tests/test_disk_buffer.py
--- old/distributed-2024.2.0/distributed/shuffle/tests/test_disk_buffer.py      
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/tests/test_disk_buffer.py      
2024-02-23 19:11:48.000000000 +0100
@@ -8,6 +8,7 @@
 import pytest
 
 from distributed.shuffle._disk import DiskShardsBuffer
+from distributed.shuffle._exceptions import DataUnavailable
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.utils_test import gen_test
 
@@ -32,7 +33,7 @@
         x = mf.read("x")
         y = mf.read("y")
 
-        with pytest.raises(KeyError):
+        with pytest.raises(DataUnavailable):
             mf.read("z")
 
         assert x == b"0" * 2000
@@ -57,7 +58,7 @@
 
         await mf.flush()
         assert mf.read("1") == b"foo"
-        with pytest.raises(KeyError):
+        with pytest.raises(DataUnavailable):
             mf.read(2)
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/shuffle/tests/test_memory_buffer.py 
new/distributed-2024.2.1/distributed/shuffle/tests/test_memory_buffer.py
--- old/distributed-2024.2.0/distributed/shuffle/tests/test_memory_buffer.py    
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/tests/test_memory_buffer.py    
2024-02-23 19:11:48.000000000 +0100
@@ -2,6 +2,7 @@
 
 import pytest
 
+from distributed.shuffle._exceptions import DataUnavailable
 from distributed.shuffle._memory import MemoryShardsBuffer
 from distributed.utils_test import gen_test
 
@@ -21,7 +22,7 @@
         x = mf.read("x")
         y = mf.read("y")
 
-        with pytest.raises(KeyError):
+        with pytest.raises(DataUnavailable):
             mf.read("z")
 
         assert x == [b"0" * 1000] * 2
@@ -42,7 +43,7 @@
 
         await mf.flush()
         assert mf.read("1") == [b"foo"]
-        with pytest.raises(KeyError):
+        with pytest.raises(DataUnavailable):
             mf.read("2")
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/shuffle/tests/test_shuffle.py 
new/distributed-2024.2.1/distributed/shuffle/tests/test_shuffle.py
--- old/distributed-2024.2.0/distributed/shuffle/tests/test_shuffle.py  
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/shuffle/tests/test_shuffle.py  
2024-02-23 19:11:48.000000000 +0100
@@ -210,32 +210,6 @@
     await check_scheduler_cleanup(s)
 
 
-@pytest.mark.parametrize("disk", [True, False])
-@gen_cluster(client=True)
-async def test_stable_ordering(c, s, a, b, disk):
-    df = dask.datasets.timeseries(
-        start="2000-01-01",
-        end="2000-02-01",
-        dtypes={"x": int, "y": int},
-        freq="10 s",
-    )
-    df["x"] = df["x"] % 19
-    df["y"] = df["y"] % 23
-    with dask.config.set(
-        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
-    ):
-        shuffled = dd.shuffle.shuffle(df, "x")
-    result, expected = await c.compute([shuffled, df], sync=True)
-    dd.assert_eq(
-        result.drop_duplicates("x", keep="first"),
-        expected.drop_duplicates("x", keep="first"),
-    )
-
-    await check_worker_cleanup(a)
-    await check_worker_cleanup(b)
-    await check_scheduler_cleanup(s)
-
-
 @pytest.mark.parametrize("processes", [True, False])
 @gen_test()
 async def test_basic_integration_local_cluster(processes):
@@ -2505,17 +2479,14 @@
 
 
 def test_sort_values_with_existing_divisions(client):
-    "Regression test for #8165"
+    """Regression test for #8165"""
     df = pd.DataFrame(
         {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)}
     )
-    ddf = dd.from_pandas(
-        df,
-        npartitions=4,
-    )
+    ddf = dd.from_pandas(df, npartitions=4)
     with dask.config.set({"dataframe.shuffle.method": "p2p"}):
         ddf = ddf.set_index("a").sort_values("b")
-        result = client.compute(ddf, sync=True)
+        result = ddf.compute()
         dd.assert_eq(
             result,
             df.set_index("a").sort_values("b"),
@@ -2725,3 +2696,76 @@
     await wait_for_tasks_in_state("shuffle-transfer", "resumed", 1, 
barrier_worker)
     barrier_worker.block_gather_dep.set()
     await out
+
+
+@pytest.mark.parametrize("disk", [True, False])
+@pytest.mark.parametrize("keep", ["first", "last"])
+@gen_cluster(client=True)
+async def test_shuffle_stable_ordering(c, s, a, b, keep, disk):
+    """Ensures that shuffling guarantees ordering for individual entries
+    belonging to the same shuffle key"""
+
+    def make_partition(partition_id, size):
+        """Return null column for every other partition"""
+        offset = partition_id * size
+        df = pd.DataFrame({"a": np.arange(start=offset, stop=offset + size)})
+        df["b"] = df["a"] % 23
+        return df
+
+    df = dd.from_map(make_partition, np.arange(19), args=(250,))
+
+    with dask.config.set(
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+    ):
+        shuffled = df.shuffle("b")
+    result, expected = await c.compute([shuffled, df], sync=True)
+    dd.assert_eq(result, expected)
+
+    for _, group in result.groupby("b"):
+        assert group["a"].is_monotonic_increasing
+
+    await check_worker_cleanup(a)
+    await check_worker_cleanup(b)
+    await check_scheduler_cleanup(s)
+
+
+@pytest.mark.parametrize("disk", [True, False])
+@pytest.mark.parametrize("keep", ["first", "last"])
+@gen_cluster(client=True)
+async def test_drop_duplicates_stable_ordering(c, s, a, b, keep, disk):
+    df = dask.datasets.timeseries()
+
+    with dask.config.set(
+        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
+    ):
+        result, expected = await c.compute(
+            [
+                df.drop_duplicates(
+                    subset=["name"], keep=keep, split_out=df.npartitions
+                ),
+                df,
+            ],
+            sync=True,
+        )
+    expected = expected.drop_duplicates(subset=["name"], keep=keep)
+    dd.assert_eq(result, expected)
+
+
+@gen_cluster(client=True)
+async def test_wrong_meta_provided(c, s, a, b):
+    # https://github.com/dask/distributed/issues/8519
+    @dask.delayed
+    def data_gen():
+        return pd.DataFrame({"a": range(10)})
+
+    ddf = dd.from_delayed(
+        [data_gen()] * 2, meta=[("a", int), ("b", int)], verify_meta=False
+    )
+
+    with raises_with_cause(
+        RuntimeError,
+        r"shuffling \w* failed",
+        ValueError,
+        "meta",
+    ):
+        await c.gather(c.compute(ddf.shuffle(on="a")))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/tests/test_active_memory_manager.py 
new/distributed-2024.2.1/distributed/tests/test_active_memory_manager.py
--- old/distributed-2024.2.0/distributed/tests/test_active_memory_manager.py    
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_active_memory_manager.py    
2024-02-23 19:11:48.000000000 +0100
@@ -22,6 +22,7 @@
 from distributed.utils_test import (
     NO_AMM,
     BlockedGatherDep,
+    BlockedGetData,
     assert_story,
     async_poll_for,
     captured_logger,
@@ -1129,6 +1130,60 @@
     assert dict(w2.data) == {"x": 123, clutter.key: 456}
 
 
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)] * 10,
+    config={
+        "distributed.scheduler.active-memory-manager.start": True,
+        "distributed.scheduler.active-memory-manager.interval": 0.05,
+        "distributed.scheduler.active-memory-manager.measure": "managed",
+        "distributed.scheduler.active-memory-manager.policies": [],
+    },
+)
+async def test_RetireWorker_mass(c, s, *workers):
+    """Retire 90% of a cluster at once."""
+    # Note: by using scatter instead of submit/map, we're also testing that 
tasks
+    # aren't being recomputed
+    data = await c.scatter(range(100))
+    for w in workers:
+        assert len(w.data) == 10
+
+    await c.retire_workers([w.address for w in workers[:-1]])
+    assert set(s.workers) == {workers[-1].address}
+    assert len(workers[-1].data) == 100
+
+
+@gen_cluster(
+    client=True,
+    config={
+        "distributed.scheduler.active-memory-manager.start": True,
+        "distributed.scheduler.active-memory-manager.interval": 0.05,
+        "distributed.scheduler.active-memory-manager.measure": "managed",
+        "distributed.scheduler.active-memory-manager.policies": [],
+    },
+)
+async def test_RetireWorker_incremental(c, s, w2, w3):
+    """Retire worker w1; this causes its keys to be replicated onto w2.
+    Before that can happen, retire w2 too.
+    """
+    async with BlockedGetData(s.address) as w1:
+        # Note: by using scatter instead of submit/map, we're also testing 
that tasks
+        # aren't being recomputed
+        x = await c.scatter({"x": 1}, workers=[w1.address])
+        y = await c.scatter({"y": 2}, workers=[w3.address])
+
+        # Because w2's memory is lower than w3, AMM will choose w2
+        retire1 = asyncio.create_task(c.retire_workers([w1.address]))
+        await w1.in_get_data.wait()
+        assert w2.state.tasks["x"].state == "flight"
+        await c.retire_workers([w2.address])
+
+        w1.block_get_data.set()
+        await retire1
+        assert set(s.workers) == {w3.address}
+        assert set(w3.data) == {"x", "y"}
+
+
 class Counter:
     def __init__(self):
         self.n = 0
@@ -1223,7 +1278,7 @@
             self.manager.policies.remove(self)
 
 
-async def tensordot_stress(c):
+async def tensordot_stress(c, s):
     da = pytest.importorskip("dask.array")
 
     rng = da.random.RandomState(0)
@@ -1234,6 +1289,10 @@
         b = (a @ a.T).sum().round(3)
     assert await c.compute(b) == 245.394
 
+    # Test that we didn't recompute any tasks during the stress test
+    await async_poll_for(lambda: not s.tasks, timeout=5)
+    assert sum(t.start == "memory" for t in s.transition_log) == 1639
+
 
 @pytest.mark.slow
 @gen_cluster(
@@ -1245,7 +1304,7 @@
     """Test the tensordot_stress helper without AMM. This is to figure out if a
     stability issue is AMM-specific or not.
     """
-    await tensordot_stress(c)
+    await tensordot_stress(c, s)
 
 
 @pytest.mark.slow
@@ -1267,7 +1326,7 @@
 
     See also: test_ReduceReplicas_stress
     """
-    await tensordot_stress(c)
+    await tensordot_stress(c, s)
 
 
 @pytest.mark.slow
@@ -1288,7 +1347,7 @@
     test_drop_stress above, this test does not stop running after a few 
seconds - the
     policy must not disrupt the computation too much.
     """
-    await tensordot_stress(c)
+    await tensordot_stress(c, s)
 
 
 @pytest.mark.slow
@@ -1316,7 +1375,7 @@
     random.shuffle(addrs)
     print(f"Removing all workers except {addrs[9]}")
 
-    tasks = [asyncio.create_task(tensordot_stress(c))]
+    tasks = [asyncio.create_task(tensordot_stress(c, s))]
     await asyncio.sleep(1)
     tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2])))
     await asyncio.sleep(1)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/tests/test_cancelled_state.py 
new/distributed-2024.2.1/distributed/tests/test_cancelled_state.py
--- old/distributed-2024.2.0/distributed/tests/test_cancelled_state.py  
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_cancelled_state.py  
2024-02-23 19:11:48.000000000 +0100
@@ -169,7 +169,11 @@
         with lock:
             return x + 1
 
-    async with BrokenWorker(s.address) as a:
+    async with BrokenWorker(
+        s.address,
+        # heartbeat will close a worker after remove_worker(close=False)
+        heartbeat_interval="100s",
+    ) as a:
         await c.wait_for_workers(2)
         fut1 = c.submit(
             blockable_compute,
@@ -267,7 +271,11 @@
 
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_in_flight_lost_after_resumed(c, s, b):
-    async with BlockedGetData(s.address) as a:
+    async with BlockedGetData(
+        s.address,
+        # heartbeat will close a worker after remove_worker(close=False)
+        heartbeat_interval="100s",
+    ) as a:
         fut1 = c.submit(inc, 1, workers=[a.address], key="fut1")
         # Ensure fut1 is in memory but block any further execution afterwards 
to
         # ensure we control when the recomputation happens
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/tests/test_client.py 
new/distributed-2024.2.1/distributed/tests/test_client.py
--- old/distributed-2024.2.0/distributed/tests/test_client.py   2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_client.py   2024-02-23 
19:11:48.000000000 +0100
@@ -5975,6 +5975,8 @@
 async def test_warn_when_submitting_large_values(c, s):
     with pytest.warns(UserWarning, match="Sending large graph of size"):
         future = c.submit(lambda x: x + 1, b"0" * 10_000_000)
+    with dask.config.set({"distributed.admin.large-graph-warning-threshold": 
"1GB"}):
+        future = c.submit(lambda x: x + 1, b"0" * 10_000_000)
 
 
 @gen_cluster(client=True, nthreads=[])
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_core.py 
new/distributed-2024.2.1/distributed/tests/test_core.py
--- old/distributed-2024.2.0/distributed/tests/test_core.py     2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_core.py     2024-02-23 
19:11:48.000000000 +0100
@@ -2,6 +2,7 @@
 
 import asyncio
 import contextlib
+import logging
 import os
 import random
 import socket
@@ -1481,3 +1482,27 @@
             assert ledger == list(range(n))
         finally:
             await comm.close()
+
+
+@pytest.mark.slow
+@gen_test(timeout=180)
+async def test_large_payload(caplog):
+    """See also: protocol/tests/test_protocol.py::test_large_payload"""
+    critical_size = 2**31 + 1  # >2 GiB
+    data = b"0" * critical_size
+
+    async with Server({"echo": echo_serialize}) as server:
+        await server.listen(0)
+        comm = await connect(server.address)
+
+        # FIXME https://github.com/dask/distributed/issues/8465
+        # At debug level, messages are dumped into the log. By default, pytest 
captures
+        # all logs, which would make this test extremely expensive to run.
+        with caplog.at_level(logging.INFO, logger="distributed.core"):
+            # Note: if we wrap data in to_serialize, it will be sent as a 
buffer, which
+            # is not encoded by msgpack.
+            await comm.write({"op": "echo", "x": data})
+            response = await comm.read()
+
+        assert response["result"] == data
+        await comm.close()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/tests/test_scheduler.py 
new/distributed-2024.2.1/distributed/tests/test_scheduler.py
--- old/distributed-2024.2.0/distributed/tests/test_scheduler.py        
2024-02-09 23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_scheduler.py        
2024-02-23 19:11:48.000000000 +0100
@@ -624,7 +624,7 @@
         secede()
         second.wait()
 
-    fs = c.map(func, [first] * 5, [second] * 5)
+    fs = c.map(func, [first] * 5, [second] * 5, key=[f"x{i}" for i in range(5)])
     await async_poll_for(lambda: a.state.executing, timeout=5)
 
     await first.set()
@@ -2641,7 +2641,7 @@
         assert s.adaptive_target() == 1
 
         # Long task
-        x = c.submit(slowinc, 1, delay=0.5)
+        x = c.submit(slowinc, 1, delay=0.4)
         while x.key not in s.tasks:
             await asyncio.sleep(0.01)
         assert s.adaptive_target(target_duration=".1s") == 1  # still one
@@ -4702,3 +4702,180 @@
         await asyncio.sleep(0.01)
 
     await f
+
+
+@pytest.mark.parametrize("deps", ["same", "less", "more"])
+@gen_cluster(client=True, nthreads=[])
+async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, deps):
+    """If an intermediate key has a different run_spec (either the callable 
function or
+    the dependencies / arguments) that will conflict with what was previously 
defined,
+    it should raise an error since this can otherwise break in many different 
places and
+    cause either spurious exceptions or even deadlocks.
+
+    In this specific test, the previous run_spec has not been computed yet.
+    See also test_resubmit_different_task_same_key_after_previous_is_done.
+
+    For a real world example where this can trigger, see
+    https://github.com/dask/dask/issues/9888
+    """
+    x1 = c.submit(inc, 1, key="x1")
+    y_old = c.submit(inc, x1, key="y")
+
+    x1b = x1 if deps != "less" else 2
+    x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
+    y_new = delayed(sum)([x1b, x2], dask_key_name="y")
+    z = delayed(inc)(y_new, dask_key_name="z")
+
+    with captured_logger("distributed.scheduler", level=logging.WARNING) as 
log:
+        fut = c.compute(z)
+        await wait_for_state("z", "waiting", s)
+
+    assert "Detected different `run_spec` for key 'y'" in log.getvalue()
+
+    async with Worker(s.address):
+        # Used old run_spec
+        assert await y_old == 3
+        assert await fut == 4
+
+
+@pytest.mark.parametrize("deps", ["same", "less", "more"])
+@pytest.mark.parametrize("release_previous", [False, True])
+@gen_cluster(client=True)
+async def test_resubmit_different_task_same_key_after_previous_is_done(
+    c, s, a, b, deps, release_previous
+):
+    """Same as test_resubmit_different_task_same_key, but now the replaced 
task has
+    already been computed and is either in memory or released, and so are its 
old
+    dependencies, so they may need to be recomputed.
+    """
+    x1 = delayed(inc)(1, dask_key_name="x1")
+    x1fut = c.compute(x1)
+    y_old = c.submit(inc, x1fut, key="y")
+    z1 = c.submit(inc, y_old, key="z1")
+    await wait(z1)
+    if release_previous:
+        del x1fut, y_old
+        await wait_for_state("x1", "released", s)
+        await wait_for_state("y", "released", s)
+
+    x1b = x1 if deps != "less" else 2
+    x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11
+    y_new = delayed(sum)([x1b, x2], dask_key_name="y")
+    z2 = delayed(inc)(y_new, dask_key_name="z2")
+
+    with captured_logger("distributed.scheduler", level=logging.WARNING) as 
log:
+        fut = c.compute(z2)
+        # Used old run_spec
+        assert await fut == 4
+        assert "x2" not in s.tasks
+
+    # _generate_taskstates won't run for a dependency that's already in memory
+    has_warning = "Detected different `run_spec` for key 'y'" in log.getvalue()
+    assert has_warning is (release_previous or deps == "less")
+
+
+@gen_cluster(client=True, nthreads=[])
+async def test_resubmit_different_task_same_key_many_clients(c, s):
+    """Two different clients submit a task with the same key but different 
run_spec's."""
+    async with Client(s.address, asynchronous=True) as c2:
+        with captured_logger("distributed.scheduler", level=logging.WARNING) 
as log:
+            x1 = c.submit(inc, 1, key="x")
+            x2 = c2.submit(inc, 2, key="x")
+
+            await wait_for_state("x", ("no-worker", "queued"), s)
+            who_wants = s.tasks["x"].who_wants
+            await async_poll_for(
+                lambda: {cs.client_key for cs in who_wants} == {c.id, c2.id}, 
timeout=5
+            )
+
+        assert "Detected different `run_spec` for key 'x'" in log.getvalue()
+
+        async with Worker(s.address):
+            assert await x1 == 2
+            assert await x2 == 2  # kept old run_spec
+
+
+@pytest.mark.parametrize(
+    "before,after,expect_msg",
+    [
+        (object(), 123, True),
+        (123, object(), True),
+        (o := object(), o, False),
+    ],
+)
+@gen_cluster(client=True, nthreads=[])
+async def test_resubmit_nondeterministic_task_same_deps(
+    c, s, before, after, expect_msg
+):
+    """Some run_specs can't be tokenized deterministically. Silently skip 
comparison on
+    the run_spec when both lhs and rhs are nondeterministic.
+    Dependencies must be the same.
+    """
+    x1 = c.submit(lambda x: x, before, key="x")
+    x2 = delayed(lambda x: x)(after, dask_key_name="x")
+    y = delayed(lambda x: x)(x2, dask_key_name="y")
+
+    with captured_logger("distributed.scheduler", level=logging.WARNING) as 
log:
+        fut = c.compute(y)
+        await async_poll_for(lambda: "y" in s.tasks, timeout=5)
+
+    has_msg = "Detected different `run_spec` for key 'x'" in log.getvalue()
+    assert has_msg == expect_msg
+
+    async with Worker(s.address):
+        assert type(await fut) is type(before)
+
+
+@pytest.mark.parametrize("add_deps", [False, True])
+@gen_cluster(client=True, nthreads=[])
+async def test_resubmit_nondeterministic_task_different_deps(c, s, add_deps):
+    """Some run_specs can't be tokenized deterministically. Silently skip 
comparison on
+    the run_spec in those cases. However, fail anyway if dependencies have 
changed.
+    """
+    o = object()
+    x1 = c.submit(inc, 1, key="x1") if not add_deps else 2
+    x2 = c.submit(inc, 2, key="x2")
+    y1 = delayed(lambda i, j: i)(x1, o, dask_key_name="y").persist()
+    y2 = delayed(lambda i, j: i)(x2, o, dask_key_name="y")
+    z = delayed(inc)(y2, dask_key_name="z")
+
+    with captured_logger("distributed.scheduler", level=logging.WARNING) as 
log:
+        fut = c.compute(z)
+        await wait_for_state("z", "waiting", s)
+    assert "Detected different `run_spec` for key 'y'" in log.getvalue()
+
+    async with Worker(s.address):
+        assert await fut == 3
+
+
+@pytest.mark.parametrize(
+    "loglevel,expect_loglines", [(logging.DEBUG, 2), (logging.WARNING, 1)]
+)
+@gen_cluster(client=True, nthreads=[])
+async def test_resubmit_different_task_same_key_warns_only_once(
+    c, s, loglevel, expect_loglines
+):
+    """If all tasks of a layer are affected by the same run_spec collision, 
warn
+    only once.
+    """
+    y1s = c.map(inc, [0, 1, 2], key=[("y", 0), ("y", 1), ("y", 2)])
+    dsk = {
+        "x": 3,
+        ("y", 0): (inc, "x"),  # run_spec and dependencies change
+        ("y", 1): (inc, 4),  # run_spec changes, dependencies don't
+        ("y", 2): (inc, 2),  # Doesn't change
+        ("z", 0): (inc, ("y", 0)),
+        ("z", 1): (inc, ("y", 1)),
+        ("z", 2): (inc, ("y", 2)),
+    }
+    with captured_logger("distributed.scheduler", level=loglevel) as log:
+        zs = c.get(dsk, [("z", 0), ("z", 1), ("z", 2)], sync=False)
+        await wait_for_state(("z", 2), "waiting", s)
+
+    actual_loglines = len(
+        re.findall("Detected different `run_spec` for key ", log.getvalue())
+    )
+    assert actual_loglines == expect_loglines
+
+    async with Worker(s.address):
+        assert await c.gather(zs) == [2, 3, 4]  # Kept old ys
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2024.2.0/distributed/tests/test_worker.py 
new/distributed-2024.2.1/distributed/tests/test_worker.py
--- old/distributed-2024.2.0/distributed/tests/test_worker.py   2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/tests/test_worker.py   2024-02-23 
19:11:48.000000000 +0100
@@ -1834,7 +1834,8 @@
     Worker=Nanny,
     worker_kwargs={"heartbeat_interval": "1ms"},
 )
-async def test_heartbeat_missing_restarts(c, s, n):
+async def test_heartbeat_missing_doesnt_restart(c, s, n):
+    """Read: https://github.com/dask/distributed/pull/8522""";
     old_heartbeat_handler = s.handlers["heartbeat_worker"]
     s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": 
"missing"}
 
@@ -1843,11 +1844,8 @@
 
     assert not s.workers
     s.handlers["heartbeat_worker"] = old_heartbeat_handler
-
-    await n.process.running.wait()
-    assert n.status == Status.running
-
-    await c.wait_for_workers(1)
+    assert n.status == Status.closing_gracefully
+    assert not n.process.is_alive()
 
 
 @gen_cluster(nthreads=[])
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/distributed/worker.py 
new/distributed-2024.2.1/distributed/worker.py
--- old/distributed-2024.2.0/distributed/worker.py      2024-02-09 
23:58:27.000000000 +0100
+++ new/distributed-2024.2.1/distributed/worker.py      2024-02-23 
19:11:48.000000000 +0100
@@ -1272,12 +1272,15 @@
             self._update_latency(end - start)
 
             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so 
just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. 
Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if 
possible.
-                await self.close(nanny=False)
+                # Scheduler thought we left.
+                # This is a common race condition when the scheduler calls
+                # remove_worker(); there can be a heartbeat between when the 
scheduler
+                # removes the worker on its side and when the {"op": "close"} 
command
+                # arrives through batched comms to the worker.
+                logger.warning("Scheduler was unaware of this worker; shutting 
down.")
+                # We close here just for safety's sake - the {op: close} should
+                # arrive soon anyway.
+                await self.close(reason="worker-heartbeat-missing")
                 return
 
             self.scheduler_delay = response["time"] - middle
@@ -1290,7 +1293,7 @@
             logger.exception("Failed to communicate with scheduler during 
heartbeat.")
         except Exception:
             logger.exception("Unexpected exception during heartbeat. Closing 
worker.")
-            await self.close()
+            await self.close(reason="worker-heartbeat-error")
             raise
 
     @fail_hard
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2024.2.0/pyproject.toml 
new/distributed-2024.2.1/pyproject.toml
--- old/distributed-2024.2.0/pyproject.toml     2024-02-09 23:58:27.000000000 
+0100
+++ new/distributed-2024.2.1/pyproject.toml     2024-02-23 19:11:48.000000000 
+0100
@@ -28,7 +28,7 @@
 dependencies = [
     "click >= 8.0",
     "cloudpickle >= 1.5.0",
-    "dask == 2024.2.0",
+    "dask == 2024.2.1",
     "jinja2 >= 2.10.3",
     "locket >= 1.0.0",
     "msgpack >= 1.0.0",
@@ -143,15 +143,14 @@
     '''ignore:setDaemon\(\) is deprecated, set the daemon attribute 
instead:DeprecationWarning:paramiko''',
     '''ignore:`np.bool8` is a deprecated alias for `np.bool_`''',
     '''ignore:is_sparse is deprecated and will:FutureWarning''',
-    # Need bokeh >= 3.3 for https://github.com/bokeh/bokeh/pull/13147
-    '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and 
scheduled for removal in a future version.*:DeprecationWarning:bokeh''',
-    # https://github.com/tornadoweb/tornado/issues/3334
+    # Need Tornado >=6.4
     '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and 
scheduled for removal in a future version.*:DeprecationWarning:tornado''',
     # https://github.com/dateutil/dateutil/issues/1284
     '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and 
scheduled for removal in a future version.*:DeprecationWarning:dateutil''',
     # https://github.com/dask/dask/pull/10622
     '''ignore:Minimal version of pyarrow will soon be increased to 14.0.1''',
     '''ignore:the matrix subclass is not the recommended way''',
+    '''ignore:The current Dask DataFrame implementation is deprecated.*:DeprecationWarning''',
 ]
 minversion = "6"
 markers = [
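
The last filterwarnings entry above silences the Dask DataFrame deprecation
notice in distributed's own test suite, presumably the same message behind the
changelog entry "Allow silencing dask.DataFrame deprecation warning". A hedged
sketch of an equivalent filter in user code (exact category, wording, and
trigger point may differ between dask versions):

    import warnings

    # Suppress the "current Dask DataFrame implementation is deprecated" notice,
    # mirroring the pytest filter added above.
    warnings.filterwarnings(
        "ignore",
        message="The current Dask DataFrame implementation is deprecated",
        category=DeprecationWarning,
    )

    import dask.dataframe as dd  # import after installing the filter
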
