Script 'mail_helper' called by obssrc

Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2023-12-25 19:06:30
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.28375 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Mon Dec 25 19:06:30 2023 rev:74 rq:1135097 version:2023.12.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2023-12-09 22:57:51.301749065 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.28375/python-distributed.changes 2023-12-25 19:06:40.460490857 +0100 @@ -1,0 +2,7 @@ +Mon Dec 18 12:14:31 UTC 2023 - Dirk Müller <[email protected]> + +- update to 2023.12.1: + * see corresponding dask update: + https://docs.dask.org/en/stable/changelog.html#v2023-12-1 + +------------------------------------------------------------------- Old: ---- distributed-2023.12.0-gh.tar.gz New: ---- distributed-2023.12.1-gh.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.VMVcN3/_old 2023-12-25 19:06:41.196517717 +0100 +++ /var/tmp/diff_new_pack.VMVcN3/_new 2023-12-25 19:06:41.196517717 +0100 @@ -44,7 +44,7 @@ Name: python-distributed%{psuffix} # ===> Note: python-dask MUST be updated in sync with python-distributed! <=== -Version: 2023.12.0 +Version: 2023.12.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause ++++++ distributed-2023.12.0-gh.tar.gz -> distributed-2023.12.1-gh.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/ci-pre-commit.yml new/distributed-2023.12.1/.github/workflows/ci-pre-commit.yml --- old/distributed-2023.12.0/.github/workflows/ci-pre-commit.yml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/.github/workflows/ci-pre-commit.yml 2023-12-15 21:40:22.000000000 +0100 @@ -12,7 +12,7 @@ runs-on: ubuntu-latest steps: - uses: actions/[email protected] - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: '3.9' - uses: pre-commit/[email protected] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/conda.yml new/distributed-2023.12.1/.github/workflows/conda.yml --- old/distributed-2023.12.0/.github/workflows/conda.yml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/.github/workflows/conda.yml 2023-12-15 21:40:22.000000000 +0100 @@ -30,7 +30,7 @@ with: fetch-depth: 0 - name: Set up Python - uses: conda-incubator/[email protected] + uses: conda-incubator/[email protected] with: miniforge-variant: Mambaforge use-mamba: true diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/test-report.yaml new/distributed-2023.12.1/.github/workflows/test-report.yaml --- old/distributed-2023.12.0/.github/workflows/test-report.yaml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/.github/workflows/test-report.yaml 2023-12-15 21:40:22.000000000 +0100 @@ -22,7 +22,7 @@ - uses: actions/[email protected] - name: Setup Conda Environment - uses: conda-incubator/[email protected] + uses: conda-incubator/[email protected] with: miniforge-variant: Mambaforge miniforge-version: latest @@ -56,7 +56,7 @@ mv test_report.html test_short_report.html deploy/ - name: Deploy ð - uses: JamesIves/[email protected] + uses: JamesIves/[email protected] with: branch: gh-pages folder: deploy diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/.github/workflows/tests.yaml 
new/distributed-2023.12.1/.github/workflows/tests.yaml --- old/distributed-2023.12.0/.github/workflows/tests.yaml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/.github/workflows/tests.yaml 2023-12-15 21:40:22.000000000 +0100 @@ -114,7 +114,7 @@ fetch-depth: 0 - name: Setup Conda Environment - uses: conda-incubator/[email protected] + uses: conda-incubator/[email protected] with: miniforge-variant: Mambaforge miniforge-version: latest diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/continuous_integration/gpuci/axis.yaml new/distributed-2023.12.1/continuous_integration/gpuci/axis.yaml --- old/distributed-2023.12.0/continuous_integration/gpuci/axis.yaml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/continuous_integration/gpuci/axis.yaml 2023-12-15 21:40:22.000000000 +0100 @@ -9,6 +9,6 @@ - ubuntu20.04 RAPIDS_VER: -- "23.12" +- "24.02" excludes: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/client.py new/distributed-2023.12.1/distributed/client.py --- old/distributed-2023.12.0/distributed/client.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/client.py 2023-12-15 21:40:22.000000000 +0100 @@ -4857,7 +4857,7 @@ warnings.warn( "The `idempotent` argument is deprecated and will be removed in a " "future version. Please mark your plugin as idempotent by setting its " - "`.idempotent` atrribute to `True`.", + "`.idempotent` attribute to `True`.", FutureWarning, ) else: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/deploy/subprocess.py new/distributed-2023.12.1/distributed/deploy/subprocess.py --- old/distributed-2023.12.0/distributed/deploy/subprocess.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/deploy/subprocess.py 2023-12-15 21:40:22.000000000 +0100 @@ -6,6 +6,10 @@ import json import logging import math +import os +import tempfile +import uuid +from pathlib import Path from typing import Any import psutil @@ -16,6 +20,7 @@ from distributed.compatibility import WINDOWS from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.utils import nprocesses_nthreads +from distributed.utils import Deadline from distributed.worker_memory import parse_memory_limit logger = logging.getLogger(__name__) @@ -59,13 +64,20 @@ """ scheduler_kwargs: dict + timeout: int address: str | None def __init__( self, scheduler_kwargs: dict | None = None, + timeout: int = 30, ): - self.scheduler_kwargs = scheduler_kwargs or {} + self.scheduler_kwargs = { + "scheduler_file": os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) + } + if scheduler_kwargs: + self.scheduler_kwargs.update(scheduler_kwargs) + self.timeout = timeout super().__init__() async def _start(self): @@ -78,20 +90,32 @@ ), ] logger.info(" ".join(cmd)) + deadline = Deadline.after(self.timeout) self.process = await asyncio.create_subprocess_exec( *cmd, stderr=asyncio.subprocess.PIPE, ) - while True: - line = (await self.process.stderr.readline()).decode() - if not line.strip(): - raise RuntimeError("Scheduler failed to start") - logger.info(line.strip()) - if "Scheduler at" in line: - self.address = line.split("Scheduler at:")[1].strip() - break - logger.debug(line) + scheduler_file = Path(self.scheduler_kwargs["scheduler_file"]) + while not ( + deadline.expired + or scheduler_file.exists() + or 
self.process.returncode is not None + ): + await asyncio.sleep(0.1) + if deadline.expired or self.process.returncode is not None: + assert self.process.stderr + logger.error((await self.process.stderr.read()).decode()) + if deadline.expired: + raise RuntimeError(f"Scheduler failed to start within {self.timeout}s") + raise RuntimeError( + f"Scheduler failed to start and exited with code {self.process.returncode}" + ) + + with scheduler_file.open(mode="r") as f: + identity = json.load(f) + self.address = identity["address"] + logger.info("Scheduler at %r", self.address) class SubprocessWorker(Subprocess): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/deploy/tests/test_subprocess.py new/distributed-2023.12.1/distributed/deploy/tests/test_subprocess.py --- old/distributed-2023.12.0/distributed/deploy/tests/test_subprocess.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/deploy/tests/test_subprocess.py 2023-12-15 21:40:22.000000000 +0100 @@ -1,5 +1,7 @@ from __future__ import annotations +import logging + import pytest from distributed import Client @@ -9,7 +11,7 @@ SubprocessScheduler, SubprocessWorker, ) -from distributed.utils_test import gen_test +from distributed.utils_test import gen_test, new_config_file @pytest.mark.skipif(WINDOWS, reason="distributed#7434") @@ -72,6 +74,24 @@ pass [email protected](WINDOWS, reason="distributed#7434") [email protected] +@gen_test() +async def test_subprocess_cluster_does_not_depend_on_logging(): + with new_config_file( + {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}} + ): + async with SubprocessCluster( + asynchronous=True, + dashboard_address=":0", + scheduler_kwargs={"idle_timeout": "5s"}, + worker_kwargs={"death_timeout": "5s"}, + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + result = await client.submit(lambda x: x + 1, 10) + assert result == 11 + + @pytest.mark.skipif( not WINDOWS, reason="Windows-specific error testing (distributed#7434)" ) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/diagnostics/tests/test_scheduler_plugin.py new/distributed-2023.12.1/distributed/diagnostics/tests/test_scheduler_plugin.py --- old/distributed-2023.12.0/distributed/diagnostics/tests/test_scheduler_plugin.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/diagnostics/tests/test_scheduler_plugin.py 2023-12-15 21:40:22.000000000 +0100 @@ -402,23 +402,6 @@ assert s.foo == "bar" -@gen_cluster(client=True, config={"distributed.scheduler.pickle": False}) -async def test_register_plugin_pickle_disabled(c, s, a, b): - class Dummy1(SchedulerPlugin): - def start(self, scheduler): - scheduler.foo = "bar" - - n_plugins = len(s.plugins) - with pytest.raises(ValueError) as excinfo: - await c.register_plugin(Dummy1()) - - msg = str(excinfo.value) - assert "disallowed from deserializing" in msg - assert "distributed.scheduler.pickle" in msg - - assert n_plugins == len(s.plugins) - - @gen_cluster(nthreads=[]) async def test_unregister_scheduler_plugin(s): class Plugin(SchedulerPlugin): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/distributed-schema.yaml new/distributed-2023.12.1/distributed/distributed-schema.yaml --- old/distributed-2023.12.0/distributed/distributed-schema.yaml 2023-12-02 00:22:58.000000000 +0100 +++ 
new/distributed-2023.12.1/distributed/distributed-schema.yaml 2023-12-15 21:40:22.000000000 +0100 @@ -130,16 +130,6 @@ If we don't receive a heartbeat faster than this then we assume that the worker has died. - pickle: - type: boolean - description: | - Is the scheduler allowed to deserialize arbitrary bytestrings? - - The scheduler almost never deserializes user data. - However there are some cases where the user can submit functions to run directly on the scheduler. - This can be convenient for debugging, but also introduces some security risk. - By setting this to false we ensure that the user is unable to run arbitrary code on the scheduler. - preload: type: array description: | diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/distributed.yaml new/distributed-2023.12.1/distributed/distributed.yaml --- old/distributed-2023.12.0/distributed/distributed.yaml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/distributed.yaml 2023-12-15 21:40:22.000000000 +0100 @@ -22,7 +22,6 @@ work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this - pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings preload: [] # Run custom modules with Scheduler preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/protocol/core.py new/distributed-2023.12.1/distributed/protocol/core.py --- old/distributed-2023.12.0/distributed/protocol/core.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/protocol/core.py 2023-12-15 21:40:22.000000000 +0100 @@ -4,8 +4,6 @@ import msgpack -import dask.config - from distributed.protocol import pickle from distributed.protocol.compression import decompress, maybe_compress from distributed.protocol.serialize import ( @@ -117,8 +115,6 @@ def loads(frames, deserialize=True, deserializers=None): """Transform bytestream back into Python value""" - allow_pickle = dask.config.get("distributed.scheduler.pickle") - try: def _decode_default(obj): @@ -148,13 +144,7 @@ sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] if "compression" in sub_header: sub_frames = decompress(sub_header, sub_frames) - if allow_pickle: - return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames) - else: - raise ValueError( - "Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`" - ) - + return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames) return msgpack_decode_default(obj) return msgpack.loads( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/protocol/serialize.py new/distributed-2023.12.1/distributed/protocol/serialize.py --- old/distributed-2023.12.0/distributed/protocol/serialize.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/protocol/serialize.py 2023-12-15 21:40:22.000000000 +0100 @@ -573,10 +573,6 @@ Both the scheduler and workers with automatically unpickle this object on arrival. 
- - Notice, this requires that the scheduler is allowed to use pickle. - If the configuration option "distributed.scheduler.pickle" is set - to False, the scheduler will raise an exception instead. """ data: T diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/scheduler.py new/distributed-2023.12.1/distributed/scheduler.py --- old/distributed-2023.12.0/distributed/scheduler.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/scheduler.py 2023-12-15 21:40:22.000000000 +0100 @@ -1373,6 +1373,15 @@ #: be rejected. run_id: int | None + #: Whether to consider this task rootish in the context of task queueing + #: True + #: Always consider this task rootish + #: False + #: Never consider this task rootish + #: None + #: Use a heuristic to determine whether this task should be considered rootish + _rootish: bool | None + #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -1430,6 +1439,7 @@ self.metadata = None self.annotations = None self.erred_on = None + self._rootish = None self.run_id = None TaskState._instances.add(self) @@ -2909,8 +2919,11 @@ Whether ``ts`` is a root or root-like task. Root-ish tasks are part of a group that's much larger than the cluster, - and have few or no dependencies. + and have few or no dependencies. Tasks may also be explicitly marked as rootish + to override this heuristic. """ + if ts._rootish is not None: + return ts._rootish if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: return False tg = ts.group @@ -3533,6 +3546,10 @@ jupyter=False, **kwargs, ): + if dask.config.get("distributed.scheduler.pickle", default=True) is False: + raise RuntimeError( + "Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler." + ) if loop is not None: warnings.warn( "the loop kwarg to Scheduler is deprecated", @@ -5868,13 +5885,6 @@ idempotent: bool | None = None, ) -> None: """Register a plugin on the scheduler.""" - if not dask.config.get("distributed.scheduler.pickle"): - raise ValueError( - "Cannot register a scheduler plugin as the scheduler " - "has been explicitly disallowed from deserializing " - "arbitrary bytestrings using pickle via the " - "'distributed.scheduler.pickle' configuration setting." - ) if idempotent is None: warnings.warn( "The signature of `Scheduler.register_scheduler_plugin` now requires " @@ -6922,7 +6932,7 @@ if key is None: key = operator.attrgetter("address") - if isinstance(key, bytes) and dask.config.get("distributed.scheduler.pickle"): + if isinstance(key, bytes): key = pickle.loads(key) groups = groupby(key, self.workers.values()) @@ -7262,14 +7272,6 @@ Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics. """ - if not dask.config.get("distributed.scheduler.pickle"): - logger.warning( - "Tried to call 'feed' route with custom functions, but " - "pickle is disallowed. 
Set the 'distributed.scheduler.pickle'" - "config value to True to use the 'feed' route (this is mostly " - "commonly used with progress bars)" - ) - return interval = parse_timedelta(interval) if function: @@ -7484,12 +7486,6 @@ """ from distributed.worker import run - if not dask.config.get("distributed.scheduler.pickle"): - raise ValueError( - "Cannot run function as the scheduler has been explicitly disallowed from " - "deserializing arbitrary bytestrings using pickle via the " - "'distributed.scheduler.pickle' configuration setting." - ) kwargs = kwargs or {} self.log_event("all", {"action": "run-function", "function": function}) return run(self, comm, function=function, args=args, kwargs=kwargs, wait=wait) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_core.py new/distributed-2023.12.1/distributed/shuffle/_core.py --- old/distributed-2023.12.0/distributed/shuffle/_core.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/_core.py 2023-12-15 21:40:22.000000000 +0100 @@ -7,7 +7,7 @@ import pickle import time from collections import defaultdict -from collections.abc import Callable, Iterable, Iterator, Sequence +from collections.abc import Callable, Generator, Iterable, Iterator, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from enum import Enum @@ -40,7 +40,6 @@ _P = ParamSpec("_P") # circular dependencies - from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin ShuffleId = NewType("ShuffleId", str) @@ -375,23 +374,25 @@ id: ShuffleId disk: bool + @abc.abstractproperty + def output_partitions(self) -> Generator[_T_partition_id, None, None]: + """Output partitions""" + raise NotImplementedError + + @abc.abstractmethod + def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str: + """Pick a worker for a partition""" + def create_new_run( self, - plugin: ShuffleSchedulerPlugin, + worker_for: dict[_T_partition_id, str], ) -> SchedulerShuffleState: - worker_for = self._pin_output_workers(plugin) return SchedulerShuffleState( run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for), participating_workers=set(worker_for.values()), ) @abc.abstractmethod - def _pin_output_workers( - self, plugin: ShuffleSchedulerPlugin - ) -> dict[_T_partition_id, str]: - """Pin output tasks to workers and return the mapping of partition ID to worker.""" - - @abc.abstractmethod def create_run_on_worker( self, run_id: int, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_merge.py new/distributed-2023.12.1/distributed/shuffle/_merge.py --- old/distributed-2023.12.0/distributed/shuffle/_merge.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/_merge.py 2023-12-15 21:40:22.000000000 +0100 @@ -361,18 +361,22 @@ def _construct_graph(self) -> dict[tuple | str, tuple]: token_left = tokenize( - "hash-join", + # Include self.name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. 
+ self.name, self.name_input_left, self.left_on, - self.npartitions, - self.parts_out, + self.left_index, ) token_right = tokenize( - "hash-join", + # Include self.name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. + self.name, self.name_input_right, self.right_on, - self.npartitions, - self.parts_out, + self.right_index, ) dsk: dict[tuple | str, tuple] = {} name_left = "hash-join-transfer-" + token_left diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_rechunk.py new/distributed-2023.12.1/distributed/shuffle/_rechunk.py --- old/distributed-2023.12.0/distributed/shuffle/_rechunk.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/_rechunk.py 2023-12-15 21:40:22.000000000 +0100 @@ -99,7 +99,7 @@ import mmap import os from collections import defaultdict -from collections.abc import Callable, Sequence +from collections.abc import Callable, Generator, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from itertools import product @@ -125,7 +125,6 @@ ) from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._pickle import unpickle_bytestream -from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._shuffle import barrier_key, shuffle_barrier from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin from distributed.sizeof import sizeof @@ -456,11 +455,22 @@ new: ChunkedAxes old: ChunkedAxes - def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> dict[NDIndex, str]: - parts_out = product(*(range(len(c)) for c in self.new)) - return plugin._pin_output_workers( - self.id, parts_out, _get_worker_for_hash_sharding - ) + @property + def output_partitions(self) -> Generator[NDIndex, None, None]: + yield from product(*(range(len(c)) for c in self.new)) + + def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str: + npartitions = 1 + for c in self.new: + npartitions *= len(c) + ix = 0 + for dim, pos in enumerate(partition): + if dim > 0: + ix += len(self.new[dim - 1]) * pos + else: + ix += pos + i = len(workers) * ix // npartitions + return workers[i] def create_run_on_worker( self, @@ -487,11 +497,3 @@ disk=self.disk, loop=plugin.worker.loop, ) - - -def _get_worker_for_hash_sharding( - output_partition: NDIndex, workers: Sequence[str] -) -> str: - """Get address of target worker for this output partition using hash sharding""" - i = hash(output_partition) % len(workers) - return workers[i] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_scheduler_plugin.py new/distributed-2023.12.1/distributed/shuffle/_scheduler_plugin.py --- old/distributed-2023.12.0/distributed/shuffle/_scheduler_plugin.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/_scheduler_plugin.py 2023-12-15 21:40:22.000000000 +0100 @@ -3,7 +3,6 @@ import contextlib import logging from collections import defaultdict -from collections.abc import Callable, Iterable, Sequence from typing import TYPE_CHECKING, Any from dask.typing import Key @@ -145,7 +144,9 @@ # that the shuffle works as intended and should fail instead. 
self._raise_if_barrier_unknown(spec.id) self._raise_if_task_not_processing(key) - state = spec.create_new_run(self) + worker_for = self._calculate_worker_for(spec) + self._ensure_output_tasks_are_non_rootish(spec) + state = spec.create_new_run(worker_for) self.active_shuffles[spec.id] = state self._shuffles[spec.id].add(state) state.participating_workers.add(worker) @@ -167,14 +168,14 @@ if task.state != "processing": raise RuntimeError(f"Expected {task} to be processing, is {task.state}.") - def _pin_output_workers( - self, - id: ShuffleId, - output_partitions: Iterable[Any], - pick: Callable[[Any, Sequence[str]], str], - ) -> dict[Any, str]: + def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: """Pin the outputs of a P2P shuffle to specific workers. + The P2P implementation of a hash join combines the loading of shuffled output + partitions for the left and right side with the actual merge operation into a + single output task. As a consequence, we need to make sure that shuffles with + shared output tasks align on the output mapping. + Parameters ---------- id: ID of the shuffle to pin @@ -185,18 +186,80 @@ This function assumes that the barrier task and the output tasks share the same worker restrictions. """ - mapping = {} - barrier = self.scheduler.tasks[barrier_key(id)] + existing: dict[Any, str] = {} + shuffle_id = spec.id + barrier = self.scheduler.tasks[barrier_key(shuffle_id)] if barrier.worker_restrictions: workers = list(barrier.worker_restrictions) else: workers = list(self.scheduler.workers) - for partition in output_partitions: - worker = pick(partition, workers) - mapping[partition] = worker - return mapping + # Check if this shuffle shares output tasks with a different shuffle that has + # already been initialized and needs to be taken into account when + # mapping output partitions to workers. + # Naively, you could delete this whole paragraph and just call + # spec.pick_worker; it would return two identical sets of results on both calls + # of this method... until the set of available workers changes between the two + # calls, which would cause misaligned shuffle outputs and a deadlock. + seen = {barrier} + for dependent in barrier.dependents: + for possible_barrier in dependent.dependencies: + if possible_barrier in seen: + continue + seen.add(possible_barrier) + if not (other_barrier_key := id_from_key(possible_barrier.key)): + continue + if not (shuffle := self.active_shuffles.get(other_barrier_key)): + continue + current_worker_for = shuffle.run_spec.worker_for + # This is a fail-safe for future three-ways merges. At the moment there + # should only ever be at most one other shuffle that shares output + # tasks, so existing will always be empty. + if existing: # pragma: nocover + for shared_key in existing.keys() & current_worker_for.keys(): + if existing[shared_key] != current_worker_for[shared_key]: + raise RuntimeError( + f"Failed to initialize shuffle {spec.id} because " + "it cannot align output partition mappings between " + f"existing shuffles {seen}. " + f"Mismatch encountered for output partition {shared_key!r}: " + f"{existing[shared_key]} != {current_worker_for[shared_key]}." 
+ ) + existing.update(current_worker_for) + + worker_for = {} + for partition in spec.output_partitions: + if (worker := existing.get(partition, None)) is None: + worker = spec.pick_worker(partition, workers) + worker_for[partition] = worker + return worker_for + + def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None: + """Output tasks are created without worker restrictions and run once with the + only purpose of setting the worker restriction and then raising Reschedule, and + then running again properly on the correct worker. It would be non-trivial to + set the worker restriction before they're first run due to potential task + fusion. + + Most times, this lack of initial restrictions would cause output tasks to be + labelled as rootish on their first (very fast) run, which in turn would break + the design assumption that the worker-side queue of rootish tasks will last long + enough to cover the round-trip to the scheduler to receive more tasks, which in + turn would cause a measurable slowdown on the overall runtime of the shuffle + operation. + + This method ensures that, given M output tasks and N workers, each worker-side + queue is pre-loaded with M/N output tasks which can be flushed very fast as + they all raise Reschedule() in quick succession. + + See Also + -------- + ShuffleRun._ensure_output_worker + """ + barrier = self.scheduler.tasks[barrier_key(spec.id)] + for dependent in barrier.dependents: + dependent._rootish = False @log_errors() def _set_restriction(self, ts: TaskState, worker: str) -> None: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/_shuffle.py new/distributed-2023.12.1/distributed/shuffle/_shuffle.py --- old/distributed-2023.12.0/distributed/shuffle/_shuffle.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/_shuffle.py 2023-12-15 21:40:22.000000000 +0100 @@ -3,10 +3,16 @@ import logging import os from collections import defaultdict -from collections.abc import Callable, Collection, Iterable, Iterator, Sequence +from collections.abc import ( + Callable, + Collection, + Generator, + Iterable, + Iterator, + Sequence, +) from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from functools import partial from pathlib import Path from typing import TYPE_CHECKING, Any @@ -41,7 +47,6 @@ handle_unpack_errors, ) from distributed.shuffle._limiter import ResourceLimiter -from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin from distributed.sizeof import sizeof @@ -524,9 +529,12 @@ meta: pd.DataFrame parts_out: set[int] - def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> dict[int, str]: - pick_worker = partial(_get_worker_for_range_sharding, self.npartitions) - return plugin._pin_output_workers(self.id, self.parts_out, pick_worker) + @property + def output_partitions(self) -> Generator[int, None, None]: + yield from self.parts_out + + def pick_worker(self, partition: int, workers: Sequence[str]) -> str: + return _get_worker_for_range_sharding(self.npartitions, partition, workers) def create_run_on_worker( self, run_id: int, worker_for: dict[int, str], plugin: ShuffleWorkerPlugin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/tests/test_merge.py 
new/distributed-2023.12.1/distributed/shuffle/tests/test_merge.py --- old/distributed-2023.12.0/distributed/shuffle/tests/test_merge.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/tests/test_merge.py 2023-12-15 21:40:22.000000000 +0100 @@ -1,12 +1,18 @@ from __future__ import annotations +import asyncio import contextlib +from typing import Any from unittest import mock import pytest -from distributed.shuffle._core import id_from_key +from dask.typing import Key + +from distributed import Worker +from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key from distributed.shuffle._merge import hash_join +from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager from distributed.utils_test import gen_cluster dd = pytest.importorskip("dask.dataframe") @@ -112,7 +118,9 @@ # Vary the number of output partitions for the shuffles of dd2 .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle="p2p") ) - # Generate unique shuffle IDs if the input frame is the same but parameters differ + # Generate unique shuffle IDs if the input frame is the same but + # parameters differ. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. assert sum(id_from_key(k) is not None for k in out.dask) == 4 result = await c.compute(out) expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge( @@ -147,8 +155,10 @@ right_on="b", shuffle="p2p", ) - # Generate the same shuffle IDs if the input frame is the same and all its parameters match - assert sum(id_from_key(k) is not None for k in out.dask) == 3 + # Generate unique shuffle IDs if the input frame is the same and all its + # parameters match. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. 
+ assert sum(id_from_key(k) is not None for k in out.dask) == 4 result = await c.compute(out) expected = pdf2.merge( pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b" @@ -474,3 +484,51 @@ ), pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"), ) + + +class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager): + seen: set[ShuffleId] + block_get_or_create: asyncio.Event + blocking_get_or_create: asyncio.Event + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.seen = set() + self.limit = 1 + self.blocking_get_or_create = asyncio.Event() + self.block_get_or_create = asyncio.Event() + + async def get_or_create(self, spec: ShuffleSpec, key: Key) -> ShuffleRun: + if len(self.seen) >= self.limit and spec.id not in self.seen: + self.blocking_get_or_create.set() + await self.block_get_or_create.wait() + self.seen.add(spec.id) + return await super().get_or_create(spec, key) + + [email protected]( + "distributed.shuffle._worker_plugin._ShuffleRunManager", + LimitedGetOrCreateShuffleRunManager, +) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_merge_does_not_deadlock_if_worker_joins(c, s, a): + """Regression test for https://github.com/dask/distributed/issues/8411""" + pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)}) + pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50}) + df1 = dd.from_pandas(pdf1, npartitions=10) + df2 = dd.from_pandas(pdf2, npartitions=20) + + run_manager_A = a.plugins["shuffle"].shuffle_runs + + joined = dd.merge(df1, df2, left_on="a", right_on="x", shuffle="p2p") + result = c.compute(joined) + + await run_manager_A.blocking_get_or_create.wait() + + async with Worker(s.address) as b: + run_manager_A.block_get_or_create.set() + run_manager_B = b.plugins["shuffle"].shuffle_runs + run_manager_B.block_get_or_create.set() + result = await result + expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x") + assert_eq(result, expected, check_index=False) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/tests/test_rechunk.py new/distributed-2023.12.1/distributed/shuffle/tests/test_rechunk.py --- old/distributed-2023.12.0/distributed/shuffle/tests/test_rechunk.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/tests/test_rechunk.py 2023-12-15 21:40:22.000000000 +0100 @@ -26,8 +26,8 @@ from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._rechunk import ( ArrayRechunkRun, + ArrayRechunkSpec, Split, - _get_worker_for_hash_sharding, split_axes, ) from distributed.shuffle.tests.utils import AbstractShuffleTestPool @@ -103,11 +103,12 @@ worker_for_mapping = {} + spec = ArrayRechunkSpec(id=ShuffleId("foo"), disk=disk, new=new, old=old) new_indices = list(product(*(range(len(dim)) for dim in new))) - for i, idx in enumerate(new_indices): - worker_for_mapping[idx] = _get_worker_for_hash_sharding(i, workers) - + for idx in new_indices: + worker_for_mapping[idx] = spec.pick_worker(idx, workers) assert len(set(worker_for_mapping.values())) == min(n_workers, len(new_indices)) + # scheduler_state = spec.create_new_run(worker_for_mapping) with ArrayRechunkTestPool() as local_shuffle_pool: shuffles = [] @@ -1200,3 +1201,18 @@ buf_ids = {id(get_host_array(shard)) for shard in shards} assert len(buf_ids) == len(shards) await block_map.set() + + [email protected]("nworkers", [1, 2, 41, 50]) +def 
test_worker_for_homogeneous_distribution(nworkers): + old = ((1, 2, 3, 4), (5,) * 6) + new = ((5, 5), (12, 18)) + workers = [str(i) for i in range(nworkers)] + spec = ArrayRechunkSpec(ShuffleId("foo"), disk=False, new=new, old=old) + count = {w: 0 for w in workers} + for nidx in spec.output_partitions: + count[spec.pick_worker(nidx, workers)] += 1 + + assert sum(count.values()) > 0 + assert sum(count.values()) == len(list(spec.output_partitions)) + assert abs(max(count.values()) - min(count.values())) <= 1 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/shuffle/tests/test_shuffle.py new/distributed-2023.12.1/distributed/shuffle/tests/test_shuffle.py --- old/distributed-2023.12.0/distributed/shuffle/tests/test_shuffle.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/shuffle/tests/test_shuffle.py 2023-12-15 21:40:22.000000000 +0100 @@ -2478,6 +2478,45 @@ dd.assert_eq(result, expected) +class BlockedBarrierShuffleSchedulerPlugin(ShuffleSchedulerPlugin): + def __init__(self, scheduler: Scheduler): + super().__init__(scheduler) + self.in_barrier = asyncio.Event() + self.block_barrier = asyncio.Event() + + async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> None: + self.in_barrier.set() + await self.block_barrier.wait() + return await super().barrier(id, run_id, consistent) + + +@gen_cluster(client=True) +async def test_unpack_is_non_rootish(c, s, a, b): + with pytest.warns(UserWarning): + scheduler_plugin = BlockedBarrierShuffleSchedulerPlugin(s) + df = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-21", + dtypes={"x": float, "y": float}, + freq="10 s", + ) + df = df.shuffle("x") + result = c.compute(df) + + await scheduler_plugin.in_barrier.wait() + + unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"] + assert len(unpack_tss) == 20 + assert not any(s.is_rootish(ts) for ts in unpack_tss) + del unpack_tss + scheduler_plugin.block_barrier.set() + result = await result + + await check_worker_cleanup(a) + await check_worker_cleanup(b) + await check_scheduler_cleanup(s) + + class FlakyConnectionPool(ConnectionPool): def __init__(self, *args, failing_connects=0, **kwargs): self.attempts = 0 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/spill.py new/distributed-2023.12.1/distributed/spill.py --- old/distributed-2023.12.0/distributed/spill.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/spill.py 2023-12-15 21:40:22.000000000 +0100 @@ -136,25 +136,24 @@ logger.error("Spill to disk failed; keeping data in memory", exc_info=True) raise HandledError() except PickleError as e: - key_e, orig_e = e.args - assert key_e in self.fast - assert key_e not in self.slow - if key_e == key: + assert e.key in self.fast + assert e.key not in self.slow + if e.key == key: assert key is not None # The key we just inserted failed to serialize. # This happens only when the key is individually larger than target. # The exception will be caught by Worker and logged; the status of # the task will be set to error. del self[key] - raise orig_e + raise else: # The key we just inserted is smaller than target, but it caused # another, unrelated key to be spilled out of the LRU, and that key # failed to serialize. There's nothing wrong with the new key. The older # key is still in memory. 
- if key_e not in self.logged_pickle_errors: - logger.error("Failed to pickle %r", key_e, exc_info=True) - self.logged_pickle_errors.add(key_e) + if e.key not in self.logged_pickle_errors: + logger.error("Failed to pickle %r", e.key, exc_info=True) + self.logged_pickle_errors.add(e.key) raise HandledError() def __setitem__(self, key: Key, value: object) -> None: @@ -267,8 +266,13 @@ pass -class PickleError(Exception): - pass +class PickleError(TypeError): + def __str__(self) -> str: + return f"Failed to pickle {self.key!r}" + + @property + def key(self) -> Key: + return self.args[0] class HandledError(Exception): @@ -324,7 +328,7 @@ # zict.LRU ensures that the key remains in fast if we raise. # Wrap the exception so that it's recognizable by SpillBuffer, # which will then unwrap it. - raise PickleError(key, e) + raise PickleError(key) from e # Thanks to Buffer.__setitem__, we never update existing # keys in slow, but always delete them and reinsert them. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/tests/test_scheduler.py new/distributed-2023.12.1/distributed/tests/test_scheduler.py --- old/distributed-2023.12.0/distributed/tests/test_scheduler.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/tests/test_scheduler.py 2023-12-15 21:40:22.000000000 +0100 @@ -279,6 +279,30 @@ test_decide_worker_coschedule_order_neighbors_() +@gen_cluster( + client=True, + nthreads=[], +) +async def test_override_is_rootish(c, s): + x = c.submit(lambda x: x + 1, 1, key="x") + await async_poll_for(lambda: "x" in s.tasks, timeout=5) + ts_x = s.tasks["x"] + assert ts_x._rootish is None + assert s.is_rootish(ts_x) + + ts_x._rootish = False + assert not s.is_rootish(ts_x) + + y = c.submit(lambda y: y + 1, 1, key="y", workers=["not-existing"]) + await async_poll_for(lambda: "y" in s.tasks, timeout=5) + ts_y = s.tasks["y"] + assert ts_y._rootish is None + assert not s.is_rootish(ts_y) + + ts_y._rootish = True + assert s.is_rootish(ts_y) + + @pytest.mark.skipif( QUEUING_ON_BY_DEFAULT, reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204", @@ -1799,13 +1823,11 @@ assert response == s.address -@gen_cluster(client=True, config={"distributed.scheduler.pickle": False}) -async def test_run_on_scheduler_disabled(c, s, a, b): - def f(dask_scheduler=None): - return dask_scheduler.address - - with pytest.raises(ValueError, match="disallowed from deserializing"): - await c._run_on_scheduler(f) +@gen_test() +async def test_allow_pickle_false(): + with dask.config.set({"distributed.scheduler.pickle": False}): + with pytest.raises(RuntimeError, match="Pickling can no longer be disabled"): + await Scheduler() @gen_cluster() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/tests/test_spill.py new/distributed-2023.12.1/distributed/tests/test_spill.py --- old/distributed-2023.12.0/distributed/tests/test_spill.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/tests/test_spill.py 2023-12-15 21:40:22.000000000 +0100 @@ -219,9 +219,10 @@ a = Bad(size=201) # Exception caught in the worker - with pytest.raises(TypeError, match="Could not serialize"): + with pytest.raises(TypeError, match="Failed to pickle 'a'") as e: with captured_logger("distributed.spill") as logs_bad_key: buf["a"] = a + assert isinstance(e.value.__cause__.__cause__, MyError) # spill.py must remain silent because we're 
already logging in worker.py assert not logs_bad_key.getvalue() @@ -240,7 +241,7 @@ # worker.py won't intercept the exception here, so spill.py must dump the traceback logs_value = logs_bad_key_mem.getvalue() - assert "Failed to pickle" in logs_value # from distributed.spill + assert "Failed to pickle 'b'" in logs_value # from distributed.spill assert "Traceback" in logs_value # from distributed.spill assert_buf(buf, tmp_path, {"b": b, "c": c}, {}) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/tests/test_worker.py new/distributed-2023.12.1/distributed/tests/test_worker.py --- old/distributed-2023.12.0/distributed/tests/test_worker.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/tests/test_worker.py 2023-12-15 21:40:22.000000000 +0100 @@ -30,7 +30,6 @@ from dask.system import CPU_COUNT from dask.utils import tmpfile -import distributed from distributed import ( Client, Event, @@ -46,7 +45,6 @@ from distributed.comm.utils import OFFLOAD_THRESHOLD from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status, rpc -from distributed.diagnostics import nvml from distributed.diagnostics.plugin import ForwardOutput from distributed.metrics import time from distributed.protocol import pickle @@ -2267,17 +2265,6 @@ await future [email protected] -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) -async def test_gpu_executor(c, s, w): - if nvml.device_get_count() > 0: - e = w.executors["gpu"] - assert isinstance(e, distributed.threadpoolexecutor.ThreadPoolExecutor) - assert e._max_workers == 1 - else: - assert "gpu" not in w.executors - - async def assert_task_states_on_worker( expected: dict[str, str], worker: Worker ) -> None: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/tests/test_worker_memory.py new/distributed-2023.12.1/distributed/tests/test_worker_memory.py --- old/distributed-2023.12.0/distributed/tests/test_worker_memory.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/tests/test_worker_memory.py 2023-12-15 21:40:22.000000000 +0100 @@ -174,8 +174,9 @@ assert x.status == "error" - with pytest.raises(TypeError, match="Could not serialize"): + with pytest.raises(TypeError, match="Failed to pickle 'x'") as e: await x + assert isinstance(e.value.__cause__.__cause__, CustomError) await assert_basic_futures(c) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/distributed/worker.py new/distributed-2023.12.1/distributed/worker.py --- old/distributed-2023.12.0/distributed/worker.py 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/distributed/worker.py 2023-12-15 21:40:22.000000000 +0100 @@ -636,10 +636,6 @@ "offload": utils._offload_executor, "actor": ThreadPoolExecutor(1, thread_name_prefix="Dask-Actor-Threads"), } - if nvml.device_get_count() > 0: - self.executors["gpu"] = ThreadPoolExecutor( - 1, thread_name_prefix="Dask-GPU-Threads" - ) # Find the default executor if executor == "offload": diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2023.12.0/pyproject.toml new/distributed-2023.12.1/pyproject.toml --- old/distributed-2023.12.0/pyproject.toml 2023-12-02 00:22:58.000000000 +0100 +++ new/distributed-2023.12.1/pyproject.toml 2023-12-15 21:40:22.000000000 +0100 @@ -28,7 +28,7 @@ 
dependencies = [ "click >= 8.0", "cloudpickle >= 1.5.0", - "dask == 2023.12.0", + "dask == 2023.12.1", "jinja2 >= 2.10.3", "locket >= 1.0.0", "msgpack >= 1.0.0",
