Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2023-12-25 19:06:30
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.28375 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Mon Dec 25 19:06:30 2023 rev:74 rq:1135097 version:2023.12.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes  2023-12-09 22:57:51.301749065 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.28375/python-distributed.changes  2023-12-25 19:06:40.460490857 +0100
@@ -1,0 +2,7 @@
+Mon Dec 18 12:14:31 UTC 2023 - Dirk Müller <[email protected]>
+
+- update to 2023.12.1:
+  * see corresponding dask update:
+  https://docs.dask.org/en/stable/changelog.html#v2023-12-1
+
+-------------------------------------------------------------------

Old:
----
  distributed-2023.12.0-gh.tar.gz

New:
----
  distributed-2023.12.1-gh.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.VMVcN3/_old  2023-12-25 19:06:41.196517717 +0100
+++ /var/tmp/diff_new_pack.VMVcN3/_new  2023-12-25 19:06:41.196517717 +0100
@@ -44,7 +44,7 @@
 
 Name:           python-distributed%{psuffix}
 # ===> Note: python-dask MUST be updated in sync with python-distributed! <===
-Version:        2023.12.0
+Version:        2023.12.1
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2023.12.0-gh.tar.gz -> distributed-2023.12.1-gh.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/.github/workflows/ci-pre-commit.yml 
new/distributed-2023.12.1/.github/workflows/ci-pre-commit.yml
--- old/distributed-2023.12.0/.github/workflows/ci-pre-commit.yml       
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/.github/workflows/ci-pre-commit.yml       
2023-12-15 21:40:22.000000000 +0100
@@ -12,7 +12,7 @@
     runs-on: ubuntu-latest
     steps:
       - uses: actions/[email protected]
-      - uses: actions/setup-python@v4
+      - uses: actions/setup-python@v5
         with:
           python-version: '3.9'
       - uses: pre-commit/[email protected]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/conda.yml 
new/distributed-2023.12.1/.github/workflows/conda.yml
--- old/distributed-2023.12.0/.github/workflows/conda.yml       2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/.github/workflows/conda.yml       2023-12-15 
21:40:22.000000000 +0100
@@ -30,7 +30,7 @@
         with:
           fetch-depth: 0
       - name: Set up Python
-        uses: conda-incubator/[email protected]
+        uses: conda-incubator/[email protected]
         with:
           miniforge-variant: Mambaforge
           use-mamba: true
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/.github/workflows/test-report.yaml 
new/distributed-2023.12.1/.github/workflows/test-report.yaml
--- old/distributed-2023.12.0/.github/workflows/test-report.yaml        
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/.github/workflows/test-report.yaml        
2023-12-15 21:40:22.000000000 +0100
@@ -22,7 +22,7 @@
       - uses: actions/[email protected]
 
       - name: Setup Conda Environment
-        uses: conda-incubator/[email protected]
+        uses: conda-incubator/[email protected]
         with:
           miniforge-variant: Mambaforge
           miniforge-version: latest
@@ -56,7 +56,7 @@
           mv test_report.html test_short_report.html deploy/
 
       - name: Deploy 🚀
-        uses: JamesIves/[email protected]
+        uses: JamesIves/[email protected]
         with:
           branch: gh-pages
           folder: deploy
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/tests.yaml 
new/distributed-2023.12.1/.github/workflows/tests.yaml
--- old/distributed-2023.12.0/.github/workflows/tests.yaml      2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/.github/workflows/tests.yaml      2023-12-15 
21:40:22.000000000 +0100
@@ -114,7 +114,7 @@
           fetch-depth: 0
 
       - name: Setup Conda Environment
-        uses: conda-incubator/[email protected]
+        uses: conda-incubator/[email protected]
         with:
           miniforge-variant: Mambaforge
           miniforge-version: latest
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/continuous_integration/gpuci/axis.yaml 
new/distributed-2023.12.1/continuous_integration/gpuci/axis.yaml
--- old/distributed-2023.12.0/continuous_integration/gpuci/axis.yaml    
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/continuous_integration/gpuci/axis.yaml    
2023-12-15 21:40:22.000000000 +0100
@@ -9,6 +9,6 @@
 - ubuntu20.04
 
 RAPIDS_VER:
-- "23.12"
+- "24.02"
 
 excludes:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/client.py 
new/distributed-2023.12.1/distributed/client.py
--- old/distributed-2023.12.0/distributed/client.py     2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/client.py     2023-12-15 
21:40:22.000000000 +0100
@@ -4857,7 +4857,7 @@
             warnings.warn(
                 "The `idempotent` argument is deprecated and will be removed in a "
                 "future version. Please mark your plugin as idempotent by setting its "
-                "`.idempotent` atrribute to `True`.",
+                "`.idempotent` attribute to `True`.",
                 FutureWarning,
             )
         else:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/deploy/subprocess.py 
new/distributed-2023.12.1/distributed/deploy/subprocess.py
--- old/distributed-2023.12.0/distributed/deploy/subprocess.py  2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/deploy/subprocess.py  2023-12-15 
21:40:22.000000000 +0100
@@ -6,6 +6,10 @@
 import json
 import logging
 import math
+import os
+import tempfile
+import uuid
+from pathlib import Path
 from typing import Any
 
 import psutil
@@ -16,6 +20,7 @@
 from distributed.compatibility import WINDOWS
 from distributed.deploy.spec import ProcessInterface, SpecCluster
 from distributed.deploy.utils import nprocesses_nthreads
+from distributed.utils import Deadline
 from distributed.worker_memory import parse_memory_limit
 
 logger = logging.getLogger(__name__)
