Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-dask-expr for openSUSE:Factory checked in at 2024-12-06 14:27:28
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-dask-expr (Old)
 and      /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-dask-expr" Fri Dec 6 14:27:28 2024 rev:8 rq:1228730 version:1.1.20 Changes: -------- --- /work/SRC/openSUSE:Factory/python-dask-expr/python-dask-expr.changes 2024-11-27 22:12:41.235848776 +0100 +++ /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523/python-dask-expr.changes 2024-12-06 14:27:54.032317295 +0100 @@ -1,0 +2,20 @@ +Thu Dec 5 21:53:04 UTC 2024 - Ben Greiner <c...@bnavigator.de> + +- Update to 1.1.20 + * Fix value_counts with split_out != 1 (#1170) Patrick Hoefler + * Remove recursion in task spec (#1158) Florian Jetter + * Deprecated and remove from_legacy_dataframe usage (#1168) + Patrick Hoefler + * Remove from_dask_dataframe (#1167) Patrick Hoefler + * Avoid exponentially growing graph for Assign-Projection + combinations (#1164) Patrick Hoefler + * Introduce more caching when walking the expression (#1165) + Patrick Hoefler + * Use Taskspec fuse implementation (#1162) Florian Jetter + * Fix orphaned dependencies in Fused expression (#1163) Patrick + Hoefler +- Add dask-expr-pr1173-blockwise.patch + * Use new blockwise unpack collection in array + * gh#dask/dask-expr#1173 + +------------------------------------------------------------------- Old: ---- dask_expr-1.1.19-gh.tar.gz New: ---- dask-expr-pr1173-blockwise.patch dask_expr-1.1.20-gh.tar.gz BETA DEBUG BEGIN: New: Hoefler - Add dask-expr-pr1173-blockwise.patch * Use new blockwise unpack collection in array BETA DEBUG END: ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-dask-expr.spec ++++++ --- /var/tmp/diff_new_pack.NWHnRb/_old 2024-12-06 14:27:54.576340207 +0100 +++ /var/tmp/diff_new_pack.NWHnRb/_new 2024-12-06 14:27:54.580340376 +0100 @@ -26,12 +26,14 @@ %bcond_with test %endif Name: python-dask-expr%{psuffix} -Version: 1.1.19 +Version: 1.1.20 Release: 0 Summary: High Level Expressions for Dask License: BSD-3-Clause URL: https://github.com/dask/dask-expr Source0: https://github.com/dask/dask-expr/archive/refs/tags/v%{version}.tar.gz#/dask_expr-%{version}-gh.tar.gz +# PATCH-FIX-UPSTREAM dask-expr-pr1173-blockwise.patch gh#dask/dask-expr#1173 +Patch0: https://github.com/dask/dask-expr/pull/1173.patch#/dask-expr-pr1173-blockwise.patch BuildRequires: %{python_module base >= 3.10} BuildRequires: %{python_module pip} BuildRequires: %{python_module setuptools >= 62.6} @@ -39,7 +41,7 @@ BuildRequires: %{python_module wheel} BuildRequires: fdupes BuildRequires: python-rpm-macros -Requires: python-dask = 2024.11.2 +Requires: python-dask = 2024.12.0 Requires: python-pandas >= 2 Requires: python-pyarrow >= 14.0.1 Provides: python-dask_expr = %{version}-%{release} ++++++ dask-expr-pr1173-blockwise.patch ++++++ >From 7b6d178a31cdc52816908ba93aae3f6e3bbae680 Mon Sep 17 00:00:00 2001 From: James Bourbeau <jrbourb...@gmail.com> Date: Wed, 4 Dec 2024 10:41:15 -0600 Subject: [PATCH 1/2] Use new blockwise unpack collection in array --- dask_expr/array/blockwise.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py index 838539e0..23792434 100644 --- a/dask_expr/array/blockwise.py +++ b/dask_expr/array/blockwise.py @@ -18,7 +18,7 @@ from dask.array.utils import compute_meta from dask.base import is_dask_collection from dask.blockwise import blockwise as core_blockwise -from dask.delayed import unpack_collections +from dask.blockwise import _blockwise_unpack_collections_task_spec from dask.tokenize import tokenize from dask.utils import cached_property, funcname 
@@ -142,7 +142,7 @@ def _layer(self): for arg, ind in arginds: if ind is None: arg = normalize_arg(arg) - arg, collections = unpack_collections(arg) + arg, collections = _blockwise_unpack_collections_task_spec(arg) dependencies.extend(collections) else: if ( @@ -163,7 +163,7 @@ def _layer(self): kwargs2 = {} for k, v in self.kwargs.items(): v = normalize_arg(v) - v, collections = unpack_collections(v) + v, collections = _blockwise_unpack_collections_task_spec(v) dependencies.extend(collections) kwargs2[k] = v >From fd6f081bcce4f36190b87ce26ae278cc3de71d04 Mon Sep 17 00:00:00 2001 From: James Bourbeau <jrbourb...@gmail.com> Date: Wed, 4 Dec 2024 10:46:47 -0600 Subject: [PATCH 2/2] Lint --- dask_expr/array/blockwise.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py index 23792434..4553f05c 100644 --- a/dask_expr/array/blockwise.py +++ b/dask_expr/array/blockwise.py @@ -17,8 +17,8 @@ ) from dask.array.utils import compute_meta from dask.base import is_dask_collection -from dask.blockwise import blockwise as core_blockwise from dask.blockwise import _blockwise_unpack_collections_task_spec +from dask.blockwise import blockwise as core_blockwise from dask.tokenize import tokenize from dask.utils import cached_property, funcname ++++++ dask_expr-1.1.19-gh.tar.gz -> dask_expr-1.1.20-gh.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/changes.md new/dask-expr-1.1.20/changes.md --- old/dask-expr-1.1.19/changes.md 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/changes.md 2024-12-03 23:40:36.000000000 +0100 @@ -1,5 +1,20 @@ ## Dask-expr +# v1.1.20 + +- Fix value_counts with split_out != 1 (:pr:`1170`) `Patrick Hoefler`_ +- Remove recursion in task spec (:pr:`1158`) `Florian Jetter`_ +- Deprecated and remove from_legacy_dataframe usage (:pr:`1168`) `Patrick Hoefler`_ +- Remove ``from_dask_dataframe`` (:pr:`1167`) `Patrick Hoefler`_ +- Avoid exponentially growing graph for Assign-Projection combinations (:pr:`1164`) `Patrick Hoefler`_ +- Introduce more caching when walking the expression (:pr:`1165`) `Patrick Hoefler`_ +- Use Taskspec fuse implementation (:pr:`1162`) `Florian Jetter`_ +- Fix orphaned dependencies in Fused expression (:pr:`1163`) `Patrick Hoefler`_ + +# v1.1.19 + +# v1.1.18 + # v1.1.17 - Add support for Python 3.13 (:pr:`1160`) `James Bourbeau`_ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_collection.py new/dask-expr-1.1.20/dask_expr/_collection.py --- old/dask-expr-1.1.19/dask_expr/_collection.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_collection.py 2024-12-03 23:40:36.000000000 +0100 @@ -45,7 +45,7 @@ meta_series_constructor, pyarrow_strings_enabled, ) -from dask.delayed import delayed +from dask.delayed import Delayed, delayed from dask.utils import ( IndexCallable, M, @@ -66,6 +66,7 @@ from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype from pandas.api.types import is_scalar as pd_is_scalar from pandas.api.types import is_timedelta64_dtype +from pandas.core.dtypes.common import is_extension_array_dtype from pyarrow import fs as pa_fs from tlz import first @@ -159,9 +160,7 @@ if isinstance(other, FrameBase): other = other.expr elif isinstance(other, da.Array): - other = from_dask_array( - other, index=self.index.to_legacy_dataframe(), columns=self.columns - ) + other = 
from_dask_array(other, index=self.index, columns=self.columns) if self.ndim == 1 and len(self.columns): other = other[self.columns[0]] @@ -1371,11 +1370,9 @@ Repartition(self, npartitions, divisions, force, partition_size, freq) ) - def to_dask_dataframe(self, *args, **kwargs) -> _Frame: + def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame: """Convert to a legacy dask-dataframe collection - WARNING: This API is deprecated. Please use `to_legacy_dataframe`. - Parameters ---------- optimize @@ -1384,21 +1381,11 @@ Key-word arguments to pass through to `optimize`. """ warnings.warn( - "`to_dask_dataframe` is deprecated, please use `to_legacy_dataframe`.", + "to_legacy_dataframe is deprecated and will be removed in a future release. " + "The legacy implementation as a whole is deprecated and will be removed, making " + "this method unnecessary.", FutureWarning, ) - return self.to_legacy_dataframe(*args, **kwargs) - - def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame: - """Convert to a legacy dask-dataframe collection - - Parameters - ---------- - optimize - Whether to optimize the underlying `Expr` object before conversion. - **optimize_kwargs - Key-word arguments to pass through to `optimize`. - """ df = self.optimize(**optimize_kwargs) if optimize else self return new_dd_object(df.dask, df._name, df._meta, df.divisions) @@ -1430,9 +1417,18 @@ ------- A Dask Array """ - return self.to_legacy_dataframe(optimize, **optimize_kwargs).to_dask_array( - lengths=lengths, meta=meta - ) + if lengths is True: + lengths = tuple(self.map_partitions(len, enforce_metadata=False).compute()) + + arr = self.values + + chunks = self._validate_chunks(arr, lengths) + arr._chunks = chunks + + if meta is not None: + arr._meta = meta + + return arr @property def values(self): @@ -1442,7 +1438,13 @@ Operations that depend on shape information, like slicing or reshaping, will not work. """ - return self.to_dask_array() + if is_extension_array_dtype(self._meta.values): + warnings.warn( + "Dask currently has limited support for converting pandas extension dtypes " + f"to arrays. Converting {self._meta.values.dtype} to object dtype.", + UserWarning, + ) + return self.map_partitions(methods.values) def __divmod__(self, other): result = self.expr.__divmod__(other) @@ -2460,15 +2462,38 @@ if lengths is True: lengths = tuple(self.map_partitions(len).compute()) + records = to_records(self) - frame = self.to_legacy_dataframe() - records = to_records(frame) - - chunks = frame._validate_chunks(records, lengths) + chunks = self._validate_chunks(records, lengths) records._chunks = (chunks[0],) return records + def _validate_chunks(self, arr, lengths): + from collections.abc import Sequence + + from dask.array.core import normalize_chunks + + if isinstance(lengths, Sequence): + lengths = tuple(lengths) + + if len(lengths) != self.npartitions: + raise ValueError( + "The number of items in 'lengths' does not match the number of " + f"partitions. 
{len(lengths)} != {self.npartitions}" + ) + + if self.ndim == 1: + chunks = normalize_chunks((lengths,)) + else: + chunks = normalize_chunks((lengths, (len(self.columns),))) + + return chunks + elif lengths is not None: + raise ValueError(f"Unexpected value for 'lengths': '{lengths}'") + + return arr._chunks + def to_bag(self, index=False, format="tuple"): """Create a Dask Bag from a Series""" from dask_expr.io.bag import to_bag @@ -2498,7 +2523,13 @@ -------- dask_expr.from_delayed """ - return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph) + if optimize_graph: + frame = self.optimize() + else: + frame = self + keys = frame.__dask_keys__() + graph = frame.__dask_graph__() + return [Delayed(k, graph) for k in keys] def to_backend(self, backend: str | None = None, **kwargs): """Move to a new DataFrame backend @@ -2812,9 +2843,7 @@ "Number of partitions do not match " f"({v.npartitions} != {result.npartitions})" ) - v = from_dask_array( - v, index=result.index.to_legacy_dataframe(), meta=result._meta - ) + v = from_dask_array(v, index=result.index, meta=result._meta) else: raise TypeError(f"Column assignment doesn't support type {type(v)}") args.extend([k, v]) @@ -4797,6 +4826,9 @@ def dtype(self): return pd.Series(self._meta).dtype + def to_delayed(self, optimize_graph=True): + return super().to_delayed(optimize_graph=optimize_graph)[0] + def new_collection(expr): """Create new collection from an expr""" @@ -5020,31 +5052,20 @@ ) -def from_dask_dataframe(*args, **kwargs) -> FrameBase: +def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase: """Create a dask-expr collection from a legacy dask-dataframe collection - WARNING: This API is deprecated. Please use `from_legacy_dataframe`. - Parameters ---------- optimize Whether to optimize the graph before conversion. """ warnings.warn( - "`from_dask_dataframe` is deprecated, please use `from_legacy_dataframe`.", + "from_legacy_dataframe is deprecated and will be removed in a future release. " + "The legacy implementation as a whole is deprecated and will be removed, making " + "this method unnecessary.", FutureWarning, ) - return from_legacy_dataframe(*args, **kwargs) - - -def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase: - """Create a dask-expr collection from a legacy dask-dataframe collection - - Parameters - ---------- - optimize - Whether to optimize the graph before conversion. 
- """ graph = ddf.dask if optimize: graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__()) @@ -5100,12 +5121,9 @@ """ from dask.dataframe.io import from_dask_array - if isinstance(index, FrameBase): - index = index.to_legacy_dataframe() if columns is not None and isinstance(columns, list) and not len(columns): columns = None - df = from_dask_array(x, columns=columns, index=index, meta=meta) - return from_legacy_dataframe(df, optimize=True) + return from_dask_array(x, columns=columns, index=index, meta=meta) @dataframe_creation_dispatch.register_inplace("pandas") diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_core.py new/dask-expr-1.1.20/dask_expr/_core.py --- old/dask-expr-1.1.19/dask_expr/_core.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_core.py 2024-12-03 23:40:36.000000000 +0100 @@ -160,17 +160,26 @@ raise RuntimeError(f"Serializing a {type(self)} object") return type(self), tuple(self.operands) - def _depth(self): + def _depth(self, cache=None): """Depth of the expression tree Returns ------- depth: int """ + if cache is None: + cache = {} if not self.dependencies(): return 1 else: - return max(expr._depth() for expr in self.dependencies()) + 1 + result = [] + for expr in self.dependencies(): + if expr._name in cache: + result.append(cache[expr._name]) + else: + result.append(expr._depth(cache) + 1) + cache[expr._name] = result[-1] + return max(result) def operand(self, key): # Access an operand unambiguously @@ -242,7 +251,7 @@ for i in range(self.npartitions) } - def rewrite(self, kind: str): + def rewrite(self, kind: str, rewritten): """Rewrite an expression This leverages the ``._{kind}_down`` and ``._{kind}_up`` @@ -255,6 +264,9 @@ changed: whether or not any change occured """ + if self._name in rewritten: + return rewritten[self._name] + expr = self down_name = f"_{kind}_down" up_name = f"_{kind}_up" @@ -291,7 +303,8 @@ changed = False for operand in expr.operands: if isinstance(operand, Expr): - new = operand.rewrite(kind=kind) + new = operand.rewrite(kind=kind, rewritten=rewritten) + rewritten[operand._name] = new if new._name != operand._name: changed = True else: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_expr.py new/dask-expr-1.1.20/dask_expr/_expr.py --- old/dask-expr-1.1.19/dask_expr/_expr.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_expr.py 2024-12-03 23:40:36.000000000 +0100 @@ -1899,6 +1899,16 @@ # don't squash if we are using a column that was previously created return return Assign(*self.frame.operands, *self.operands[1:]) + elif isinstance(self.frame, Projection) and isinstance( + self.frame.frame, Assign + ): + if self._check_for_previously_created_column(self.frame.frame): + return + new_columns = self.frame.operands[1].copy() + new_columns.extend(self.keys) + return Projection( + Assign(*self.frame.frame.operands, *self.operands[1:]), new_columns + ) def _check_for_previously_created_column(self, child): input_columns = [] @@ -3053,7 +3063,7 @@ return expr # Manipulate Expression to make it more efficient - expr = expr.rewrite(kind="tune") + expr = expr.rewrite(kind="tune", rewritten={}) if stage == "tuned-logical": return expr @@ -3193,15 +3203,14 @@ dependents[next._name] = set() expr_mapping[next._name] = next - for operand in next.operands: - if isinstance(operand, Expr): - stack.append(operand) - if is_valid_blockwise_op(operand): - if next._name 
in dependencies: - dependencies[next._name].add(operand._name) - dependents[operand._name].add(next._name) - expr_mapping[operand._name] = operand - expr_mapping[next._name] = next + for operand in next.dependencies(): + stack.append(operand) + if is_valid_blockwise_op(operand): + if next._name in dependencies: + dependencies[next._name].add(operand._name) + dependents[operand._name].add(next._name) + expr_mapping[operand._name] = operand + expr_mapping[next._name] = next # Traverse each "root" until we find a fusable sub-group. # Here we use root to refer to a Blockwise Expr node that @@ -3767,31 +3776,16 @@ def _task(self, name: Key, index: int) -> Task: internal_tasks = [] - seen_keys = set() - external_deps = set() for _expr in self.exprs: if self._broadcast_dep(_expr): subname = (_expr._name, 0) else: subname = (_expr._name, index) t = _expr._task(subname, subname[1]) + assert t.key == subname internal_tasks.append(t) - seen_keys.add(subname) - external_deps.update(t.dependencies) - external_deps -= seen_keys - dependencies = {dep: TaskRef(dep) for dep in external_deps} - t = Task( - name, - Fused._execute_internal_graph, - # Wrap the actual subgraph as a data node such that the tasks are - # not erroneously parsed. The external task would otherwise carry - # the internal keys as dependencies which is not satisfiable - DataNode(None, internal_tasks), - dependencies, - (self.exprs[0]._name, index), - ) - return t + return Task.fuse(*internal_tasks, key=name) @staticmethod def _execute_internal_graph(internal_tasks, dependencies, outkey): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_indexing.py new/dask-expr-1.1.20/dask_expr/_indexing.py --- old/dask-expr-1.1.19/dask_expr/_indexing.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_indexing.py 2024-12-03 23:40:36.000000000 +0100 @@ -18,7 +18,8 @@ from pandas.api.types import is_bool_dtype from pandas.errors import IndexingError -from dask_expr._collection import Series, from_legacy_dataframe, new_collection +from dask_expr import from_dask_array +from dask_expr._collection import Series, new_collection from dask_expr._expr import ( Blockwise, MaybeAlignPartitions, @@ -98,6 +99,8 @@ elif is_series_like(iindexer) and not is_bool_dtype(iindexer.dtype): return new_collection(LocList(self.obj, iindexer.values, cindexer)) elif isinstance(iindexer, list) or is_arraylike(iindexer): + if len(iindexer) == 0: + return new_collection(LocEmpty(self.obj._meta, cindexer)) return new_collection(LocList(self.obj, iindexer, cindexer)) else: # element should raise KeyError @@ -132,9 +135,7 @@ return new_collection(Loc(frame, iindexer)) def _loc_array(self, iindexer, cindexer): - iindexer_series = from_legacy_dataframe( - iindexer.to_dask_dataframe("_", self.obj.index.to_legacy_dataframe()) - ) + iindexer_series = from_dask_array(iindexer, columns="_", index=self.obj.index) return self._loc_series(iindexer_series, cindexer, check_alignment=False) def _maybe_partial_time_string(self, iindexer, unit): @@ -250,6 +251,26 @@ return self._layer_information[0] +class LocEmpty(LocList): + _parameters = ["meta", "cindexer"] + + def _lower(self): + return None + + @functools.cached_property + def _meta(self): + if self.cindexer is None: + return self.operand("meta") + else: + return self.operand("meta").loc[:, self.cindexer] + + @functools.cached_property + def _layer_information(self): + divisions = [None, None] + dsk = {(self._name, 0): DataNode((self._name, 0), 
self._meta)} + return dsk, divisions + + class LocSlice(LocBase): @functools.cached_property def start(self): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_merge.py new/dask-expr-1.1.20/dask_expr/_merge.py --- old/dask-expr-1.1.19/dask_expr/_merge.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_merge.py 2024-12-03 23:40:36.000000000 +0100 @@ -680,7 +680,7 @@ _barrier_key_left, p2p_barrier, token_left, - transfer_keys_left, + *transfer_keys_left, spec=DataFrameShuffleSpec( id=token_left, npartitions=self.npartitions, @@ -698,7 +698,7 @@ _barrier_key_right, p2p_barrier, token_right, - transfer_keys_right, + *transfer_keys_right, spec=DataFrameShuffleSpec( id=token_right, npartitions=self.npartitions, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_reductions.py new/dask-expr-1.1.20/dask_expr/_reductions.py --- old/dask-expr-1.1.19/dask_expr/_reductions.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_reductions.py 2024-12-03 23:40:36.000000000 +0100 @@ -244,7 +244,8 @@ # Reset the index if we we used it for shuffling if split_by_index: - shuffled = SetIndexBlockwise(shuffled, split_by, True, None) + idx = list(self._meta.index.names) if split_by != ["index"] else split_by + shuffled = SetIndexBlockwise(shuffled, idx, True, None) # Convert back to Series if necessary if self.shuffle_by_index is not False: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_shuffle.py new/dask-expr-1.1.20/dask_expr/_shuffle.py --- old/dask-expr-1.1.19/dask_expr/_shuffle.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_shuffle.py 2024-12-03 23:40:36.000000000 +0100 @@ -592,7 +592,7 @@ _barrier_key, p2p_barrier, token, - transfer_keys, + *transfer_keys, spec=DataFrameShuffleSpec( id=shuffle_id, npartitions=self.npartitions_out, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_version.py new/dask-expr-1.1.20/dask_expr/_version.py --- old/dask-expr-1.1.19/dask_expr/_version.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/_version.py 2024-12-03 23:40:36.000000000 +0100 @@ -26,9 +26,9 @@ # setup.py/versioneer.py will grep for the variable names, so they must # each be defined on a line of their own. _version.py will just call # get_keywords(). 
- git_refnames = " (tag: v1.1.19)" - git_full = "735fc8904832c723680c5c9ef5f89e01e622a076" - git_date = "2024-11-13 16:16:32 +0100" + git_refnames = " (tag: v1.1.20)" + git_full = "daef4a273acfb117ebcbbac610103372ebade7c6" + git_date = "2024-12-03 16:40:36 -0600" keywords = {"refnames": git_refnames, "full": git_full, "date": git_date} return keywords diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/_delayed.py new/dask-expr-1.1.20/dask_expr/io/_delayed.py --- old/dask-expr-1.1.19/dask_expr/io/_delayed.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/_delayed.py 2024-12-03 23:40:36.000000000 +0100 @@ -4,13 +4,14 @@ from collections.abc import Iterable from typing import TYPE_CHECKING +import pandas as pd from dask._task_spec import Alias, Task, TaskRef from dask.dataframe.dispatch import make_meta -from dask.dataframe.utils import check_meta +from dask.dataframe.utils import check_meta, pyarrow_strings_enabled from dask.delayed import Delayed, delayed from dask.typing import Key -from dask_expr._expr import DelayedsExpr, PartitionsFiltered +from dask_expr._expr import ArrowStringConversion, DelayedsExpr, PartitionsFiltered from dask_expr._util import _tokenize_deterministic from dask_expr.io import BlockwiseIO @@ -141,8 +142,12 @@ from dask_expr._collection import new_collection - return new_collection( - FromDelayed( - DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, prefix - ) + result = FromDelayed( + DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, prefix ) + if pyarrow_strings_enabled() and any( + pd.api.types.is_object_dtype(dtype) + for dtype in (result.dtypes.values if result.ndim == 2 else [result.dtypes]) + ): + return new_collection(ArrowStringConversion(result)) + return new_collection(result) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/bag.py new/dask-expr-1.1.20/dask_expr/io/bag.py --- old/dask-expr-1.1.19/dask_expr/io/bag.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/bag.py 2024-12-03 23:40:36.000000000 +0100 @@ -1,4 +1,42 @@ +from dask.dataframe.io.io import _df_to_bag +from dask.tokenize import tokenize + +from dask_expr import FrameBase + + def to_bag(df, index=False, format="tuple"): - from dask.dataframe.io import to_bag as _to_bag + """Create Dask Bag from a Dask DataFrame + + Parameters + ---------- + index : bool, optional + If True, the elements are tuples of ``(index, value)``, otherwise + they're just the ``value``. Default is False. + format : {"tuple", "dict", "frame"}, optional + Whether to return a bag of tuples, dictionaries, or + dataframe-like objects. Default is "tuple". If "frame", + the original partitions of ``df`` will not be transformed + in any way. 
+ + + Examples + -------- + >>> bag = df.to_bag() # doctest: +SKIP + """ + from dask.bag.core import Bag + + df = df.optimize() - return _to_bag(df.to_legacy_dataframe(), index=index, format=format) + if not isinstance(df, FrameBase): + raise TypeError("df must be either DataFrame or Series") + name = "to_bag-" + tokenize(df._name, index, format) + if format == "frame": + dsk = df.dask + name = df._name + else: + dsk = { + (name, i): (_df_to_bag, block, index, format) + for (i, block) in enumerate(df.__dask_keys__()) + } + dsk.update(df.__dask_graph__()) + return Bag(dsk, name, df.npartitions) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/csv.py new/dask-expr-1.1.20/dask_expr/io/csv.py --- old/dask-expr-1.1.19/dask_expr/io/csv.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/csv.py 2024-12-03 23:40:36.000000000 +0100 @@ -280,7 +280,7 @@ from dask.dataframe.io.csv import to_csv as _to_csv return _to_csv( - df.to_legacy_dataframe(), + df.optimize(), filename, single_file=single_file, encoding=encoding, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/hdf.py new/dask-expr-1.1.20/dask_expr/io/hdf.py --- old/dask-expr-1.1.19/dask_expr/io/hdf.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/hdf.py 2024-12-03 23:40:36.000000000 +0100 @@ -1,6 +1,3 @@ -from dask_expr import from_legacy_dataframe - - def read_hdf( pattern, key, @@ -14,7 +11,7 @@ ): from dask.dataframe.io import read_hdf as _read_hdf - df = _read_hdf( + return _read_hdf( pattern, key, start=start, @@ -25,7 +22,6 @@ lock=lock, mode=mode, ) - return from_legacy_dataframe(df) def to_hdf( @@ -130,7 +126,7 @@ from dask.dataframe.io import to_hdf as _to_hdf return _to_hdf( - df.to_legacy_dataframe(), + df.optimize(), path, key, mode=mode, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/io.py new/dask-expr-1.1.20/dask_expr/io/io.py --- old/dask-expr-1.1.19/dask_expr/io/io.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/io.py 2024-12-03 23:40:36.000000000 +0100 @@ -6,7 +6,7 @@ import numpy as np import pyarrow as pa -from dask._task_spec import Task +from dask._task_spec import List, Task from dask.dataframe import methods from dask.dataframe._pyarrow import to_pyarrow_string from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta @@ -135,7 +135,7 @@ bucket = self._fusion_buckets[index] # FIXME: This will likely require a wrapper return Task( - name, methods.concat, [expr._filtered_task(name, i) for i in bucket] + name, methods.concat, List(*(expr._filtered_task(name, i) for i in bucket)) ) @functools.cached_property diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/json.py new/dask-expr-1.1.20/dask_expr/io/json.py --- old/dask-expr-1.1.19/dask_expr/io/json.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/json.py 2024-12-03 23:40:36.000000000 +0100 @@ -1,7 +1,6 @@ import pandas as pd from dask.dataframe.utils import insert_meta_param_description -from dask_expr import from_legacy_dataframe from dask_expr._backends import dataframe_creation_dispatch @@ -98,7 +97,7 @@ """ from dask.dataframe.io.json import read_json - df = read_json( + return read_json( url_path, orient=orient, lines=lines, @@ -114,7 +113,6 @@ 
path_converter=path_converter, **kwargs, ) - return from_legacy_dataframe(df) def to_json( @@ -172,7 +170,7 @@ from dask.dataframe.io.json import to_json return to_json( - df.to_legacy_dataframe(), + df, url_path, orient=orient, lines=lines, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/orc.py new/dask-expr-1.1.20/dask_expr/io/orc.py --- old/dask-expr-1.1.19/dask_expr/io/orc.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/orc.py 2024-12-03 23:40:36.000000000 +0100 @@ -1,4 +1,3 @@ -from dask_expr import from_legacy_dataframe from dask_expr._backends import dataframe_creation_dispatch @@ -48,7 +47,7 @@ """ from dask.dataframe.io import read_orc as _read_orc - df = _read_orc( + return _read_orc( path, engine=engine, columns=columns, @@ -57,7 +56,6 @@ aggregate_files=aggregate_files, storage_options=storage_options, ) - return from_legacy_dataframe(df) def to_orc( @@ -72,7 +70,7 @@ from dask.dataframe.io import to_orc as _to_orc return _to_orc( - df.to_legacy_dataframe(), + df.optimize(), path, engine=engine, write_index=write_index, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/parquet.py new/dask-expr-1.1.20/dask_expr/io/parquet.py --- old/dask-expr-1.1.19/dask_expr/io/parquet.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/parquet.py 2024-12-03 23:40:36.000000000 +0100 @@ -593,7 +593,7 @@ # Engine-specific initialization steps to write the dataset. # Possibly create parquet metadata, and load existing stuff if appending i_offset, fmd, metadata_file_exists, extra_write_kwargs = engine.initialize_write( - df.to_legacy_dataframe(), + df, fs, path, append=append, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/sql.py new/dask-expr-1.1.20/dask_expr/io/sql.py --- old/dask-expr-1.1.19/dask_expr/io/sql.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/sql.py 2024-12-03 23:40:36.000000000 +0100 @@ -1,6 +1,3 @@ -from dask_expr import from_legacy_dataframe - - def read_sql(sql, con, index_col, **kwargs): """ Read SQL query or database table into a DataFrame. 
@@ -36,8 +33,7 @@ """ from dask.dataframe.io.sql import read_sql - df = read_sql(sql, con, index_col, **kwargs) - return from_legacy_dataframe(df) + return read_sql(sql, con, index_col, **kwargs) def read_sql_table( @@ -122,7 +118,7 @@ """ from dask.dataframe.io.sql import read_sql_table as _read_sql_table - df = _read_sql_table( + return _read_sql_table( table_name, con, index_col, @@ -137,7 +133,6 @@ engine_kwargs=engine_kwargs, **kwargs, ) - return from_legacy_dataframe(df) def read_sql_query( @@ -210,7 +205,7 @@ """ from dask.dataframe.io.sql import read_sql_query as _read_sql_query - df = _read_sql_query( + return _read_sql_query( sql, con, index_col, @@ -223,7 +218,6 @@ engine_kwargs=engine_kwargs, **kwargs, ) - return from_legacy_dataframe(df) def to_sql( @@ -354,7 +348,7 @@ from dask.dataframe.io.sql import to_sql as _to_sql return _to_sql( - df.to_legacy_dataframe(), + df, name=name, uri=uri, schema=schema, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py --- old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py 2024-12-03 23:40:36.000000000 +0100 @@ -63,4 +63,4 @@ df = read_parquet(tmpdir, filesystem=filesystem) from distributed.protocol import dumps - assert len(b"".join(dumps(df.optimize().dask))) <= 9000 + assert len(b"".join(dumps(df.optimize().dask))) <= 9100 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py --- old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py 2024-12-03 23:40:36.000000000 +0100 @@ -14,7 +14,6 @@ DataFrame, from_array, from_dask_array, - from_dask_dataframe, from_dict, from_legacy_dataframe, from_map, @@ -231,29 +230,21 @@ @pytest.mark.parametrize("optimize", [True, False]) def test_from_legacy_dataframe(optimize): ddf = dd.from_dict({"a": range(100)}, npartitions=10) - df = from_legacy_dataframe(ddf, optimize=optimize) + with pytest.warns(FutureWarning, match="is deprecated"): + df = from_legacy_dataframe(ddf, optimize=optimize) assert isinstance(df.expr, Expr) assert_eq(df, ddf) - # Check deprecated API - with pytest.warns(FutureWarning, match="deprecated"): - df2 = from_dask_dataframe(ddf, optimize=optimize) - assert_eq(df, df2) - @pytest.mark.parametrize("optimize", [True, False]) def test_to_legacy_dataframe(optimize): pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) df = from_pandas(pdf, npartitions=2) - ddf = df.to_legacy_dataframe(optimize=optimize) + with pytest.warns(FutureWarning, match="is deprecated"): + ddf = df.to_legacy_dataframe(optimize=optimize) assert isinstance(ddf, dd.core.DataFrame) assert_eq(df, ddf) - # Check deprecated API - with pytest.warns(FutureWarning, match="deprecated"): - ddf2 = df.to_dask_dataframe(optimize=optimize) - assert_eq(ddf, ddf2) - @pytest.mark.parametrize("optimize", [True, False]) def test_to_dask_array(optimize): @@ -470,3 +461,41 @@ obj = meta.schema assert normalizer(obj) == normalizer(obj) + + +@pytest.mark.parametrize("lengths", [[2, 2], True]) +def test_to_records_with_lengths(lengths): + pytest.importorskip("dask.array") + from dask.array.utils import assert_eq + + df = pd.DataFrame( + {"x": ["a", "b", "c", "d"], "y": 
[2, 3, 4, 5]}, + index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"), + ) + ddf = dd.from_pandas(df, 2) + + result = ddf.to_records(lengths=lengths) + assert_eq(df.to_records(), result, check_type=False) # TODO: make check_type pass + + assert isinstance(result, da.Array) + + expected_chunks = ((2, 2),) + + assert result.chunks == expected_chunks + + +def test_to_bag(): + a = pd.DataFrame( + {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]}, + index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"), + ) + ddf = dd.from_pandas(a, 2) + + assert ddf.to_bag().compute() == list(a.itertuples(False)) + assert ddf.to_bag(True).compute() == list(a.itertuples(True)) + assert ddf.to_bag(format="dict").compute() == [ + {"x": "a", "y": 2}, + {"x": "b", "y": 3}, + {"x": "c", "y": 4}, + {"x": "d", "y": 5}, + ] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_collection.py new/dask-expr-1.1.20/dask_expr/tests/test_collection.py --- old/dask-expr-1.1.19/dask_expr/tests/test_collection.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/tests/test_collection.py 2024-12-03 23:40:36.000000000 +0100 @@ -2651,6 +2651,44 @@ assert_eq(ddf.melt(**kwargs), pdf.melt(**kwargs), check_index=False) +def test_assign_projection_mix(): + data = { + "date": [ + "2024-03-22 18:13:36.801000", + "2024-03-22 18:14:11.457000", + "2024-04-02 06:05:01.658000", + "2024-04-02 06:05:04.870000", + "2024-04-03 06:11:30.202000", + ], + "code": [1.0, 3.0, 6.0, 6.0, 8.0], + "first": pd.NaT, + "first_2": pd.NaT, + "second": pd.NaT, + "second_2": pd.NaT, + "third": pd.NaT, + } + df = pd.DataFrame(data) + df["date"] = pd.to_datetime(df["date"]) + + df = from_pandas(df) + + def apply_func(x): + return x + + event_columns = {1: ["first", "first_2"], 2: ["second", "second_2"], 3: ["third"]} + + for event_code, columns in event_columns.items(): + mask = df["code"] == event_code + df[columns[0]] = df[columns[0]].mask(cond=(mask), other=df["date"]) + if len(columns) == 2: + df[columns[1]] = df[columns[1]].mask(cond=(mask), other=df["date"]) + df[columns[1]] = df[columns[1]].apply(apply_func) + + df = df.drop(columns=["code", "date"]) + result = df.optimize(fuse=False) + assert result.expr._depth() == 13.0 # this grew exponentially previously + + def test_dropna_merge(df, pdf): dropped_na = df.dropna(subset=["x"]) result = dropped_na.merge(dropped_na, on="x") diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py --- old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py 2024-12-03 23:40:36.000000000 +0100 @@ -157,6 +157,13 @@ assert_eq(agg, expect) +def test_value_counts_split_out(pdf): + df = from_pandas(pdf, npartitions=10) + result = df.groupby("x").y.value_counts(split_out=True) + expected = pdf.groupby("x").y.value_counts() + assert_eq(result, expected) + + def test_unique(df, pdf): result = df.groupby("x")["y"].unique() expected = pdf.groupby("x")["y"].unique() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py --- old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py 2024-12-03 
23:40:36.000000000 +0100 @@ -35,15 +35,6 @@ df.iloc[(1, 2, 3)] -def test_loc(df, pdf): - assert_eq(df.loc[:, "x"], pdf.loc[:, "x"]) - assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]]) - assert_eq(df.loc[:, []], pdf.loc[:, []]) - - assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"]) - assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]]) - - def test_loc_slice(pdf, df): pdf.columns = [10, 20] df.columns = [10, 20] @@ -86,6 +77,12 @@ def test_loc(df, pdf): + assert_eq(df.loc[:, "x"], pdf.loc[:, "x"]) + assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]]) + assert_eq(df.loc[:, []], pdf.loc[:, []]) + + assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"]) + assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]]) assert df.loc[3:8].divisions[0] == 3 assert df.loc[3:8].divisions[-1] == 8 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py --- old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py 2024-12-03 23:40:36.000000000 +0100 @@ -146,10 +146,10 @@ @pytest.mark.parametrize( - "split_every,expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)] + "split_every, expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)] ) def test_dataframe_mode_split_every(pdf, df, split_every, expect_tasks): - assert_eq(df.to_legacy_dataframe().mode(split_every=split_every), pdf.mode()) + assert_eq(df.mode(split_every=split_every), pdf.mode()) q = df.mode(split_every=split_every).optimize(fuse=False) assert len(q.__dask_graph__()) == expect_tasks diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-expr-1.1.19/pyproject.toml new/dask-expr-1.1.20/pyproject.toml --- old/dask-expr-1.1.19/pyproject.toml 2024-11-13 16:16:32.000000000 +0100 +++ new/dask-expr-1.1.20/pyproject.toml 2024-12-03 23:40:36.000000000 +0100 @@ -26,7 +26,7 @@ readme = "README.md" requires-python = ">=3.10" dependencies = [ - "dask == 2024.11.2", + "dask == 2024.12.0", "pyarrow>=14.0.1", "pandas >= 2", ]
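
For reference, the value_counts fix picked up with this update (gh#dask/dask-expr#1170, listed in the changelog above) can be exercised with a short script. This is a minimal sketch assuming the updated python-dask-expr 1.1.20 together with python-dask 2024.12.0 is installed; the column names and toy data below are illustration only, loosely mirroring the new upstream test test_value_counts_split_out.

  import pandas as pd
  from dask.dataframe.utils import assert_eq
  from dask_expr import from_pandas

  # Made-up toy data; any frame with a grouping column behaves the same.
  pdf = pd.DataFrame({"x": [1, 2, 3, 4] * 25, "y": [5, 6] * 50})
  df = from_pandas(pdf, npartitions=10)

  # With split_out enabled the result could be wrong before 1.1.20;
  # it should now match the plain pandas result.
  result = df.groupby("x").y.value_counts(split_out=True)
  assert_eq(result, pdf.groupby("x").y.value_counts())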