Script 'mail_helper' called by obssrc

Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2024-03-06 23:05:52
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.1770 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Wed Mar 6 23:05:52 2024 rev:79 rq:1155502 version:2024.2.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2024-02-16 21:42:51.006770687 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.1770/python-distributed.changes 2024-03-06 23:06:23.713369868 +0100 @@ -1,0 +2,9 @@ +Tue Mar 5 21:29:43 UTC 2024 - Ben Greiner <c...@bnavigator.de> + +- Update to 2024.2.1 + * Allow silencing dask.DataFrame deprecation warning + * More robust distributed scheduler for rare key collisions + * More robust adaptive scaling on large clusters +- Drop distributed-ignore-daskdepr.patch + +------------------------------------------------------------------- Old: ---- distributed-2024.2.0-gh.tar.gz distributed-ignore-daskdepr.patch New: ---- distributed-2024.2.1-gh.tar.gz BETA DEBUG BEGIN: Old: * More robust adaptive scaling on large clusters - Drop distributed-ignore-daskdepr.patch BETA DEBUG END: ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.xdnFuB/_old 2024-03-06 23:06:24.357393237 +0100 +++ /var/tmp/diff_new_pack.xdnFuB/_new 2024-03-06 23:06:24.357393237 +0100 @@ -47,7 +47,7 @@ Name: python-distributed%{psuffix} # ===> Note: python-dask MUST be updated in sync with python-distributed! <=== -Version: 2024.2.0 +Version: 2024.2.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause @@ -55,8 +55,6 @@ # SourceRepository: https://github.com/dask/distributed Source: https://github.com/dask/distributed/archive/refs/tags/%{version}.tar.gz#/distributed-%{version}-gh.tar.gz Source99: python-distributed-rpmlintrc -# PATCH-FIX-UPSTREAM distributed-ignore-daskdepr.patch gh#dask/distributed#8504 -Patch0: distributed-ignore-daskdepr.patch # PATCH-FIX-OPENSUSE distributed-ignore-off.patch -- ignore that we can't probe addresses on obs, c...@bnavigator.de Patch3: distributed-ignore-offline.patch # PATCH-FIX-OPENSUSE distributed-ignore-thread-leaks.patch -- ignore leaking threads on obs, c...@bnavigator.de ++++++ distributed-2024.2.0-gh.tar.gz -> distributed-2024.2.1-gh.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/.github/workflows/ci-pre-commit.yml new/distributed-2024.2.1/.github/workflows/ci-pre-commit.yml --- old/distributed-2024.2.0/.github/workflows/ci-pre-commit.yml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/.github/workflows/ci-pre-commit.yml 2024-02-23 19:11:48.000000000 +0100 @@ -15,4 +15,4 @@ - uses: actions/setup-python@v5 with: python-version: '3.9' - - uses: pre-commit/action@v3.0.0 + - uses: pre-commit/action@v3.0.1 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/.github/workflows/tests.yaml new/distributed-2024.2.1/.github/workflows/tests.yaml --- old/distributed-2024.2.0/.github/workflows/tests.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/.github/workflows/tests.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -154,7 +154,7 @@ # Increase this value to reset cache if # continuous_integration/environment-${{ matrix.environment }}.yaml has not # changed. 
See also same variable in .pre-commit-config.yaml - CACHE_NUMBER: 0 + CACHE_NUMBER: 1 id: cache - name: Update environment diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/.pre-commit-config.yaml new/distributed-2024.2.1/.pre-commit-config.yaml --- old/distributed-2024.2.0/.pre-commit-config.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/.pre-commit-config.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -69,4 +69,4 @@ # Increase this value to clear the cache on GitHub actions if nothing else in this file # has changed. See also same variable in .github/workflows/test.yaml -# CACHE_NUMBER: 0 +# CACHE_NUMBER: 1 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/continuous_integration/environment-3.12.yaml new/distributed-2024.2.1/continuous_integration/environment-3.12.yaml --- old/distributed-2024.2.0/continuous_integration/environment-3.12.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/continuous_integration/environment-3.12.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -15,8 +15,8 @@ - filesystem-spec # overridden by git tip below - gilknocker - h5py - - ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688 - - ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688 + - ipykernel + - ipywidgets - jinja2 - locket - msgpack-python diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/continuous_integration/gpuci/axis.yaml new/distributed-2024.2.1/continuous_integration/gpuci/axis.yaml --- old/distributed-2024.2.0/continuous_integration/gpuci/axis.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/continuous_integration/gpuci/axis.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -9,6 +9,6 @@ - ubuntu20.04 RAPIDS_VER: -- "24.02" +- "24.04" excludes: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/continuous_integration/scripts/test_report.py new/distributed-2024.2.1/continuous_integration/scripts/test_report.py --- old/distributed-2024.2.0/continuous_integration/scripts/test_report.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/continuous_integration/scripts/test_report.py 2024-02-23 19:11:48.000000000 +0100 @@ -498,9 +498,9 @@ ) overall = {name: grouped.get_group(name) for name in grouped.groups} - # Get all of the workflow run timestamps that we wound up with, which we can use + # Get all the workflow run timestamps that we wound up with, which we can use # below to align the different groups. 
- times = set() + times: set = set() for df in overall.values(): times.update(df.date.unique()) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/client.py new/distributed-2024.2.1/distributed/client.py --- old/distributed-2024.2.0/distributed/client.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/client.py 2024-02-23 19:11:48.000000000 +0100 @@ -41,6 +41,7 @@ ensure_dict, format_bytes, funcname, + parse_bytes, parse_timedelta, shorten_traceback, typename, @@ -1576,7 +1577,7 @@ breakout = False for msg in msgs: - logger.debug("Client receives message %s", msg) + logger.debug("Client %s receives message %s", self.id, msg) if "status" in msg and "error" in msg["status"]: typ, exc, tb = clean_exception(**msg) @@ -3162,7 +3163,9 @@ header, frames = serialize(ToPickle(dsk), on_error="raise") pickled_size = sum(map(nbytes, [header] + frames)) - if pickled_size > 10_000_000: + if pickled_size > parse_bytes( + dask.config.get("distributed.admin.large-graph-warning-threshold") + ): warnings.warn( f"Sending large graph of size {format_bytes(pickled_size)}.\n" "This may cause some slowdown.\n" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/comm/tcp.py new/distributed-2024.2.1/distributed/comm/tcp.py --- old/distributed-2024.2.0/distributed/comm/tcp.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/comm/tcp.py 2024-02-23 19:11:48.000000000 +0100 @@ -44,14 +44,14 @@ logger = logging.getLogger(__name__) -# Workaround for OpenSSL 1.0.2. -# Can drop with OpenSSL 1.1.1 used by Python 3.10+. -# ref: https://bugs.python.org/issue42853 -if sys.version_info < (3, 10): - OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 -else: - OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1 +# We must not load more than this into a buffer at a time +# It's currently unclear why that is +# see +# - https://github.com/dask/distributed/pull/5854 +# - https://bugs.python.org/issue42853 +# - https://github.com/dask/distributed/pull/8507 +C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 MAX_BUFFER_SIZE = MEMORY_LIMIT / 2 @@ -286,8 +286,8 @@ 2, range( 0, - each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE, - OPENSSL_MAX_CHUNKSIZE, + each_frame_nbytes + C_INT_MAX, + C_INT_MAX, ), ): chunk = each_frame[i:j] @@ -360,7 +360,7 @@ for i, j in sliding_window( 2, - range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE), + range(0, n + C_INT_MAX, C_INT_MAX), ): chunk = buf[i:j] actual = await stream.read_into(chunk) # type: ignore[arg-type] @@ -432,7 +432,8 @@ A TLS-specific version of TCP. 
""" - max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size) + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) + max_shard_size = min(C_INT_MAX, TCP.max_shard_size) def _read_extra(self): TCP._read_extra(self) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/deploy/tests/test_adaptive.py new/distributed-2024.2.1/distributed/deploy/tests/test_adaptive.py --- old/distributed-2024.2.0/distributed/deploy/tests/test_adaptive.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/deploy/tests/test_adaptive.py 2024-02-23 19:11:48.000000000 +0100 @@ -203,7 +203,7 @@ assert len(adapt.log) == 1 # Scale up when there is plenty of available work - futures = client.map(slowinc, range(1000), delay=0.100) + futures = client.map(slowinc, range(2, 1002), delay=0.100) while len(adapt.log) == 1: await asyncio.sleep(0.01) assert len(adapt.log) == 2 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/distributed-schema.yaml new/distributed-2024.2.1/distributed/distributed-schema.yaml --- old/distributed-2024.2.0/distributed/distributed-schema.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/distributed-schema.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -1103,6 +1103,12 @@ description: | Options for logs, event loops, and so on properties: + large-graph-warning-threshold: + type: string + description: | + Threshold in bytes for when a warning is raised about a large + submitted task graph. + Default is 10MB. tick: type: object description: | diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/distributed.yaml new/distributed-2024.2.1/distributed/distributed.yaml --- old/distributed-2024.2.0/distributed/distributed.yaml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/distributed.yaml 2024-02-23 19:11:48.000000000 +0100 @@ -323,6 +323,7 @@ ################## admin: + large-graph-warning-threshold: 10MB # Threshold for warning on large graph tick: interval: 20ms # time between event loop health checks limit: 3s # time allowed before triggering a warning diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/protocol/tests/test_cupy.py new/distributed-2024.2.1/distributed/protocol/tests/test_cupy.py --- old/distributed-2024.2.0/distributed/protocol/tests/test_cupy.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/protocol/tests/test_cupy.py 2024-02-23 19:11:48.000000000 +0100 @@ -17,7 +17,7 @@ @pytest.mark.parametrize("order", ["C", "F"]) @pytest.mark.parametrize("serializers", [("cuda",), ("dask",), ("pickle",)]) def test_serialize_cupy(shape, dtype, order, serializers): - x = cupy.arange(numpy.product(shape), dtype=dtype) + x = cupy.arange(numpy.prod(shape), dtype=dtype) x = cupy.ndarray(shape, dtype=x.dtype, memptr=x.data, order=order) header, frames = serialize(x, serializers=serializers) y = deserialize(header, frames, deserializers=serializers) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/protocol/tests/test_numba.py new/distributed-2024.2.1/distributed/protocol/tests/test_numba.py --- old/distributed-2024.2.0/distributed/protocol/tests/test_numba.py 2024-02-09 23:58:27.000000000 +0100 +++ 
new/distributed-2024.2.1/distributed/protocol/tests/test_numba.py 2024-02-23 19:11:48.000000000 +0100 @@ -20,7 +20,7 @@ if not cuda.is_available(): pytest.skip("CUDA is not available") - ary = np.arange(np.product(shape), dtype=dtype) + ary = np.arange(np.prod(shape), dtype=dtype) ary = np.ndarray(shape, dtype=ary.dtype, buffer=ary.data, order=order) x = cuda.to_device(ary) header, frames = serialize(x, serializers=serializers) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/protocol/tests/test_protocol.py new/distributed-2024.2.1/distributed/protocol/tests/test_protocol.py --- old/distributed-2024.2.0/distributed/protocol/tests/test_protocol.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/protocol/tests/test_protocol.py 2024-02-23 19:11:48.000000000 +0100 @@ -208,3 +208,30 @@ assert L[0].count(b"__Pickled__") == 1 assert L[0].count(b"__Serialized__") == 1 assert loads(L) == {np.int64(1): {2: "a"}, 3: ("b", "c"), 4: "d"} + + +@pytest.mark.slow +@pytest.mark.parametrize("typ", [bytes, str, "ext"]) +def test_large_payload(typ): + """See also: test_core.py::test_large_payload""" + critical_size = 2**31 + 1 # >2 GiB + if typ == bytes: + large_payload = critical_size * b"0" + expected = large_payload + elif typ == str: + large_payload = critical_size * "0" + expected = large_payload + # Testing array and map dtypes is practically not possible since we'd have + # to create an actual list or dict object of critical size (i.e. not the + # content but the container itself). These are so large that msgpack is + # running forever + # elif typ == "array": + # large_payload = [b"0"] * critical_size + # expected = tuple(large_payload) + # elif typ == "map": + # large_payload = {x: b"0" for x in range(critical_size)} + # expected = large_payload + elif typ == "ext": + large_payload = msgpack.ExtType(1, b"0" * critical_size) + expected = large_payload + assert loads(dumps(large_payload)) == expected diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/protocol/utils.py new/distributed-2024.2.1/distributed/protocol/utils.py --- old/distributed-2024.2.0/distributed/protocol/utils.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/protocol/utils.py 2024-02-23 19:11:48.000000000 +0100 @@ -12,9 +12,7 @@ BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard")) -msgpack_opts = { - ("max_%s_len" % x): 2**31 - 1 for x in ["str", "bin", "array", "map", "ext"] -} +msgpack_opts = {} msgpack_opts["strict_map_key"] = False msgpack_opts["raw"] = False diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/scheduler.py new/distributed-2024.2.1/distributed/scheduler.py --- old/distributed-2024.2.0/distributed/scheduler.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/scheduler.py 2024-02-23 19:11:48.000000000 +0100 @@ -52,7 +52,9 @@ from tornado.ioloop import IOLoop import dask -from dask.core import get_deps, validate_key +import dask.utils +from dask.base import TokenizationError, normalize_token, tokenize +from dask.core import get_deps, iskey, validate_key from dask.typing import Key, no_default from dask.utils import ( ensure_dict, @@ -4721,6 +4723,7 @@ stimulus_id=stimulus_id or f"update-graph-{start}", ) except RuntimeError as e: + logger.error(str(e)) err = error_message(e) 
for key in keys: self.report( @@ -4729,7 +4732,10 @@ "key": key, "exception": err["exception"], "traceback": err["traceback"], - } + }, + # This informs all clients in who_wants plus the current client + # (which may not have been added to who_wants yet) + client=client, ) end = time() self.digest_metric("update-graph-duration", end - start) @@ -4747,6 +4753,7 @@ stack = list(keys) touched_keys = set() touched_tasks = [] + tgs_with_bad_run_spec = set() while stack: k = stack.pop() if k in touched_keys: @@ -4755,8 +4762,73 @@ if ts is None: ts = self.new_task(k, dsk.get(k), "released", computation=computation) new_tasks.append(ts) - elif not ts.run_spec: + # It is possible to create the TaskState object before its runspec is known + # to the scheduler. For instance, this is possible when using a Variable: + # `f = c.submit(foo); await Variable().set(f)` since the Variable uses a + # different comm channel, so the `client_desires_key` message could arrive + # before `update_graph`. + # There are also anti-pattern processes possible; + # see for example test_scatter_creates_ts + elif ts.run_spec is None: ts.run_spec = dsk.get(k) + # run_spec in the submitted graph may be None. This happens + # when an already persisted future is part of the graph + elif k in dsk: + # If both tokens are non-deterministic, skip comparison + try: + tok_lhs = tokenize(ts.run_spec, ensure_deterministic=True) + except TokenizationError: + tok_lhs = "" + try: + tok_rhs = tokenize(dsk[k], ensure_deterministic=True) + except TokenizationError: + tok_rhs = "" + + # Additionally check dependency names. This should only be necessary + # if run_specs can't be tokenized deterministically. + deps_lhs = {dts.key for dts in ts.dependencies} + deps_rhs = dependencies[k] + + # FIXME It would be a really healthy idea to change this to a hard + # failure. However, this is not possible at the moment because of + # https://github.com/dask/dask/issues/9888 + if tok_lhs != tok_rhs or deps_lhs != deps_rhs: + # Retain old run_spec and dependencies; rerun them if necessary. + # This sweeps the issue of collision under the carpet as long as the + # old and new task produce the same output - such as in + # dask/dask#9888. + dependencies[k] = deps_lhs + + if ts.group not in tgs_with_bad_run_spec: + tgs_with_bad_run_spec.add(ts.group) + logger.warning( + f"Detected different `run_spec` for key {ts.key!r} between " + "two consecutive calls to `update_graph`. " + "This can cause failures and deadlocks down the line. " + "Please ensure unique key names. " + "If you are using a standard dask collections, consider " + "releasing all the data before resubmitting another " + "computation. More details and help can be found at " + "https://github.com/dask/dask/issues/9888. " + + textwrap.dedent( + f""" + Debugging information + --------------------- + old task state: {ts.state} + old run_spec: {ts.run_spec!r} + new run_spec: {dsk[k]!r} + old token: {normalize_token(ts.run_spec)!r} + new token: {normalize_token(dsk[k])!r} + old dependencies: {deps_lhs} + new dependencies: {deps_rhs} + """ + ) + ) + else: + logger.debug( + f"Detected different `run_spec` for key {ts.key!r} between " + "two consecutive calls to `update_graph`." 
+ ) if ts.run_spec: runnable.append(ts) @@ -5538,28 +5610,28 @@ tasks: dict = self.tasks ts = tasks.get(msg_key) - client_comms: dict = self.client_comms - if ts is None: + if ts is None and client is None: # Notify all clients - client_keys = list(client_comms) - elif client: - # Notify clients interested in key - client_keys = [cs.client_key for cs in ts.who_wants or ()] + client_keys = list(self.client_comms) + elif ts is None: + client_keys = [client] else: # Notify clients interested in key (including `client`) + # Note that, if report() was called by update_graph(), `client` won't be in + # ts.who_wants yet. client_keys = [ cs.client_key for cs in ts.who_wants or () if cs.client_key != client ] - client_keys.append(client) + if client is not None: + client_keys.append(client) - k: str for k in client_keys: - c = client_comms.get(k) + c = self.client_comms.get(k) if c is None: continue try: c.send(msg) - # logger.debug("Scheduler sends message to client %s", msg) + # logger.debug("Scheduler sends message to client %s: %s", k, msg) except CommClosedError: if self.status == Status.running: logger.critical( @@ -7019,12 +7091,11 @@ If neither ``workers`` nor ``names`` are provided, we call ``workers_to_close`` which finds a good set. close_workers: bool (defaults to False) - Whether or not to actually close the worker explicitly from here. - Otherwise we expect some external job scheduler to finish off the - worker. + Whether to actually close the worker explicitly from here. + Otherwise, we expect some external job scheduler to finish off the worker. remove: bool (defaults to True) - Whether or not to remove the worker metadata immediately or else - wait for the worker to contact us. + Whether to remove the worker metadata immediately or else wait for the + worker to contact us. If close_workers=False and remove=False, this method just flushes the tasks in memory out of the workers and then returns. @@ -8724,9 +8795,17 @@ dsk2 = {} fut_deps = {} for k, v in dsk.items(): - dsk2[k], futs = unpack_remotedata(v, byte_keys=True) + v, futs = unpack_remotedata(v, byte_keys=True) if futs: fut_deps[k] = futs + + # Remove aliases {x: x}. + # FIXME: This is an artifact generated by unpack_remotedata when using persisted + # collections. There should be a better way to achieve that tasks are not self + # referencing themselves. 
+ if not iskey(v) or v != k: + dsk2[k] = v + dsk = dsk2 # - Add in deps for any tasks that depend on futures @@ -8736,14 +8815,8 @@ # Remove any self-dependencies (happens on test_publish_bag() and others) for k, v in dependencies.items(): deps = set(v) - if k in deps: - deps.remove(k) + deps.discard(k) dependencies[k] = deps - # Remove aliases - for k in list(dsk): - if dsk[k] is k: - del dsk[k] dsk = valmap(_normalize_task, dsk) - return dsk, dependencies, annotations_by_type diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_core.py new/distributed-2024.2.1/distributed/shuffle/_core.py --- old/distributed-2024.2.0/distributed/shuffle/_core.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_core.py 2024-02-23 19:11:48.000000000 +0100 @@ -442,6 +442,9 @@ participating_workers=set(worker_for.values()), ) + def validate_data(self, data: Any) -> None: + """Validate payload data before shuffling""" + @abc.abstractmethod def create_run_on_worker( self, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_disk.py new/distributed-2024.2.1/distributed/shuffle/_disk.py --- old/distributed-2024.2.0/distributed/shuffle/_disk.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_disk.py 2024-02-23 19:11:48.000000000 +0100 @@ -12,6 +12,7 @@ from distributed.metrics import context_meter, thread_time from distributed.shuffle._buffer import ShardsBuffer +from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._pickle import pickle_bytelist from distributed.utils import Deadline, empty_context, log_errors, nbytes @@ -201,13 +202,13 @@ context_meter.digest_metric("p2p-disk-read", 1, "count") context_meter.digest_metric("p2p-disk-read", size, "bytes") except FileNotFoundError: - raise KeyError(id) + raise DataUnavailable(id) if data: self.bytes_read += size return data else: - raise KeyError(id) + raise DataUnavailable(id) async def close(self) -> None: await super().close() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_exceptions.py new/distributed-2024.2.1/distributed/shuffle/_exceptions.py --- old/distributed-2024.2.0/distributed/shuffle/_exceptions.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_exceptions.py 2024-02-23 19:11:48.000000000 +0100 @@ -3,3 +3,7 @@ class ShuffleClosedError(RuntimeError): pass + + +class DataUnavailable(Exception): + """Raised when data is not available in the buffer""" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_memory.py new/distributed-2024.2.1/distributed/shuffle/_memory.py --- old/distributed-2024.2.0/distributed/shuffle/_memory.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_memory.py 2024-02-23 19:11:48.000000000 +0100 @@ -6,6 +6,7 @@ from dask.sizeof import sizeof from distributed.shuffle._buffer import ShardsBuffer +from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._limiter import ResourceLimiter from distributed.utils import log_errors @@ -30,7 +31,10 @@ if not self._inputs_done: raise RuntimeError("Tried to read from file before done.") - shards = 
self._shards.pop(id) # Raises KeyError + try: + shards = self._shards.pop(id) # Raises KeyError + except KeyError: + raise DataUnavailable(id) self.bytes_read += sum(map(sizeof, shards)) # Don't keep the serialized and the deserialized shards # in memory at the same time diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_shuffle.py new/distributed-2024.2.1/distributed/shuffle/_shuffle.py --- old/distributed-2024.2.0/distributed/shuffle/_shuffle.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_shuffle.py 2024-02-23 19:11:48.000000000 +0100 @@ -48,6 +48,7 @@ handle_transfer_errors, handle_unpack_errors, ) +from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._limiter import ResourceLimiter from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin from distributed.sizeof import sizeof @@ -527,7 +528,7 @@ try: data = self._read_from_disk((partition_id,)) return convert_shards(data, self.meta) - except KeyError: + except DataUnavailable: return self.meta.copy() def _get_assigned_worker(self, id: int) -> str: @@ -554,6 +555,10 @@ def pick_worker(self, partition: int, workers: Sequence[str]) -> str: return _get_worker_for_range_sharding(self.npartitions, partition, workers) + def validate_data(self, data: pd.DataFrame) -> None: + if set(data.columns) != set(self.meta.columns): + raise ValueError(f"Expected {self.meta.columns=} to match {data.columns=}.") + def create_run_on_worker( self, run_id: int, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/_worker_plugin.py new/distributed-2024.2.1/distributed/shuffle/_worker_plugin.py --- old/distributed-2024.2.0/distributed/shuffle/_worker_plugin.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/_worker_plugin.py 2024-02-23 19:11:48.000000000 +0100 @@ -341,6 +341,7 @@ spec: ShuffleSpec, **kwargs: Any, ) -> int: + spec.validate_data(data) shuffle_run = self.get_or_create_shuffle(spec) return shuffle_run.add_partition( data=data, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/tests/test_disk_buffer.py new/distributed-2024.2.1/distributed/shuffle/tests/test_disk_buffer.py --- old/distributed-2024.2.0/distributed/shuffle/tests/test_disk_buffer.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/tests/test_disk_buffer.py 2024-02-23 19:11:48.000000000 +0100 @@ -8,6 +8,7 @@ import pytest from distributed.shuffle._disk import DiskShardsBuffer +from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._limiter import ResourceLimiter from distributed.utils_test import gen_test @@ -32,7 +33,7 @@ x = mf.read("x") y = mf.read("y") - with pytest.raises(KeyError): + with pytest.raises(DataUnavailable): mf.read("z") assert x == b"0" * 2000 @@ -57,7 +58,7 @@ await mf.flush() assert mf.read("1") == b"foo" - with pytest.raises(KeyError): + with pytest.raises(DataUnavailable): mf.read(2) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/tests/test_memory_buffer.py new/distributed-2024.2.1/distributed/shuffle/tests/test_memory_buffer.py --- old/distributed-2024.2.0/distributed/shuffle/tests/test_memory_buffer.py 2024-02-09 23:58:27.000000000 +0100 +++ 
new/distributed-2024.2.1/distributed/shuffle/tests/test_memory_buffer.py 2024-02-23 19:11:48.000000000 +0100 @@ -2,6 +2,7 @@ import pytest +from distributed.shuffle._exceptions import DataUnavailable from distributed.shuffle._memory import MemoryShardsBuffer from distributed.utils_test import gen_test @@ -21,7 +22,7 @@ x = mf.read("x") y = mf.read("y") - with pytest.raises(KeyError): + with pytest.raises(DataUnavailable): mf.read("z") assert x == [b"0" * 1000] * 2 @@ -42,7 +43,7 @@ await mf.flush() assert mf.read("1") == [b"foo"] - with pytest.raises(KeyError): + with pytest.raises(DataUnavailable): mf.read("2") diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/shuffle/tests/test_shuffle.py new/distributed-2024.2.1/distributed/shuffle/tests/test_shuffle.py --- old/distributed-2024.2.0/distributed/shuffle/tests/test_shuffle.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/shuffle/tests/test_shuffle.py 2024-02-23 19:11:48.000000000 +0100 @@ -210,32 +210,6 @@ await check_scheduler_cleanup(s) -@pytest.mark.parametrize("disk", [True, False]) -@gen_cluster(client=True) -async def test_stable_ordering(c, s, a, b, disk): - df = dask.datasets.timeseries( - start="2000-01-01", - end="2000-02-01", - dtypes={"x": int, "y": int}, - freq="10 s", - ) - df["x"] = df["x"] % 19 - df["y"] = df["y"] % 23 - with dask.config.set( - {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk} - ): - shuffled = dd.shuffle.shuffle(df, "x") - result, expected = await c.compute([shuffled, df], sync=True) - dd.assert_eq( - result.drop_duplicates("x", keep="first"), - expected.drop_duplicates("x", keep="first"), - ) - - await check_worker_cleanup(a) - await check_worker_cleanup(b) - await check_scheduler_cleanup(s) - - @pytest.mark.parametrize("processes", [True, False]) @gen_test() async def test_basic_integration_local_cluster(processes): @@ -2505,17 +2479,14 @@ def test_sort_values_with_existing_divisions(client): - "Regression test for #8165" + """Regression test for #8165""" df = pd.DataFrame( {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)} ) - ddf = dd.from_pandas( - df, - npartitions=4, - ) + ddf = dd.from_pandas(df, npartitions=4) with dask.config.set({"dataframe.shuffle.method": "p2p"}): ddf = ddf.set_index("a").sort_values("b") - result = client.compute(ddf, sync=True) + result = ddf.compute() dd.assert_eq( result, df.set_index("a").sort_values("b"), @@ -2725,3 +2696,76 @@ await wait_for_tasks_in_state("shuffle-transfer", "resumed", 1, barrier_worker) barrier_worker.block_gather_dep.set() await out + + +@pytest.mark.parametrize("disk", [True, False]) +@pytest.mark.parametrize("keep", ["first", "last"]) +@gen_cluster(client=True) +async def test_shuffle_stable_ordering(c, s, a, b, keep, disk): + """Ensures that shuffling guarantees ordering for individual entries + belonging to the same shuffle key""" + + def make_partition(partition_id, size): + """Return null column for every other partition""" + offset = partition_id * size + df = pd.DataFrame({"a": np.arange(start=offset, stop=offset + size)}) + df["b"] = df["a"] % 23 + return df + + df = dd.from_map(make_partition, np.arange(19), args=(250,)) + + with dask.config.set( + {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk} + ): + shuffled = df.shuffle("b") + result, expected = await c.compute([shuffled, df], sync=True) + dd.assert_eq(result, expected) + + for _, group in result.groupby("b"): + assert 
group["a"].is_monotonic_increasing + + await check_worker_cleanup(a) + await check_worker_cleanup(b) + await check_scheduler_cleanup(s) + + +@pytest.mark.parametrize("disk", [True, False]) +@pytest.mark.parametrize("keep", ["first", "last"]) +@gen_cluster(client=True) +async def test_drop_duplicates_stable_ordering(c, s, a, b, keep, disk): + df = dask.datasets.timeseries() + + with dask.config.set( + {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk} + ): + result, expected = await c.compute( + [ + df.drop_duplicates( + subset=["name"], keep=keep, split_out=df.npartitions + ), + df, + ], + sync=True, + ) + expected = expected.drop_duplicates(subset=["name"], keep=keep) + dd.assert_eq(result, expected) + + +@gen_cluster(client=True) +async def test_wrong_meta_provided(c, s, a, b): + # https://github.com/dask/distributed/issues/8519 + @dask.delayed + def data_gen(): + return pd.DataFrame({"a": range(10)}) + + ddf = dd.from_delayed( + [data_gen()] * 2, meta=[("a", int), ("b", int)], verify_meta=False + ) + + with raises_with_cause( + RuntimeError, + r"shuffling \w* failed", + ValueError, + "meta", + ): + await c.gather(c.compute(ddf.shuffle(on="a"))) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_active_memory_manager.py new/distributed-2024.2.1/distributed/tests/test_active_memory_manager.py --- old/distributed-2024.2.0/distributed/tests/test_active_memory_manager.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_active_memory_manager.py 2024-02-23 19:11:48.000000000 +0100 @@ -22,6 +22,7 @@ from distributed.utils_test import ( NO_AMM, BlockedGatherDep, + BlockedGetData, assert_story, async_poll_for, captured_logger, @@ -1129,6 +1130,60 @@ assert dict(w2.data) == {"x": 123, clutter.key: 456} +@gen_cluster( + client=True, + nthreads=[("", 1)] * 10, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.05, + "distributed.scheduler.active-memory-manager.measure": "managed", + "distributed.scheduler.active-memory-manager.policies": [], + }, +) +async def test_RetireWorker_mass(c, s, *workers): + """Retire 90% of a cluster at once.""" + # Note: by using scatter instead of submit/map, we're also testing that tasks + # aren't being recomputed + data = await c.scatter(range(100)) + for w in workers: + assert len(w.data) == 10 + + await c.retire_workers([w.address for w in workers[:-1]]) + assert set(s.workers) == {workers[-1].address} + assert len(workers[-1].data) == 100 + + +@gen_cluster( + client=True, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.05, + "distributed.scheduler.active-memory-manager.measure": "managed", + "distributed.scheduler.active-memory-manager.policies": [], + }, +) +async def test_RetireWorker_incremental(c, s, w2, w3): + """Retire worker w1; this causes its keys to be replicated onto w2. + Before that can happen, retire w2 too. 
+ """ + async with BlockedGetData(s.address) as w1: + # Note: by using scatter instead of submit/map, we're also testing that tasks + # aren't being recomputed + x = await c.scatter({"x": 1}, workers=[w1.address]) + y = await c.scatter({"y": 2}, workers=[w3.address]) + + # Because w2's memory is lower than w3, AMM will choose w2 + retire1 = asyncio.create_task(c.retire_workers([w1.address])) + await w1.in_get_data.wait() + assert w2.state.tasks["x"].state == "flight" + await c.retire_workers([w2.address]) + + w1.block_get_data.set() + await retire1 + assert set(s.workers) == {w3.address} + assert set(w3.data) == {"x", "y"} + + class Counter: def __init__(self): self.n = 0 @@ -1223,7 +1278,7 @@ self.manager.policies.remove(self) -async def tensordot_stress(c): +async def tensordot_stress(c, s): da = pytest.importorskip("dask.array") rng = da.random.RandomState(0) @@ -1234,6 +1289,10 @@ b = (a @ a.T).sum().round(3) assert await c.compute(b) == 245.394 + # Test that we didn't recompute any tasks during the stress test + await async_poll_for(lambda: not s.tasks, timeout=5) + assert sum(t.start == "memory" for t in s.transition_log) == 1639 + @pytest.mark.slow @gen_cluster( @@ -1245,7 +1304,7 @@ """Test the tensordot_stress helper without AMM. This is to figure out if a stability issue is AMM-specific or not. """ - await tensordot_stress(c) + await tensordot_stress(c, s) @pytest.mark.slow @@ -1267,7 +1326,7 @@ See also: test_ReduceReplicas_stress """ - await tensordot_stress(c) + await tensordot_stress(c, s) @pytest.mark.slow @@ -1288,7 +1347,7 @@ test_drop_stress above, this test does not stop running after a few seconds - the policy must not disrupt the computation too much. """ - await tensordot_stress(c) + await tensordot_stress(c, s) @pytest.mark.slow @@ -1316,7 +1375,7 @@ random.shuffle(addrs) print(f"Removing all workers except {addrs[9]}") - tasks = [asyncio.create_task(tensordot_stress(c))] + tasks = [asyncio.create_task(tensordot_stress(c, s))] await asyncio.sleep(1) tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2]))) await asyncio.sleep(1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_cancelled_state.py new/distributed-2024.2.1/distributed/tests/test_cancelled_state.py --- old/distributed-2024.2.0/distributed/tests/test_cancelled_state.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_cancelled_state.py 2024-02-23 19:11:48.000000000 +0100 @@ -169,7 +169,11 @@ with lock: return x + 1 - async with BrokenWorker(s.address) as a: + async with BrokenWorker( + s.address, + # heartbeat will close a worker after remove_worker(close=False) + heartbeat_interval="100s", + ) as a: await c.wait_for_workers(2) fut1 = c.submit( blockable_compute, @@ -267,7 +271,11 @@ @gen_cluster(client=True, nthreads=[("", 1)]) async def test_in_flight_lost_after_resumed(c, s, b): - async with BlockedGetData(s.address) as a: + async with BlockedGetData( + s.address, + # heartbeat will close a worker after remove_worker(close=False) + heartbeat_interval="100s", + ) as a: fut1 = c.submit(inc, 1, workers=[a.address], key="fut1") # Ensure fut1 is in memory but block any further execution afterwards to # ensure we control when the recomputation happens diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_client.py new/distributed-2024.2.1/distributed/tests/test_client.py --- 
old/distributed-2024.2.0/distributed/tests/test_client.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_client.py 2024-02-23 19:11:48.000000000 +0100 @@ -5975,6 +5975,8 @@ async def test_warn_when_submitting_large_values(c, s): with pytest.warns(UserWarning, match="Sending large graph of size"): future = c.submit(lambda x: x + 1, b"0" * 10_000_000) + with dask.config.set({"distributed.admin.large-graph-warning-threshold": "1GB"}): + future = c.submit(lambda x: x + 1, b"0" * 10_000_000) @gen_cluster(client=True, nthreads=[]) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_core.py new/distributed-2024.2.1/distributed/tests/test_core.py --- old/distributed-2024.2.0/distributed/tests/test_core.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_core.py 2024-02-23 19:11:48.000000000 +0100 @@ -2,6 +2,7 @@ import asyncio import contextlib +import logging import os import random import socket @@ -1481,3 +1482,27 @@ assert ledger == list(range(n)) finally: await comm.close() + + +@pytest.mark.slow +@gen_test(timeout=180) +async def test_large_payload(caplog): + """See also: protocol/tests/test_protocol.py::test_large_payload""" + critical_size = 2**31 + 1 # >2 GiB + data = b"0" * critical_size + + async with Server({"echo": echo_serialize}) as server: + await server.listen(0) + comm = await connect(server.address) + + # FIXME https://github.com/dask/distributed/issues/8465 + # At debug level, messages are dumped into the log. By default, pytest captures + # all logs, which would make this test extremely expensive to run. + with caplog.at_level(logging.INFO, logger="distributed.core"): + # Note: if we wrap data in to_serialize, it will be sent as a buffer, which + # is not encoded by msgpack. + await comm.write({"op": "echo", "x": data}) + response = await comm.read() + + assert response["result"] == data + await comm.close() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_scheduler.py new/distributed-2024.2.1/distributed/tests/test_scheduler.py --- old/distributed-2024.2.0/distributed/tests/test_scheduler.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_scheduler.py 2024-02-23 19:11:48.000000000 +0100 @@ -624,7 +624,7 @@ secede() second.wait() - fs = c.map(func, [first] * 5, [second] * 5) + fs = c.map(func, [first] * 5, [second] * 5, key=[f"x{i}" for i in range(5)]) await async_poll_for(lambda: a.state.executing, timeout=5) await first.set() @@ -2641,7 +2641,7 @@ assert s.adaptive_target() == 1 # Long task - x = c.submit(slowinc, 1, delay=0.5) + x = c.submit(slowinc, 1, delay=0.4) while x.key not in s.tasks: await asyncio.sleep(0.01) assert s.adaptive_target(target_duration=".1s") == 1 # still one @@ -4702,3 +4702,180 @@ await asyncio.sleep(0.01) await f + + +@pytest.mark.parametrize("deps", ["same", "less", "more"]) +@gen_cluster(client=True, nthreads=[]) +async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, deps): + """If an intermediate key has a different run_spec (either the callable function or + the dependencies / arguments) that will conflict with what was previously defined, + it should raise an error since this can otherwise break in many different places and + cause either spurious exceptions or even deadlocks. 
+ + In this specific test, the previous run_spec has not been computed yet. + See also test_resubmit_different_task_same_key_after_previous_is_done. + + For a real world example where this can trigger, see + https://github.com/dask/dask/issues/9888 + """ + x1 = c.submit(inc, 1, key="x1") + y_old = c.submit(inc, x1, key="y") + + x1b = x1 if deps != "less" else 2 + x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11 + y_new = delayed(sum)([x1b, x2], dask_key_name="y") + z = delayed(inc)(y_new, dask_key_name="z") + + with captured_logger("distributed.scheduler", level=logging.WARNING) as log: + fut = c.compute(z) + await wait_for_state("z", "waiting", s) + + assert "Detected different `run_spec` for key 'y'" in log.getvalue() + + async with Worker(s.address): + # Used old run_spec + assert await y_old == 3 + assert await fut == 4 + + +@pytest.mark.parametrize("deps", ["same", "less", "more"]) +@pytest.mark.parametrize("release_previous", [False, True]) +@gen_cluster(client=True) +async def test_resubmit_different_task_same_key_after_previous_is_done( + c, s, a, b, deps, release_previous +): + """Same as test_resubmit_different_task_same_key, but now the replaced task has + already been computed and is either in memory or released, and so are its old + dependencies, so they may need to be recomputed. + """ + x1 = delayed(inc)(1, dask_key_name="x1") + x1fut = c.compute(x1) + y_old = c.submit(inc, x1fut, key="y") + z1 = c.submit(inc, y_old, key="z1") + await wait(z1) + if release_previous: + del x1fut, y_old + await wait_for_state("x1", "released", s) + await wait_for_state("y", "released", s) + + x1b = x1 if deps != "less" else 2 + x2 = delayed(inc)(10, dask_key_name="x2") if deps == "more" else 11 + y_new = delayed(sum)([x1b, x2], dask_key_name="y") + z2 = delayed(inc)(y_new, dask_key_name="z2") + + with captured_logger("distributed.scheduler", level=logging.WARNING) as log: + fut = c.compute(z2) + # Used old run_spec + assert await fut == 4 + assert "x2" not in s.tasks + + # _generate_taskstates won't run for a dependency that's already in memory + has_warning = "Detected different `run_spec` for key 'y'" in log.getvalue() + assert has_warning is (release_previous or deps == "less") + + +@gen_cluster(client=True, nthreads=[]) +async def test_resubmit_different_task_same_key_many_clients(c, s): + """Two different clients submit a task with the same key but different run_spec's.""" + async with Client(s.address, asynchronous=True) as c2: + with captured_logger("distributed.scheduler", level=logging.WARNING) as log: + x1 = c.submit(inc, 1, key="x") + x2 = c2.submit(inc, 2, key="x") + + await wait_for_state("x", ("no-worker", "queued"), s) + who_wants = s.tasks["x"].who_wants + await async_poll_for( + lambda: {cs.client_key for cs in who_wants} == {c.id, c2.id}, timeout=5 + ) + + assert "Detected different `run_spec` for key 'x'" in log.getvalue() + + async with Worker(s.address): + assert await x1 == 2 + assert await x2 == 2 # kept old run_spec + + +@pytest.mark.parametrize( + "before,after,expect_msg", + [ + (object(), 123, True), + (123, object(), True), + (o := object(), o, False), + ], +) +@gen_cluster(client=True, nthreads=[]) +async def test_resubmit_nondeterministic_task_same_deps( + c, s, before, after, expect_msg +): + """Some run_specs can't be tokenized deterministically. Silently skip comparison on + the run_spec when both lhs and rhs are nondeterministic. + Dependencies must be the same. 
+ """ + x1 = c.submit(lambda x: x, before, key="x") + x2 = delayed(lambda x: x)(after, dask_key_name="x") + y = delayed(lambda x: x)(x2, dask_key_name="y") + + with captured_logger("distributed.scheduler", level=logging.WARNING) as log: + fut = c.compute(y) + await async_poll_for(lambda: "y" in s.tasks, timeout=5) + + has_msg = "Detected different `run_spec` for key 'x'" in log.getvalue() + assert has_msg == expect_msg + + async with Worker(s.address): + assert type(await fut) is type(before) + + +@pytest.mark.parametrize("add_deps", [False, True]) +@gen_cluster(client=True, nthreads=[]) +async def test_resubmit_nondeterministic_task_different_deps(c, s, add_deps): + """Some run_specs can't be tokenized deterministically. Silently skip comparison on + the run_spec in those cases. However, fail anyway if dependencies have changed. + """ + o = object() + x1 = c.submit(inc, 1, key="x1") if not add_deps else 2 + x2 = c.submit(inc, 2, key="x2") + y1 = delayed(lambda i, j: i)(x1, o, dask_key_name="y").persist() + y2 = delayed(lambda i, j: i)(x2, o, dask_key_name="y") + z = delayed(inc)(y2, dask_key_name="z") + + with captured_logger("distributed.scheduler", level=logging.WARNING) as log: + fut = c.compute(z) + await wait_for_state("z", "waiting", s) + assert "Detected different `run_spec` for key 'y'" in log.getvalue() + + async with Worker(s.address): + assert await fut == 3 + + +@pytest.mark.parametrize( + "loglevel,expect_loglines", [(logging.DEBUG, 2), (logging.WARNING, 1)] +) +@gen_cluster(client=True, nthreads=[]) +async def test_resubmit_different_task_same_key_warns_only_once( + c, s, loglevel, expect_loglines +): + """If all tasks of a layer are affected by the same run_spec collision, warn + only once. + """ + y1s = c.map(inc, [0, 1, 2], key=[("y", 0), ("y", 1), ("y", 2)]) + dsk = { + "x": 3, + ("y", 0): (inc, "x"), # run_spec and dependencies change + ("y", 1): (inc, 4), # run_spec changes, dependencies don't + ("y", 2): (inc, 2), # Doesn't change + ("z", 0): (inc, ("y", 0)), + ("z", 1): (inc, ("y", 1)), + ("z", 2): (inc, ("y", 2)), + } + with captured_logger("distributed.scheduler", level=loglevel) as log: + zs = c.get(dsk, [("z", 0), ("z", 1), ("z", 2)], sync=False) + await wait_for_state(("z", 2), "waiting", s) + + actual_loglines = len( + re.findall("Detected different `run_spec` for key ", log.getvalue()) + ) + assert actual_loglines == expect_loglines + + async with Worker(s.address): + assert await c.gather(zs) == [2, 3, 4] # Kept old ys diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/tests/test_worker.py new/distributed-2024.2.1/distributed/tests/test_worker.py --- old/distributed-2024.2.0/distributed/tests/test_worker.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/tests/test_worker.py 2024-02-23 19:11:48.000000000 +0100 @@ -1834,7 +1834,8 @@ Worker=Nanny, worker_kwargs={"heartbeat_interval": "1ms"}, ) -async def test_heartbeat_missing_restarts(c, s, n): +async def test_heartbeat_missing_doesnt_restart(c, s, n): + """Read: https://github.com/dask/distributed/pull/8522""" old_heartbeat_handler = s.handlers["heartbeat_worker"] s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"} @@ -1843,11 +1844,8 @@ assert not s.workers s.handlers["heartbeat_worker"] = old_heartbeat_handler - - await n.process.running.wait() - assert n.status == Status.running - - await c.wait_for_workers(1) + assert n.status == Status.closing_gracefully + assert not 
n.process.is_alive() @gen_cluster(nthreads=[]) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/distributed/worker.py new/distributed-2024.2.1/distributed/worker.py --- old/distributed-2024.2.0/distributed/worker.py 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/distributed/worker.py 2024-02-23 19:11:48.000000000 +0100 @@ -1272,12 +1272,15 @@ self._update_latency(end - start) if response["status"] == "missing": - # Scheduler thought we left. Reconnection is not supported, so just shut down. - logger.error( - f"Scheduler was unaware of this worker {self.address!r}. Shutting down." - ) - # Something is out of sync; have the nanny restart us if possible. - await self.close(nanny=False) + # Scheduler thought we left. + # This is a common race condition when the scheduler calls + # remove_worker(); there can be a heartbeat between when the scheduler + # removes the worker on its side and when the {"op": "close"} command + # arrives through batched comms to the worker. + logger.warning("Scheduler was unaware of this worker; shutting down.") + # We close here just for safety's sake - the {op: close} should + # arrive soon anyway. + await self.close(reason="worker-heartbeat-missing") return self.scheduler_delay = response["time"] - middle @@ -1290,7 +1293,7 @@ logger.exception("Failed to communicate with scheduler during heartbeat.") except Exception: logger.exception("Unexpected exception during heartbeat. Closing worker.") - await self.close() + await self.close(reason="worker-heartbeat-error") raise @fail_hard diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2024.2.0/pyproject.toml new/distributed-2024.2.1/pyproject.toml --- old/distributed-2024.2.0/pyproject.toml 2024-02-09 23:58:27.000000000 +0100 +++ new/distributed-2024.2.1/pyproject.toml 2024-02-23 19:11:48.000000000 +0100 @@ -28,7 +28,7 @@ dependencies = [ "click >= 8.0", "cloudpickle >= 1.5.0", - "dask == 2024.2.0", + "dask == 2024.2.1", "jinja2 >= 2.10.3", "locket >= 1.0.0", "msgpack >= 1.0.0", @@ -143,15 +143,14 @@ '''ignore:setDaemon\(\) is deprecated, set the daemon attribute instead:DeprecationWarning:paramiko''', '''ignore:`np.bool8` is a deprecated alias for `np.bool_`''', '''ignore:is_sparse is deprecated and will:FutureWarning''', - # Need bokeh >= 3.3 for https://github.com/bokeh/bokeh/pull/13147 - '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and scheduled for removal in a future version.*:DeprecationWarning:bokeh''', - # https://github.com/tornadoweb/tornado/issues/3334 + # Need Tornado >=6.4 '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and scheduled for removal in a future version.*:DeprecationWarning:tornado''', # https://github.com/dateutil/dateutil/issues/1284 '''ignore:datetime\.datetime\.utc(fromtimestamp|now)\(\) is deprecated and scheduled for removal in a future version.*:DeprecationWarning:dateutil''', # https://github.com/dask/dask/pull/10622 '''ignore:Minimal version of pyarrow will soon be increased to 14.0.1''', '''ignore:the matrix subclass is not the recommended way''', + '''ignore:The current Dask DataFrame implementation is deprecated.*:DeprecationWarning''', ] minversion = "6" markers = [