@@ -59,13 +64,20 @@
     """
 
     scheduler_kwargs: dict
+    timeout: int
     address: str | None
 
     def __init__(
         self,
         scheduler_kwargs: dict | None = None,
+        timeout: int = 30,
     ):
-        self.scheduler_kwargs = scheduler_kwargs or {}
+        self.scheduler_kwargs = {
+            "scheduler_file": os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
+        }
+        if scheduler_kwargs:
+            self.scheduler_kwargs.update(scheduler_kwargs)
+        self.timeout = timeout
         super().__init__()
 
     async def _start(self):
@@ -78,20 +90,32 @@
             ),
         ]
         logger.info(" ".join(cmd))
+        deadline = Deadline.after(self.timeout)
         self.process = await asyncio.create_subprocess_exec(
             *cmd,
             stderr=asyncio.subprocess.PIPE,
         )
 
-        while True:
-            line = (await self.process.stderr.readline()).decode()
-            if not line.strip():
-                raise RuntimeError("Scheduler failed to start")
-            logger.info(line.strip())
-            if "Scheduler at" in line:
-                self.address = line.split("Scheduler at:")[1].strip()
-                break
-        logger.debug(line)
+        scheduler_file = Path(self.scheduler_kwargs["scheduler_file"])
+        while not (
+            deadline.expired
+            or scheduler_file.exists()
+            or self.process.returncode is not None
+        ):
+            await asyncio.sleep(0.1)
+        if deadline.expired or self.process.returncode is not None:
+            assert self.process.stderr
+            logger.error((await self.process.stderr.read()).decode())
+            if deadline.expired:
+                raise RuntimeError(f"Scheduler failed to start within {self.timeout}s")
+            raise RuntimeError(
+                f"Scheduler failed to start and exited with code {self.process.returncode}"
+            )
+
+        with scheduler_file.open(mode="r") as f:
+            identity = json.load(f)
+            self.address = identity["address"]
+        logger.info("Scheduler at %r", self.address)
 
 
 class SubprocessWorker(Subprocess):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/deploy/tests/test_subprocess.py 
new/distributed-2023.12.1/distributed/deploy/tests/test_subprocess.py
--- old/distributed-2023.12.0/distributed/deploy/tests/test_subprocess.py       
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/deploy/tests/test_subprocess.py       
2023-12-15 21:40:22.000000000 +0100
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import logging
+
 import pytest
 
 from distributed import Client
@@ -9,7 +11,7 @@
     SubprocessScheduler,
     SubprocessWorker,
 )
-from distributed.utils_test import gen_test
+from distributed.utils_test import gen_test, new_config_file
 
 
 @pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@@ -72,6 +74,24 @@
             pass
 
 
[email protected](WINDOWS, reason="distributed#7434")
[email protected]
+@gen_test()
+async def test_subprocess_cluster_does_not_depend_on_logging():
+    with new_config_file(
+        {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}}
+    ):
+        async with SubprocessCluster(
+            asynchronous=True,
+            dashboard_address=":0",
+            scheduler_kwargs={"idle_timeout": "5s"},
+            worker_kwargs={"death_timeout": "5s"},
+        ) as cluster:
+            async with Client(cluster, asynchronous=True) as client:
+                result = await client.submit(lambda x: x + 1, 10)
+                assert result == 11
+
+
 @pytest.mark.skipif(
     not WINDOWS, reason="Windows-specific error testing (distributed#7434)"
 )
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/diagnostics/tests/test_scheduler_plugin.py
 
new/distributed-2023.12.1/distributed/diagnostics/tests/test_scheduler_plugin.py
--- 
old/distributed-2023.12.0/distributed/diagnostics/tests/test_scheduler_plugin.py
    2023-12-02 00:22:58.000000000 +0100
+++ 
new/distributed-2023.12.1/distributed/diagnostics/tests/test_scheduler_plugin.py
    2023-12-15 21:40:22.000000000 +0100
@@ -402,23 +402,6 @@
         assert s.foo == "bar"
 
 
-@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
-async def test_register_plugin_pickle_disabled(c, s, a, b):
-    class Dummy1(SchedulerPlugin):
-        def start(self, scheduler):
-            scheduler.foo = "bar"
-
-    n_plugins = len(s.plugins)
-    with pytest.raises(ValueError) as excinfo:
-        await c.register_plugin(Dummy1())
-
-    msg = str(excinfo.value)
-    assert "disallowed from deserializing" in msg
-    assert "distributed.scheduler.pickle" in msg
-
-    assert n_plugins == len(s.plugins)
-
-
 @gen_cluster(nthreads=[])
 async def test_unregister_scheduler_plugin(s):
     class Plugin(SchedulerPlugin):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/distributed-schema.yaml 
new/distributed-2023.12.1/distributed/distributed-schema.yaml
--- old/distributed-2023.12.0/distributed/distributed-schema.yaml       
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/distributed-schema.yaml       
2023-12-15 21:40:22.000000000 +0100
@@ -130,16 +130,6 @@
 
               If we don't receive a heartbeat faster than this then we assume 
that the worker has died.
 
-          pickle:
-            type: boolean
-            description: |
-              Is the scheduler allowed to deserialize arbitrary bytestrings?
-
-              The scheduler almost never deserializes user data.
-              However there are some cases where the user can submit functions 
to run directly on the scheduler.
-              This can be convenient for debugging, but also introduces some 
security risk.
-              By setting this to false we ensure that the user is unable to 
run arbitrary code on the scheduler.
-
           preload:
             type: array
             description: |
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/distributed.yaml 
new/distributed-2023.12.1/distributed/distributed.yaml
--- old/distributed-2023.12.0/distributed/distributed.yaml      2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/distributed.yaml      2023-12-15 
21:40:22.000000000 +0100
@@ -22,7 +22,6 @@
     work-stealing-interval: 100ms  # Callback time for work stealing
     worker-saturation: 1.1 # Send this fraction of nthreads root tasks to 
workers
     worker-ttl: "5 minutes" # like '60s'. Time to live for workers.  They must 
heartbeat faster than this
-    pickle: True            # Is the scheduler allowed to deserialize 
arbitrary bytestrings
     preload: []             # Run custom modules with Scheduler
     preload-argv: []        # See 
https://docs.dask.org/en/latest/how-to/customize-initialization.html
     unknown-task-duration: 500ms  # Default duration for all tasks with 
unknown durations ("15m", "2h")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/protocol/core.py 
new/distributed-2023.12.1/distributed/protocol/core.py
--- old/distributed-2023.12.0/distributed/protocol/core.py      2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/protocol/core.py      2023-12-15 
21:40:22.000000000 +0100
@@ -4,8 +4,6 @@
 
 import msgpack
 
-import dask.config
-
 from distributed.protocol import pickle
 from distributed.protocol.compression import decompress, maybe_compress
 from distributed.protocol.serialize import (
@@ -117,8 +115,6 @@
 def loads(frames, deserialize=True, deserializers=None):
     """Transform bytestream back into Python value"""
 
-    allow_pickle = dask.config.get("distributed.scheduler.pickle")
-
     try:
 
         def _decode_default(obj):
@@ -148,13 +144,7 @@
                 sub_frames = frames[offset : offset + 
sub_header["num-sub-frames"]]
                 if "compression" in sub_header:
                     sub_frames = decompress(sub_header, sub_frames)
-                if allow_pickle:
-                    return pickle.loads(sub_header["pickled-obj"], 
buffers=sub_frames)
-                else:
-                    raise ValueError(
-                        "Unpickle on the Scheduler isn't allowed, set 
`distributed.scheduler.pickle=true`"
-                    )
-
+                return pickle.loads(sub_header["pickled-obj"], 
buffers=sub_frames)
             return msgpack_decode_default(obj)
 
         return msgpack.loads(
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/protocol/serialize.py 
new/distributed-2023.12.1/distributed/protocol/serialize.py
--- old/distributed-2023.12.0/distributed/protocol/serialize.py 2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/protocol/serialize.py 2023-12-15 
21:40:22.000000000 +0100
@@ -573,10 +573,6 @@
 
     Both the scheduler and workers with automatically unpickle this
     object on arrival.
-
-    Notice, this requires that the scheduler is allowed to use pickle.
-    If the configuration option "distributed.scheduler.pickle" is set
-    to False, the scheduler will raise an exception instead.
     """
 
     data: T
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/scheduler.py 
new/distributed-2023.12.1/distributed/scheduler.py
--- old/distributed-2023.12.0/distributed/scheduler.py  2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/scheduler.py  2023-12-15 
21:40:22.000000000 +0100
@@ -1373,6 +1373,15 @@
     #: be rejected.
     run_id: int | None
 
+    #: Whether to consider this task rootish in the context of task queueing
+    #: True
+    #:     Always consider this task rootish
+    #: False
+    #:     Never consider this task rootish
+    #: None
+    #:     Use a heuristic to determine whether this task should be considered 
rootish
+    _rootish: bool | None
+
     #: Cached hash of :attr:`~TaskState.client_key`
     _hash: int
 
@@ -1430,6 +1439,7 @@
         self.metadata = None
         self.annotations = None
         self.erred_on = None
+        self._rootish = None
         self.run_id = None
         TaskState._instances.add(self)
 
@@ -2909,8 +2919,11 @@
         Whether ``ts`` is a root or root-like task.
 
         Root-ish tasks are part of a group that's much larger than the cluster,
-        and have few or no dependencies.
+        and have few or no dependencies. Tasks may also be explicitly marked 
as rootish
+        to override this heuristic.
         """
+        if ts._rootish is not None:
+            return ts._rootish
         if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
             return False
         tg = ts.group
@@ -3533,6 +3546,10 @@
         jupyter=False,
         **kwargs,
     ):
+        if dask.config.get("distributed.scheduler.pickle", default=True) is False:
+            raise RuntimeError(
+                "Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
+            )
         if loop is not None:
             warnings.warn(
                 "the loop kwarg to Scheduler is deprecated",
@@ -5868,13 +5885,6 @@
         idempotent: bool | None = None,
     ) -> None:
         """Register a plugin on the scheduler."""
-        if not dask.config.get("distributed.scheduler.pickle"):
-            raise ValueError(
-                "Cannot register a scheduler plugin as the scheduler "
-                "has been explicitly disallowed from deserializing "
-                "arbitrary bytestrings using pickle via the "
-                "'distributed.scheduler.pickle' configuration setting."
-            )
         if idempotent is None:
             warnings.warn(
                 "The signature of `Scheduler.register_scheduler_plugin` now 
requires "
@@ -6922,7 +6932,7 @@
 
         if key is None:
             key = operator.attrgetter("address")
-        if isinstance(key, bytes) and 
dask.config.get("distributed.scheduler.pickle"):
+        if isinstance(key, bytes):
             key = pickle.loads(key)
 
         groups = groupby(key, self.workers.values())
@@ -7262,14 +7272,6 @@
         Caution: this runs arbitrary Python code on the scheduler.  This should
         eventually be phased out.  It is mostly used by diagnostics.
         """
-        if not dask.config.get("distributed.scheduler.pickle"):
-            logger.warning(
-                "Tried to call 'feed' route with custom functions, but "
-                "pickle is disallowed.  Set the 'distributed.scheduler.pickle'"
-                "config value to True to use the 'feed' route (this is mostly "
-                "commonly used with progress bars)"
-            )
-            return
 
         interval = parse_timedelta(interval)
         if function:
@@ -7484,12 +7486,6 @@
         """
         from distributed.worker import run
 
-        if not dask.config.get("distributed.scheduler.pickle"):
-            raise ValueError(
-                "Cannot run function as the scheduler has been explicitly 
disallowed from "
-                "deserializing arbitrary bytestrings using pickle via the "
-                "'distributed.scheduler.pickle' configuration setting."
-            )
         kwargs = kwargs or {}
         self.log_event("all", {"action": "run-function", "function": function})
         return run(self, comm, function=function, args=args, kwargs=kwargs, 
wait=wait)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_core.py 
new/distributed-2023.12.1/distributed/shuffle/_core.py
--- old/distributed-2023.12.0/distributed/shuffle/_core.py      2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/_core.py      2023-12-15 
21:40:22.000000000 +0100
@@ -7,7 +7,7 @@
 import pickle
 import time
 from collections import defaultdict
-from collections.abc import Callable, Iterable, Iterator, Sequence
+from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
 from enum import Enum
@@ -40,7 +40,6 @@
     _P = ParamSpec("_P")
 
     # circular dependencies
-    from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
     from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
 
 ShuffleId = NewType("ShuffleId", str)
@@ -375,23 +374,25 @@
     id: ShuffleId
     disk: bool
 
+    @abc.abstractproperty
+    def output_partitions(self) -> Generator[_T_partition_id, None, None]:
+        """Output partitions"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str:
+        """Pick a worker for a partition"""
+
     def create_new_run(
         self,
-        plugin: ShuffleSchedulerPlugin,
+        worker_for: dict[_T_partition_id, str],
     ) -> SchedulerShuffleState:
-        worker_for = self._pin_output_workers(plugin)
         return SchedulerShuffleState(
             run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for),
             participating_workers=set(worker_for.values()),
         )
 
     @abc.abstractmethod
-    def _pin_output_workers(
-        self, plugin: ShuffleSchedulerPlugin
-    ) -> dict[_T_partition_id, str]:
-        """Pin output tasks to workers and return the mapping of partition ID 
to worker."""
-
-    @abc.abstractmethod
     def create_run_on_worker(
         self,
         run_id: int,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_merge.py 
new/distributed-2023.12.1/distributed/shuffle/_merge.py
--- old/distributed-2023.12.0/distributed/shuffle/_merge.py     2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/_merge.py     2023-12-15 
21:40:22.000000000 +0100
@@ -361,18 +361,22 @@
 
     def _construct_graph(self) -> dict[tuple | str, tuple]:
         token_left = tokenize(
-            "hash-join",
+            # Include self.name to ensure that shuffle IDs are unique for 
individual
+            # merge operations. Reusing shuffles between merges is dangerous 
because of
+            # required coordination and complexity introduced through dynamic 
clusters.
+            self.name,
             self.name_input_left,
             self.left_on,
-            self.npartitions,
-            self.parts_out,
+            self.left_index,
         )
         token_right = tokenize(
-            "hash-join",
+            # Include self.name to ensure that shuffle IDs are unique for 
individual
+            # merge operations. Reusing shuffles between merges is dangerous 
because of
+            # required coordination and complexity introduced through dynamic 
clusters.
+            self.name,
             self.name_input_right,
             self.right_on,
-            self.npartitions,
-            self.parts_out,
+            self.right_index,
         )
         dsk: dict[tuple | str, tuple] = {}
         name_left = "hash-join-transfer-" + token_left
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/_rechunk.py 
new/distributed-2023.12.1/distributed/shuffle/_rechunk.py
--- old/distributed-2023.12.0/distributed/shuffle/_rechunk.py   2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/_rechunk.py   2023-12-15 
21:40:22.000000000 +0100
@@ -99,7 +99,7 @@
 import mmap
 import os
 from collections import defaultdict
-from collections.abc import Callable, Sequence
+from collections.abc import Callable, Generator, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from itertools import product
@@ -125,7 +125,6 @@
 )
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._pickle import unpickle_bytestream
-from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
 from distributed.shuffle._shuffle import barrier_key, shuffle_barrier
 from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
 from distributed.sizeof import sizeof
@@ -456,11 +455,22 @@
     new: ChunkedAxes
     old: ChunkedAxes
 
-    def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> 
dict[NDIndex, str]:
-        parts_out = product(*(range(len(c)) for c in self.new))
-        return plugin._pin_output_workers(
-            self.id, parts_out, _get_worker_for_hash_sharding
-        )
+    @property
+    def output_partitions(self) -> Generator[NDIndex, None, None]:
+        yield from product(*(range(len(c)) for c in self.new))
+
+    def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str:
+        npartitions = 1
+        for c in self.new:
+            npartitions *= len(c)
+        ix = 0
+        for dim, pos in enumerate(partition):
+            if dim > 0:
+                ix += len(self.new[dim - 1]) * pos
+            else:
+                ix += pos
+        i = len(workers) * ix // npartitions
+        return workers[i]
 
     def create_run_on_worker(
         self,
@@ -487,11 +497,3 @@
             disk=self.disk,
             loop=plugin.worker.loop,
         )
-
-
-def _get_worker_for_hash_sharding(
-    output_partition: NDIndex, workers: Sequence[str]
-) -> str:
-    """Get address of target worker for this output partition using hash 
sharding"""
-    i = hash(output_partition) % len(workers)
-    return workers[i]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/_scheduler_plugin.py 
new/distributed-2023.12.1/distributed/shuffle/_scheduler_plugin.py
--- old/distributed-2023.12.0/distributed/shuffle/_scheduler_plugin.py  
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/_scheduler_plugin.py  
2023-12-15 21:40:22.000000000 +0100
@@ -3,7 +3,6 @@
 import contextlib
 import logging
 from collections import defaultdict
-from collections.abc import Callable, Iterable, Sequence
 from typing import TYPE_CHECKING, Any
 
 from dask.typing import Key
@@ -145,7 +144,9 @@
             # that the shuffle works as intended and should fail instead.
             self._raise_if_barrier_unknown(spec.id)
             self._raise_if_task_not_processing(key)
-            state = spec.create_new_run(self)
+            worker_for = self._calculate_worker_for(spec)
+            self._ensure_output_tasks_are_non_rootish(spec)
+            state = spec.create_new_run(worker_for)
             self.active_shuffles[spec.id] = state
             self._shuffles[spec.id].add(state)
             state.participating_workers.add(worker)
@@ -167,14 +168,14 @@
         if task.state != "processing":
             raise RuntimeError(f"Expected {task} to be processing, is 
{task.state}.")
 
-    def _pin_output_workers(
-        self,
-        id: ShuffleId,
-        output_partitions: Iterable[Any],
-        pick: Callable[[Any, Sequence[str]], str],
-    ) -> dict[Any, str]:
+    def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
         """Pin the outputs of a P2P shuffle to specific workers.
 
+        The P2P implementation of a hash join combines the loading of shuffled 
output
+        partitions for the left and right side with the actual merge operation 
into a
+        single output task. As a consequence, we need to make sure that 
shuffles with
+        shared output tasks align on the output mapping.
+
         Parameters
         ----------
         id: ID of the shuffle to pin
@@ -185,18 +186,80 @@
             This function assumes that the barrier task and the output tasks 
share
             the same worker restrictions.
         """
-        mapping = {}
-        barrier = self.scheduler.tasks[barrier_key(id)]
+        existing: dict[Any, str] = {}
+        shuffle_id = spec.id
+        barrier = self.scheduler.tasks[barrier_key(shuffle_id)]
 
         if barrier.worker_restrictions:
             workers = list(barrier.worker_restrictions)
         else:
             workers = list(self.scheduler.workers)
 
-        for partition in output_partitions:
-            worker = pick(partition, workers)
-            mapping[partition] = worker
-        return mapping
+        # Check if this shuffle shares output tasks with a different shuffle 
that has
+        # already been initialized and needs to be taken into account when
+        # mapping output partitions to workers.
+        # Naively, you could delete this whole paragraph and just call
+        # spec.pick_worker; it would return two identical sets of results on 
both calls
+        # of this method... until the set of available workers changes between 
the two
+        # calls, which would cause misaligned shuffle outputs and a deadlock.
+        seen = {barrier}
+        for dependent in barrier.dependents:
+            for possible_barrier in dependent.dependencies:
+                if possible_barrier in seen:
+                    continue
+                seen.add(possible_barrier)
+                if not (other_barrier_key := 
id_from_key(possible_barrier.key)):
+                    continue
+                if not (shuffle := 
self.active_shuffles.get(other_barrier_key)):
+                    continue
+                current_worker_for = shuffle.run_spec.worker_for
+                # This is a fail-safe for future three-ways merges. At the 
moment there
+                # should only ever be at most one other shuffle that shares 
output
+                # tasks, so existing will always be empty.
+                if existing:  # pragma: nocover
+                    for shared_key in existing.keys() & 
current_worker_for.keys():
+                        if existing[shared_key] != 
current_worker_for[shared_key]:
+                            raise RuntimeError(
+                                f"Failed to initialize shuffle {spec.id} 
because "
+                                "it cannot align output partition mappings 
between "
+                                f"existing shuffles {seen}. "
+                                f"Mismatch encountered for output partition 
{shared_key!r}: "
+                                f"{existing[shared_key]} != 
{current_worker_for[shared_key]}."
+                            )
+                existing.update(current_worker_for)
+
+        worker_for = {}
+        for partition in spec.output_partitions:
+            if (worker := existing.get(partition, None)) is None:
+                worker = spec.pick_worker(partition, workers)
+            worker_for[partition] = worker
+        return worker_for
+
+    def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None:
+        """Output tasks are created without worker restrictions and run once 
with the
+        only purpose of setting the worker restriction and then raising 
Reschedule, and
+        then running again properly on the correct worker. It would be 
non-trivial to
+        set the worker restriction before they're first run due to potential 
task
+        fusion.
+
+        Most times, this lack of initial restrictions would cause output tasks 
to be
+        labelled as rootish on their first (very fast) run, which in turn 
would break
+        the design assumption that the worker-side queue of rootish tasks will 
last long
+        enough to cover the round-trip to the scheduler to receive more tasks, 
which in
+        turn would cause a measurable slowdown on the overall runtime of the 
shuffle
+        operation.
+
+        This method ensures that, given M output tasks and N workers, each 
worker-side
+        queue is pre-loaded with M/N output tasks which can be flushed very 
fast as
+        they all raise Reschedule() in quick succession.
+
+        See Also
+        --------
+        ShuffleRun._ensure_output_worker
+        """
+        barrier = self.scheduler.tasks[barrier_key(spec.id)]
+        for dependent in barrier.dependents:
+            dependent._rootish = False
 
     @log_errors()
     def _set_restriction(self, ts: TaskState, worker: str) -> None:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/_shuffle.py 
new/distributed-2023.12.1/distributed/shuffle/_shuffle.py
--- old/distributed-2023.12.0/distributed/shuffle/_shuffle.py   2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/_shuffle.py   2023-12-15 
21:40:22.000000000 +0100
@@ -3,10 +3,16 @@
 import logging
 import os
 from collections import defaultdict
-from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
+from collections.abc import (
+    Callable,
+    Collection,
+    Generator,
+    Iterable,
+    Iterator,
+    Sequence,
+)
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-from functools import partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
@@ -41,7 +47,6 @@
     handle_unpack_errors,
 )
 from distributed.shuffle._limiter import ResourceLimiter
-from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
 from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
 from distributed.sizeof import sizeof
 
@@ -524,9 +529,12 @@
     meta: pd.DataFrame
     parts_out: set[int]
 
-    def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> dict[int, 
str]:
-        pick_worker = partial(_get_worker_for_range_sharding, self.npartitions)
-        return plugin._pin_output_workers(self.id, self.parts_out, pick_worker)
+    @property
+    def output_partitions(self) -> Generator[int, None, None]:
+        yield from self.parts_out
+
+    def pick_worker(self, partition: int, workers: Sequence[str]) -> str:
+        return _get_worker_for_range_sharding(self.npartitions, partition, 
workers)
 
     def create_run_on_worker(
         self, run_id: int, worker_for: dict[int, str], plugin: 
ShuffleWorkerPlugin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/tests/test_merge.py 
new/distributed-2023.12.1/distributed/shuffle/tests/test_merge.py
--- old/distributed-2023.12.0/distributed/shuffle/tests/test_merge.py   
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/tests/test_merge.py   
2023-12-15 21:40:22.000000000 +0100
@@ -1,12 +1,18 @@
 from __future__ import annotations
 
+import asyncio
 import contextlib
+from typing import Any
 from unittest import mock
 
 import pytest
 
-from distributed.shuffle._core import id_from_key
+from dask.typing import Key
+
+from distributed import Worker
+from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key
 from distributed.shuffle._merge import hash_join
+from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager
 from distributed.utils_test import gen_cluster
 
 dd = pytest.importorskip("dask.dataframe")
@@ -112,7 +118,9 @@
         # Vary the number of output partitions for the shuffles of dd2
         .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle="p2p")
     )
-    # Generate unique shuffle IDs if the input frame is the same but 
parameters differ
+    # Generate unique shuffle IDs if the input frame is the same but
+    # parameters differ. Reusing shuffles in merges is dangerous because of the
+    # required coordination and complexity introduced through dynamic clusters.
     assert sum(id_from_key(k) is not None for k in out.dask) == 4
     result = await c.compute(out)
     expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge(
@@ -147,8 +155,10 @@
         right_on="b",
         shuffle="p2p",
     )
-    # Generate the same shuffle IDs if the input frame is the same and all its 
parameters match
-    assert sum(id_from_key(k) is not None for k in out.dask) == 3
+    # Generate unique shuffle IDs if the input frame is the same and all its
+    # parameters match. Reusing shuffles in merges is dangerous because of the
+    # required coordination and complexity introduced through dynamic clusters.
+    assert sum(id_from_key(k) is not None for k in out.dask) == 4
     result = await c.compute(out)
     expected = pdf2.merge(
         pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
@@ -474,3 +484,51 @@
         ),
         pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"),
     )
+
+
+class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager):
+    seen: set[ShuffleId]
+    block_get_or_create: asyncio.Event
+    blocking_get_or_create: asyncio.Event
+
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self.seen = set()
+        self.limit = 1
+        self.blocking_get_or_create = asyncio.Event()
+        self.block_get_or_create = asyncio.Event()
+
+    async def get_or_create(self, spec: ShuffleSpec, key: Key) -> ShuffleRun:
+        if len(self.seen) >= self.limit and spec.id not in self.seen:
+            self.blocking_get_or_create.set()
+            await self.block_get_or_create.wait()
+        self.seen.add(spec.id)
+        return await super().get_or_create(spec, key)
+
+
+@mock.patch(
+    "distributed.shuffle._worker_plugin._ShuffleRunManager",
+    LimitedGetOrCreateShuffleRunManager,
+)
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_merge_does_not_deadlock_if_worker_joins(c, s, a):
+    """Regression test for https://github.com/dask/distributed/issues/8411"""
+    pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
+    pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
+    df1 = dd.from_pandas(pdf1, npartitions=10)
+    df2 = dd.from_pandas(pdf2, npartitions=20)
+
+    run_manager_A = a.plugins["shuffle"].shuffle_runs
+
+    joined = dd.merge(df1, df2, left_on="a", right_on="x", shuffle="p2p")
+    result = c.compute(joined)
+
+    await run_manager_A.blocking_get_or_create.wait()
+
+    async with Worker(s.address) as b:
+        run_manager_A.block_get_or_create.set()
+        run_manager_B = b.plugins["shuffle"].shuffle_runs
+        run_manager_B.block_get_or_create.set()
+        result = await result
+    expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x")
+    assert_eq(result, expected, check_index=False)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/tests/test_rechunk.py 
new/distributed-2023.12.1/distributed/shuffle/tests/test_rechunk.py
--- old/distributed-2023.12.0/distributed/shuffle/tests/test_rechunk.py 
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/tests/test_rechunk.py 
2023-12-15 21:40:22.000000000 +0100
@@ -26,8 +26,8 @@
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._rechunk import (
     ArrayRechunkRun,
+    ArrayRechunkSpec,
     Split,
-    _get_worker_for_hash_sharding,
     split_axes,
 )
 from distributed.shuffle.tests.utils import AbstractShuffleTestPool
@@ -103,11 +103,12 @@
 
     worker_for_mapping = {}
 
+    spec = ArrayRechunkSpec(id=ShuffleId("foo"), disk=disk, new=new, old=old)
     new_indices = list(product(*(range(len(dim)) for dim in new)))
-    for i, idx in enumerate(new_indices):
-        worker_for_mapping[idx] = _get_worker_for_hash_sharding(i, workers)
-
+    for idx in new_indices:
+        worker_for_mapping[idx] = spec.pick_worker(idx, workers)
     assert len(set(worker_for_mapping.values())) == min(n_workers, 
len(new_indices))
+    # scheduler_state = spec.create_new_run(worker_for_mapping)
 
     with ArrayRechunkTestPool() as local_shuffle_pool:
         shuffles = []
@@ -1200,3 +1201,18 @@
     buf_ids = {id(get_host_array(shard)) for shard in shards}
     assert len(buf_ids) == len(shards)
     await block_map.set()
+
+
+@pytest.mark.parametrize("nworkers", [1, 2, 41, 50])
+def test_worker_for_homogeneous_distribution(nworkers):
+    old = ((1, 2, 3, 4), (5,) * 6)
+    new = ((5, 5), (12, 18))
+    workers = [str(i) for i in range(nworkers)]
+    spec = ArrayRechunkSpec(ShuffleId("foo"), disk=False, new=new, old=old)
+    count = {w: 0 for w in workers}
+    for nidx in spec.output_partitions:
+        count[spec.pick_worker(nidx, workers)] += 1
+
+    assert sum(count.values()) > 0
+    assert sum(count.values()) == len(list(spec.output_partitions))
+    assert abs(max(count.values()) - min(count.values())) <= 1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/shuffle/tests/test_shuffle.py 
new/distributed-2023.12.1/distributed/shuffle/tests/test_shuffle.py
--- old/distributed-2023.12.0/distributed/shuffle/tests/test_shuffle.py 
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/shuffle/tests/test_shuffle.py 
2023-12-15 21:40:22.000000000 +0100
@@ -2478,6 +2478,45 @@
         dd.assert_eq(result, expected)
 
 
+class BlockedBarrierShuffleSchedulerPlugin(ShuffleSchedulerPlugin):
+    def __init__(self, scheduler: Scheduler):
+        super().__init__(scheduler)
+        self.in_barrier = asyncio.Event()
+        self.block_barrier = asyncio.Event()
+
+    async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> 
None:
+        self.in_barrier.set()
+        await self.block_barrier.wait()
+        return await super().barrier(id, run_id, consistent)
+
+
+@gen_cluster(client=True)
+async def test_unpack_is_non_rootish(c, s, a, b):
+    with pytest.warns(UserWarning):
+        scheduler_plugin = BlockedBarrierShuffleSchedulerPlugin(s)
+    df = dask.datasets.timeseries(
+        start="2000-01-01",
+        end="2000-01-21",
+        dtypes={"x": float, "y": float},
+        freq="10 s",
+    )
+    df = df.shuffle("x")
+    result = c.compute(df)
+
+    await scheduler_plugin.in_barrier.wait()
+
+    unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == 
"shuffle_p2p"]
+    assert len(unpack_tss) == 20
+    assert not any(s.is_rootish(ts) for ts in unpack_tss)
+    del unpack_tss
+    scheduler_plugin.block_barrier.set()
+    result = await result
+
+    await check_worker_cleanup(a)
+    await check_worker_cleanup(b)
+    await check_scheduler_cleanup(s)
+
+
 class FlakyConnectionPool(ConnectionPool):
     def __init__(self, *args, failing_connects=0, **kwargs):
         self.attempts = 0
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/spill.py 
new/distributed-2023.12.1/distributed/spill.py
--- old/distributed-2023.12.0/distributed/spill.py      2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/spill.py      2023-12-15 
21:40:22.000000000 +0100
@@ -136,25 +136,24 @@
             logger.error("Spill to disk failed; keeping data in memory", 
exc_info=True)
             raise HandledError()
         except PickleError as e:
-            key_e, orig_e = e.args
-            assert key_e in self.fast
-            assert key_e not in self.slow
-            if key_e == key:
+            assert e.key in self.fast
+            assert e.key not in self.slow
+            if e.key == key:
                 assert key is not None
                 # The key we just inserted failed to serialize.
                 # This happens only when the key is individually larger than 
target.
                 # The exception will be caught by Worker and logged; the 
status of
                 # the task will be set to error.
                 del self[key]
-                raise orig_e
+                raise
             else:
                 # The key we just inserted is smaller than target, but it 
caused
                 # another, unrelated key to be spilled out of the LRU, and 
that key
                 # failed to serialize. There's nothing wrong with the new key. 
The older
                 # key is still in memory.
-                if key_e not in self.logged_pickle_errors:
-                    logger.error("Failed to pickle %r", key_e, exc_info=True)
-                    self.logged_pickle_errors.add(key_e)
+                if e.key not in self.logged_pickle_errors:
+                    logger.error("Failed to pickle %r", e.key, exc_info=True)
+                    self.logged_pickle_errors.add(e.key)
                 raise HandledError()
 
     def __setitem__(self, key: Key, value: object) -> None:
@@ -267,8 +266,13 @@
     pass
 
 
-class PickleError(Exception):
-    pass
+class PickleError(TypeError):
+    def __str__(self) -> str:
+        return f"Failed to pickle {self.key!r}"
+
+    @property
+    def key(self) -> Key:
+        return self.args[0]
 
 
 class HandledError(Exception):
@@ -324,7 +328,7 @@
             # zict.LRU ensures that the key remains in fast if we raise.
             # Wrap the exception so that it's recognizable by SpillBuffer,
             # which will then unwrap it.
-            raise PickleError(key, e)
+            raise PickleError(key) from e
 
         # Thanks to Buffer.__setitem__, we never update existing
         # keys in slow, but always delete them and reinsert them.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/tests/test_scheduler.py 
new/distributed-2023.12.1/distributed/tests/test_scheduler.py
--- old/distributed-2023.12.0/distributed/tests/test_scheduler.py       
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/tests/test_scheduler.py       
2023-12-15 21:40:22.000000000 +0100
@@ -279,6 +279,30 @@
     test_decide_worker_coschedule_order_neighbors_()
 
 
+@gen_cluster(
+    client=True,
+    nthreads=[],
+)
+async def test_override_is_rootish(c, s):
+    x = c.submit(lambda x: x + 1, 1, key="x")
+    await async_poll_for(lambda: "x" in s.tasks, timeout=5)
+    ts_x = s.tasks["x"]
+    assert ts_x._rootish is None
+    assert s.is_rootish(ts_x)
+
+    ts_x._rootish = False
+    assert not s.is_rootish(ts_x)
+
+    y = c.submit(lambda y: y + 1, 1, key="y", workers=["not-existing"])
+    await async_poll_for(lambda: "y" in s.tasks, timeout=5)
+    ts_y = s.tasks["y"]
+    assert ts_y._rootish is None
+    assert not s.is_rootish(ts_y)
+
+    ts_y._rootish = True
+    assert s.is_rootish(ts_y)
+
+
 @pytest.mark.skipif(
     QUEUING_ON_BY_DEFAULT,
     reason="Not relevant with queuing on; see 
https://github.com/dask/distributed/issues/7204",
@@ -1799,13 +1823,11 @@
     assert response == s.address
 
 
-@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
-async def test_run_on_scheduler_disabled(c, s, a, b):
-    def f(dask_scheduler=None):
-        return dask_scheduler.address
-
-    with pytest.raises(ValueError, match="disallowed from deserializing"):
-        await c._run_on_scheduler(f)
+@gen_test()
+async def test_allow_pickle_false():
+    with dask.config.set({"distributed.scheduler.pickle": False}):
+        with pytest.raises(RuntimeError, match="Pickling can no longer be 
disabled"):
+            await Scheduler()
 
 
 @gen_cluster()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/tests/test_spill.py 
new/distributed-2023.12.1/distributed/tests/test_spill.py
--- old/distributed-2023.12.0/distributed/tests/test_spill.py   2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/tests/test_spill.py   2023-12-15 
21:40:22.000000000 +0100
@@ -219,9 +219,10 @@
     a = Bad(size=201)
 
     # Exception caught in the worker
-    with pytest.raises(TypeError, match="Could not serialize"):
+    with pytest.raises(TypeError, match="Failed to pickle 'a'") as e:
         with captured_logger("distributed.spill") as logs_bad_key:
             buf["a"] = a
+    assert isinstance(e.value.__cause__.__cause__, MyError)
 
     # spill.py must remain silent because we're already logging in worker.py
     assert not logs_bad_key.getvalue()
@@ -240,7 +241,7 @@
 
     # worker.py won't intercept the exception here, so spill.py must dump the 
traceback
     logs_value = logs_bad_key_mem.getvalue()
-    assert "Failed to pickle" in logs_value  # from distributed.spill
+    assert "Failed to pickle 'b'" in logs_value  # from distributed.spill
     assert "Traceback" in logs_value  # from distributed.spill
     assert_buf(buf, tmp_path, {"b": b, "c": c}, {})
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/tests/test_worker.py 
new/distributed-2023.12.1/distributed/tests/test_worker.py
--- old/distributed-2023.12.0/distributed/tests/test_worker.py  2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/tests/test_worker.py  2023-12-15 
21:40:22.000000000 +0100
@@ -30,7 +30,6 @@
 from dask.system import CPU_COUNT
 from dask.utils import tmpfile
 
-import distributed
 from distributed import (
     Client,
     Event,
@@ -46,7 +45,6 @@
 from distributed.comm.utils import OFFLOAD_THRESHOLD
 from distributed.compatibility import LINUX, WINDOWS
 from distributed.core import CommClosedError, Status, rpc
-from distributed.diagnostics import nvml
 from distributed.diagnostics.plugin import ForwardOutput
 from distributed.metrics import time
 from distributed.protocol import pickle
@@ -2267,17 +2265,6 @@
             await future
 
 
-@pytest.mark.gpu
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
-async def test_gpu_executor(c, s, w):
-    if nvml.device_get_count() > 0:
-        e = w.executors["gpu"]
-        assert isinstance(e, distributed.threadpoolexecutor.ThreadPoolExecutor)
-        assert e._max_workers == 1
-    else:
-        assert "gpu" not in w.executors
-
-
 async def assert_task_states_on_worker(
     expected: dict[str, str], worker: Worker
 ) -> None:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2023.12.0/distributed/tests/test_worker_memory.py 
new/distributed-2023.12.1/distributed/tests/test_worker_memory.py
--- old/distributed-2023.12.0/distributed/tests/test_worker_memory.py   
2023-12-02 00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/tests/test_worker_memory.py   
2023-12-15 21:40:22.000000000 +0100
@@ -174,8 +174,9 @@
 
     assert x.status == "error"
 
-    with pytest.raises(TypeError, match="Could not serialize"):
+    with pytest.raises(TypeError, match="Failed to pickle 'x'") as e:
         await x
+    assert isinstance(e.value.__cause__.__cause__, CustomError)
 
     await assert_basic_futures(c)
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/distributed/worker.py 
new/distributed-2023.12.1/distributed/worker.py
--- old/distributed-2023.12.0/distributed/worker.py     2023-12-02 
00:22:58.000000000 +0100
+++ new/distributed-2023.12.1/distributed/worker.py     2023-12-15 
21:40:22.000000000 +0100
@@ -636,10 +636,6 @@
             "offload": utils._offload_executor,
             "actor": ThreadPoolExecutor(1, 
thread_name_prefix="Dask-Actor-Threads"),
         }
-        if nvml.device_get_count() > 0:
-            self.executors["gpu"] = ThreadPoolExecutor(
-                1, thread_name_prefix="Dask-GPU-Threads"
-            )
 
         # Find the default executor
         if executor == "offload":
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2023.12.0/pyproject.toml 
new/distributed-2023.12.1/pyproject.toml
--- old/distributed-2023.12.0/pyproject.toml    2023-12-02 00:22:58.000000000 
+0100
+++ new/distributed-2023.12.1/pyproject.toml    2023-12-15 21:40:22.000000000 
+0100
@@ -28,7 +28,7 @@
 dependencies = [
     "click >= 8.0",
     "cloudpickle >= 1.5.0",
-    "dask == 2023.12.0",
+    "dask == 2023.12.1",
     "jinja2 >= 2.10.3",
     "locket >= 1.0.0",
     "msgpack >= 1.0.0",

Reply via email to