Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-dask-expr for 
openSUSE:Factory checked in at 2024-12-06 14:27:28
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-dask-expr (Old)
 and      /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-dask-expr"

Fri Dec  6 14:27:28 2024 rev:8 rq:1228730 version:1.1.20

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-dask-expr/python-dask-expr.changes       2024-11-27 22:12:41.235848776 +0100
+++ /work/SRC/openSUSE:Factory/.python-dask-expr.new.28523/python-dask-expr.changes    2024-12-06 14:27:54.032317295 +0100
@@ -1,0 +2,20 @@
+Thu Dec  5 21:53:04 UTC 2024 - Ben Greiner <c...@bnavigator.de>
+
+- Update to 1.1.20
+  * Fix value_counts with split_out != 1 (#1170) Patrick Hoefler
+  * Remove recursion in task spec (#1158) Florian Jetter
+  * Deprecated and remove from_legacy_dataframe usage (#1168)
+    Patrick Hoefler
+  * Remove from_dask_dataframe (#1167) Patrick Hoefler
+  * Avoid exponentially growing graph for Assign-Projection
+    combinations (#1164) Patrick Hoefler
+  * Introduce more caching when walking the expression (#1165)
+    Patrick Hoefler
+  * Use Taskspec fuse implementation (#1162) Florian Jetter
+  * Fix orphaned dependencies in Fused expression (#1163) Patrick
+    Hoefler
+- Add dask-expr-pr1173-blockwise.patch
+  * Use new blockwise unpack collection in array
+  * gh#dask/dask-expr#1173
+
+-------------------------------------------------------------------

Old:
----
  dask_expr-1.1.19-gh.tar.gz

New:
----
  dask-expr-pr1173-blockwise.patch
  dask_expr-1.1.20-gh.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-dask-expr.spec ++++++
--- /var/tmp/diff_new_pack.NWHnRb/_old  2024-12-06 14:27:54.576340207 +0100
+++ /var/tmp/diff_new_pack.NWHnRb/_new  2024-12-06 14:27:54.580340376 +0100
@@ -26,12 +26,14 @@
 %bcond_with test
 %endif
 Name:           python-dask-expr%{psuffix}
-Version:        1.1.19
+Version:        1.1.20
 Release:        0
 Summary:        High Level Expressions for Dask
 License:        BSD-3-Clause
 URL:            https://github.com/dask/dask-expr
 Source0:        https://github.com/dask/dask-expr/archive/refs/tags/v%{version}.tar.gz#/dask_expr-%{version}-gh.tar.gz
+# PATCH-FIX-UPSTREAM dask-expr-pr1173-blockwise.patch gh#dask/dask-expr#1173
+Patch0:         https://github.com/dask/dask-expr/pull/1173.patch#/dask-expr-pr1173-blockwise.patch
 BuildRequires:  %{python_module base >= 3.10}
 BuildRequires:  %{python_module pip}
 BuildRequires:  %{python_module setuptools >= 62.6}
@@ -39,7 +41,7 @@
 BuildRequires:  %{python_module wheel}
 BuildRequires:  fdupes
 BuildRequires:  python-rpm-macros
-Requires:       python-dask = 2024.11.2
+Requires:       python-dask = 2024.12.0
 Requires:       python-pandas >= 2
 Requires:       python-pyarrow >= 14.0.1
 Provides:       python-dask_expr = %{version}-%{release}

++++++ dask-expr-pr1173-blockwise.patch ++++++
From 7b6d178a31cdc52816908ba93aae3f6e3bbae680 Mon Sep 17 00:00:00 2001
From: James Bourbeau <jrbourb...@gmail.com>
Date: Wed, 4 Dec 2024 10:41:15 -0600
Subject: [PATCH 1/2] Use new blockwise unpack collection in array

---
 dask_expr/array/blockwise.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py
index 838539e0..23792434 100644
--- a/dask_expr/array/blockwise.py
+++ b/dask_expr/array/blockwise.py
@@ -18,7 +18,7 @@
 from dask.array.utils import compute_meta
 from dask.base import is_dask_collection
 from dask.blockwise import blockwise as core_blockwise
-from dask.delayed import unpack_collections
+from dask.blockwise import _blockwise_unpack_collections_task_spec
 from dask.tokenize import tokenize
 from dask.utils import cached_property, funcname
 
@@ -142,7 +142,7 @@ def _layer(self):
         for arg, ind in arginds:
             if ind is None:
                 arg = normalize_arg(arg)
-                arg, collections = unpack_collections(arg)
+                arg, collections = _blockwise_unpack_collections_task_spec(arg)
                 dependencies.extend(collections)
             else:
                 if (
@@ -163,7 +163,7 @@ def _layer(self):
         kwargs2 = {}
         for k, v in self.kwargs.items():
             v = normalize_arg(v)
-            v, collections = unpack_collections(v)
+            v, collections = _blockwise_unpack_collections_task_spec(v)
             dependencies.extend(collections)
             kwargs2[k] = v
 

From fd6f081bcce4f36190b87ce26ae278cc3de71d04 Mon Sep 17 00:00:00 2001
From: James Bourbeau <jrbourb...@gmail.com>
Date: Wed, 4 Dec 2024 10:46:47 -0600
Subject: [PATCH 2/2] Lint

---
 dask_expr/array/blockwise.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_expr/array/blockwise.py b/dask_expr/array/blockwise.py
index 23792434..4553f05c 100644
--- a/dask_expr/array/blockwise.py
+++ b/dask_expr/array/blockwise.py
@@ -17,8 +17,8 @@
 )
 from dask.array.utils import compute_meta
 from dask.base import is_dask_collection
-from dask.blockwise import blockwise as core_blockwise
 from dask.blockwise import _blockwise_unpack_collections_task_spec
+from dask.blockwise import blockwise as core_blockwise
 from dask.tokenize import tokenize
 from dask.utils import cached_property, funcname
 

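The patch above swaps dask-expr's array blockwise over to dask's task-spec based unpacking helper for non-indexed arguments. The patched module is the internal dask_expr.array.blockwise; as a rough illustration of the affected code path, the public dask.array.blockwise API routes a literal (non-indexed) argument through the same unpacking step:

    import numpy as np
    import dask.array as da

    x = da.ones((4,), chunks=2)

    # A literal argument is passed with index None; non-indexed arguments are
    # the ones handled by the unpack-collections helper that the patch
    # replaces with the task-spec variant.
    y = da.blockwise(np.add, "i", x, "i", 100, None, dtype=x.dtype)
    print(y.compute())  # array([101., 101., 101., 101.])
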
++++++ dask_expr-1.1.19-gh.tar.gz -> dask_expr-1.1.20-gh.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/changes.md 
new/dask-expr-1.1.20/changes.md
--- old/dask-expr-1.1.19/changes.md     2024-11-13 16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/changes.md     2024-12-03 23:40:36.000000000 +0100
@@ -1,5 +1,20 @@
 ## Dask-expr
 
+# v1.1.20
+
+- Fix value_counts with split_out != 1 (:pr:`1170`) `Patrick Hoefler`_
+- Remove recursion in task spec (:pr:`1158`) `Florian Jetter`_
+- Deprecated and remove from_legacy_dataframe usage (:pr:`1168`) `Patrick Hoefler`_
+- Remove ``from_dask_dataframe`` (:pr:`1167`) `Patrick Hoefler`_
+- Avoid exponentially growing graph for Assign-Projection combinations (:pr:`1164`) `Patrick Hoefler`_
+- Introduce more caching when walking the expression (:pr:`1165`) `Patrick Hoefler`_
+- Use Taskspec fuse implementation (:pr:`1162`) `Florian Jetter`_
+- Fix orphaned dependencies in Fused expression (:pr:`1163`) `Patrick Hoefler`_
+
+# v1.1.19
+
+# v1.1.18
+
 # v1.1.17
 
 - Add support for Python 3.13 (:pr:`1160`) `James Bourbeau`_
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_collection.py 
new/dask-expr-1.1.20/dask_expr/_collection.py
--- old/dask-expr-1.1.19/dask_expr/_collection.py       2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/_collection.py       2024-12-03 
23:40:36.000000000 +0100
@@ -45,7 +45,7 @@
     meta_series_constructor,
     pyarrow_strings_enabled,
 )
-from dask.delayed import delayed
+from dask.delayed import Delayed, delayed
 from dask.utils import (
     IndexCallable,
     M,
@@ -66,6 +66,7 @@
 from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, 
is_numeric_dtype
 from pandas.api.types import is_scalar as pd_is_scalar
 from pandas.api.types import is_timedelta64_dtype
+from pandas.core.dtypes.common import is_extension_array_dtype
 from pyarrow import fs as pa_fs
 from tlz import first
 
@@ -159,9 +160,7 @@
     if isinstance(other, FrameBase):
         other = other.expr
     elif isinstance(other, da.Array):
-        other = from_dask_array(
-            other, index=self.index.to_legacy_dataframe(), columns=self.columns
-        )
+        other = from_dask_array(other, index=self.index, columns=self.columns)
         if self.ndim == 1 and len(self.columns):
             other = other[self.columns[0]]
 
@@ -1371,11 +1370,9 @@
                 Repartition(self, npartitions, divisions, force, 
partition_size, freq)
             )
 
-    def to_dask_dataframe(self, *args, **kwargs) -> _Frame:
+    def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> 
_Frame:
         """Convert to a legacy dask-dataframe collection
 
-        WARNING: This API is deprecated. Please use `to_legacy_dataframe`.
-
         Parameters
         ----------
         optimize
@@ -1384,21 +1381,11 @@
             Key-word arguments to pass through to `optimize`.
         """
         warnings.warn(
-            "`to_dask_dataframe` is deprecated, please use 
`to_legacy_dataframe`.",
+            "to_legacy_dataframe is deprecated and will be removed in a future 
release. "
+            "The legacy implementation as a whole is deprecated and will be 
removed, making "
+            "this method unnecessary.",
             FutureWarning,
         )
-        return self.to_legacy_dataframe(*args, **kwargs)
-
-    def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> 
_Frame:
-        """Convert to a legacy dask-dataframe collection
-
-        Parameters
-        ----------
-        optimize
-            Whether to optimize the underlying `Expr` object before conversion.
-        **optimize_kwargs
-            Key-word arguments to pass through to `optimize`.
-        """
         df = self.optimize(**optimize_kwargs) if optimize else self
         return new_dd_object(df.dask, df._name, df._meta, df.divisions)
 
@@ -1430,9 +1417,18 @@
         -------
         A Dask Array
         """
-        return self.to_legacy_dataframe(optimize, 
**optimize_kwargs).to_dask_array(
-            lengths=lengths, meta=meta
-        )
+        if lengths is True:
+            lengths = tuple(self.map_partitions(len, 
enforce_metadata=False).compute())
+
+        arr = self.values
+
+        chunks = self._validate_chunks(arr, lengths)
+        arr._chunks = chunks
+
+        if meta is not None:
+            arr._meta = meta
+
+        return arr
 
     @property
     def values(self):
@@ -1442,7 +1438,13 @@
         Operations that depend on shape information, like slicing or reshaping,
         will not work.
         """
-        return self.to_dask_array()
+        if is_extension_array_dtype(self._meta.values):
+            warnings.warn(
+                "Dask currently has limited support for converting pandas 
extension dtypes "
+                f"to arrays. Converting {self._meta.values.dtype} to object 
dtype.",
+                UserWarning,
+            )
+        return self.map_partitions(methods.values)
 
     def __divmod__(self, other):
         result = self.expr.__divmod__(other)
@@ -2460,15 +2462,38 @@
 
         if lengths is True:
             lengths = tuple(self.map_partitions(len).compute())
+        records = to_records(self)
 
-        frame = self.to_legacy_dataframe()
-        records = to_records(frame)
-
-        chunks = frame._validate_chunks(records, lengths)
+        chunks = self._validate_chunks(records, lengths)
         records._chunks = (chunks[0],)
 
         return records
 
+    def _validate_chunks(self, arr, lengths):
+        from collections.abc import Sequence
+
+        from dask.array.core import normalize_chunks
+
+        if isinstance(lengths, Sequence):
+            lengths = tuple(lengths)
+
+            if len(lengths) != self.npartitions:
+                raise ValueError(
+                    "The number of items in 'lengths' does not match the 
number of "
+                    f"partitions. {len(lengths)} != {self.npartitions}"
+                )
+
+            if self.ndim == 1:
+                chunks = normalize_chunks((lengths,))
+            else:
+                chunks = normalize_chunks((lengths, (len(self.columns),)))
+
+            return chunks
+        elif lengths is not None:
+            raise ValueError(f"Unexpected value for 'lengths': '{lengths}'")
+
+        return arr._chunks
+
     def to_bag(self, index=False, format="tuple"):
         """Create a Dask Bag from a Series"""
         from dask_expr.io.bag import to_bag
@@ -2498,7 +2523,13 @@
         --------
         dask_expr.from_delayed
         """
-        return 
self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph)
+        if optimize_graph:
+            frame = self.optimize()
+        else:
+            frame = self
+        keys = frame.__dask_keys__()
+        graph = frame.__dask_graph__()
+        return [Delayed(k, graph) for k in keys]
 
     def to_backend(self, backend: str | None = None, **kwargs):
         """Move to a new DataFrame backend
@@ -2812,9 +2843,7 @@
                         "Number of partitions do not match "
                         f"({v.npartitions} != {result.npartitions})"
                     )
-                v = from_dask_array(
-                    v, index=result.index.to_legacy_dataframe(), 
meta=result._meta
-                )
+                v = from_dask_array(v, index=result.index, meta=result._meta)
             else:
                 raise TypeError(f"Column assignment doesn't support type 
{type(v)}")
             args.extend([k, v])
@@ -4797,6 +4826,9 @@
     def dtype(self):
         return pd.Series(self._meta).dtype
 
+    def to_delayed(self, optimize_graph=True):
+        return super().to_delayed(optimize_graph=optimize_graph)[0]
+
 
 def new_collection(expr):
     """Create new collection from an expr"""
@@ -5020,31 +5052,20 @@
     )
 
 
-def from_dask_dataframe(*args, **kwargs) -> FrameBase:
+def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
     """Create a dask-expr collection from a legacy dask-dataframe collection
 
-    WARNING: This API is deprecated. Please use `from_legacy_dataframe`.
-
     Parameters
     ----------
     optimize
         Whether to optimize the graph before conversion.
     """
     warnings.warn(
-        "`from_dask_dataframe` is deprecated, please use 
`from_legacy_dataframe`.",
+        "from_legacy_dataframe is deprecated and will be removed in a future 
release. "
+        "The legacy implementation as a whole is deprecated and will be 
removed, making "
+        "this method unnecessary.",
         FutureWarning,
     )
-    return from_legacy_dataframe(*args, **kwargs)
-
-
-def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
-    """Create a dask-expr collection from a legacy dask-dataframe collection
-
-    Parameters
-    ----------
-    optimize
-        Whether to optimize the graph before conversion.
-    """
     graph = ddf.dask
     if optimize:
         graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__())
@@ -5100,12 +5121,9 @@
     """
     from dask.dataframe.io import from_dask_array
 
-    if isinstance(index, FrameBase):
-        index = index.to_legacy_dataframe()
     if columns is not None and isinstance(columns, list) and not len(columns):
         columns = None
-    df = from_dask_array(x, columns=columns, index=index, meta=meta)
-    return from_legacy_dataframe(df, optimize=True)
+    return from_dask_array(x, columns=columns, index=index, meta=meta)
 
 
 @dataframe_creation_dispatch.register_inplace("pandas")
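
Several hunks above drop the legacy-dataframe round trip; to_delayed, for instance, now builds Delayed objects straight from the collection's keys and graph. A minimal sketch of that pattern using public dask APIs (the two-partition frame is hypothetical and only for illustration):

    import pandas as pd
    import dask.dataframe as dd
    from dask.delayed import Delayed

    # Hypothetical two-partition frame.
    ddf = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=2)

    # Same pattern as the new to_delayed: one Delayed per output key, all
    # sharing the collection's task graph (to_delayed optimizes first when
    # optimize_graph=True).
    keys = ddf.__dask_keys__()
    graph = ddf.__dask_graph__()
    parts = [Delayed(k, graph) for k in keys]

    assert len(parts) == ddf.npartitions
    print(parts[0].compute())  # a single pandas partition
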
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_core.py 
new/dask-expr-1.1.20/dask_expr/_core.py
--- old/dask-expr-1.1.19/dask_expr/_core.py     2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_core.py     2024-12-03 23:40:36.000000000 
+0100
@@ -160,17 +160,26 @@
             raise RuntimeError(f"Serializing a {type(self)} object")
         return type(self), tuple(self.operands)
 
-    def _depth(self):
+    def _depth(self, cache=None):
         """Depth of the expression tree
 
         Returns
         -------
         depth: int
         """
+        if cache is None:
+            cache = {}
         if not self.dependencies():
             return 1
         else:
-            return max(expr._depth() for expr in self.dependencies()) + 1
+            result = []
+            for expr in self.dependencies():
+                if expr._name in cache:
+                    result.append(cache[expr._name])
+                else:
+                    result.append(expr._depth(cache) + 1)
+                    cache[expr._name] = result[-1]
+            return max(result)
 
     def operand(self, key):
         # Access an operand unambiguously
@@ -242,7 +251,7 @@
             for i in range(self.npartitions)
         }
 
-    def rewrite(self, kind: str):
+    def rewrite(self, kind: str, rewritten):
         """Rewrite an expression
 
         This leverages the ``._{kind}_down`` and ``._{kind}_up``
@@ -255,6 +264,9 @@
         changed:
             whether or not any change occured
         """
+        if self._name in rewritten:
+            return rewritten[self._name]
+
         expr = self
         down_name = f"_{kind}_down"
         up_name = f"_{kind}_up"
@@ -291,7 +303,8 @@
             changed = False
             for operand in expr.operands:
                 if isinstance(operand, Expr):
-                    new = operand.rewrite(kind=kind)
+                    new = operand.rewrite(kind=kind, rewritten=rewritten)
+                    rewritten[operand._name] = new
                     if new._name != operand._name:
                         changed = True
                 else:
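
Both _depth and rewrite now thread a cache keyed by expression name through the traversal, so shared subexpressions are visited once instead of once per path. A standalone sketch of the same memoization idea (the Node class is illustrative, not the dask-expr Expr API):

    class Node:
        def __init__(self, name, *deps):
            self.name = name
            self.deps = deps

        def depth(self, cache=None):
            # Memoize per-node results so shared subtrees are walked once,
            # mirroring the cache added to Expr._depth.
            if cache is None:
                cache = {}
            if not self.deps:
                return 1
            results = []
            for dep in self.deps:
                if dep.name not in cache:
                    cache[dep.name] = dep.depth(cache) + 1
                results.append(cache[dep.name])
            return max(results)

    # A diamond-shaped graph: "shared" is reachable from both branches but
    # its depth is computed only once.
    shared = Node("shared", Node("leaf"))
    root = Node("root", Node("a", shared), Node("b", shared))
    print(root.depth())  # 4
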
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_expr.py 
new/dask-expr-1.1.20/dask_expr/_expr.py
--- old/dask-expr-1.1.19/dask_expr/_expr.py     2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_expr.py     2024-12-03 23:40:36.000000000 
+0100
@@ -1899,6 +1899,16 @@
                 # don't squash if we are using a column that was previously 
created
                 return
             return Assign(*self.frame.operands, *self.operands[1:])
+        elif isinstance(self.frame, Projection) and isinstance(
+            self.frame.frame, Assign
+        ):
+            if self._check_for_previously_created_column(self.frame.frame):
+                return
+            new_columns = self.frame.operands[1].copy()
+            new_columns.extend(self.keys)
+            return Projection(
+                Assign(*self.frame.frame.operands, *self.operands[1:]), 
new_columns
+            )
 
     def _check_for_previously_created_column(self, child):
         input_columns = []
@@ -3053,7 +3063,7 @@
         return expr
 
     # Manipulate Expression to make it more efficient
-    expr = expr.rewrite(kind="tune")
+    expr = expr.rewrite(kind="tune", rewritten={})
     if stage == "tuned-logical":
         return expr
 
@@ -3193,15 +3203,14 @@
                     dependents[next._name] = set()
                     expr_mapping[next._name] = next
 
-            for operand in next.operands:
-                if isinstance(operand, Expr):
-                    stack.append(operand)
-                    if is_valid_blockwise_op(operand):
-                        if next._name in dependencies:
-                            dependencies[next._name].add(operand._name)
-                        dependents[operand._name].add(next._name)
-                        expr_mapping[operand._name] = operand
-                        expr_mapping[next._name] = next
+            for operand in next.dependencies():
+                stack.append(operand)
+                if is_valid_blockwise_op(operand):
+                    if next._name in dependencies:
+                        dependencies[next._name].add(operand._name)
+                    dependents[operand._name].add(next._name)
+                    expr_mapping[operand._name] = operand
+                    expr_mapping[next._name] = next
 
         # Traverse each "root" until we find a fusable sub-group.
         # Here we use root to refer to a Blockwise Expr node that
@@ -3767,31 +3776,16 @@
 
     def _task(self, name: Key, index: int) -> Task:
         internal_tasks = []
-        seen_keys = set()
-        external_deps = set()
         for _expr in self.exprs:
             if self._broadcast_dep(_expr):
                 subname = (_expr._name, 0)
             else:
                 subname = (_expr._name, index)
             t = _expr._task(subname, subname[1])
+
             assert t.key == subname
             internal_tasks.append(t)
-            seen_keys.add(subname)
-            external_deps.update(t.dependencies)
-        external_deps -= seen_keys
-        dependencies = {dep: TaskRef(dep) for dep in external_deps}
-        t = Task(
-            name,
-            Fused._execute_internal_graph,
-            # Wrap the actual subgraph as a data node such that the tasks are
-            # not erroneously parsed. The external task would otherwise carry
-            # the internal keys as dependencies which is not satisfiable
-            DataNode(None, internal_tasks),
-            dependencies,
-            (self.exprs[0]._name, index),
-        )
-        return t
+        return Task.fuse(*internal_tasks, key=name)
 
     @staticmethod
     def _execute_internal_graph(internal_tasks, dependencies, outkey):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_indexing.py 
new/dask-expr-1.1.20/dask_expr/_indexing.py
--- old/dask-expr-1.1.19/dask_expr/_indexing.py 2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_indexing.py 2024-12-03 23:40:36.000000000 
+0100
@@ -18,7 +18,8 @@
 from pandas.api.types import is_bool_dtype
 from pandas.errors import IndexingError
 
-from dask_expr._collection import Series, from_legacy_dataframe, new_collection
+from dask_expr import from_dask_array
+from dask_expr._collection import Series, new_collection
 from dask_expr._expr import (
     Blockwise,
     MaybeAlignPartitions,
@@ -98,6 +99,8 @@
             elif is_series_like(iindexer) and not 
is_bool_dtype(iindexer.dtype):
                 return new_collection(LocList(self.obj, iindexer.values, 
cindexer))
             elif isinstance(iindexer, list) or is_arraylike(iindexer):
+                if len(iindexer) == 0:
+                    return new_collection(LocEmpty(self.obj._meta, cindexer))
                 return new_collection(LocList(self.obj, iindexer, cindexer))
             else:
                 # element should raise KeyError
@@ -132,9 +135,7 @@
         return new_collection(Loc(frame, iindexer))
 
     def _loc_array(self, iindexer, cindexer):
-        iindexer_series = from_legacy_dataframe(
-            iindexer.to_dask_dataframe("_", 
self.obj.index.to_legacy_dataframe())
-        )
+        iindexer_series = from_dask_array(iindexer, columns="_", 
index=self.obj.index)
         return self._loc_series(iindexer_series, cindexer, 
check_alignment=False)
 
     def _maybe_partial_time_string(self, iindexer, unit):
@@ -250,6 +251,26 @@
         return self._layer_information[0]
 
 
+class LocEmpty(LocList):
+    _parameters = ["meta", "cindexer"]
+
+    def _lower(self):
+        return None
+
+    @functools.cached_property
+    def _meta(self):
+        if self.cindexer is None:
+            return self.operand("meta")
+        else:
+            return self.operand("meta").loc[:, self.cindexer]
+
+    @functools.cached_property
+    def _layer_information(self):
+        divisions = [None, None]
+        dsk = {(self._name, 0): DataNode((self._name, 0), self._meta)}
+        return dsk, divisions
+
+
 class LocSlice(LocBase):
     @functools.cached_property
     def start(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_merge.py 
new/dask-expr-1.1.20/dask_expr/_merge.py
--- old/dask-expr-1.1.19/dask_expr/_merge.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_merge.py    2024-12-03 23:40:36.000000000 
+0100
@@ -680,7 +680,7 @@
             _barrier_key_left,
             p2p_barrier,
             token_left,
-            transfer_keys_left,
+            *transfer_keys_left,
             spec=DataFrameShuffleSpec(
                 id=token_left,
                 npartitions=self.npartitions,
@@ -698,7 +698,7 @@
             _barrier_key_right,
             p2p_barrier,
             token_right,
-            transfer_keys_right,
+            *transfer_keys_right,
             spec=DataFrameShuffleSpec(
                 id=token_right,
                 npartitions=self.npartitions,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_reductions.py 
new/dask-expr-1.1.20/dask_expr/_reductions.py
--- old/dask-expr-1.1.19/dask_expr/_reductions.py       2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/_reductions.py       2024-12-03 
23:40:36.000000000 +0100
@@ -244,7 +244,8 @@
 
         # Reset the index if we we used it for shuffling
         if split_by_index:
-            shuffled = SetIndexBlockwise(shuffled, split_by, True, None)
+            idx = list(self._meta.index.names) if split_by != ["index"] else 
split_by
+            shuffled = SetIndexBlockwise(shuffled, idx, True, None)
 
         # Convert back to Series if necessary
         if self.shuffle_by_index is not False:
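
The hunk above restores the original index names before converting back after a shuffled value_counts, which is what the new test_value_counts_split_out test further down exercises. A minimal usage sketch with hypothetical data:

    import pandas as pd
    from dask.dataframe.utils import assert_eq
    from dask_expr import from_pandas

    # Hypothetical data spread over several partitions.
    pdf = pd.DataFrame({"x": [1, 2, 1, 2, 3] * 4, "y": list("ababa") * 4})
    df = from_pandas(pdf, npartitions=5)

    # split_out shuffles the reduction output across partitions; the result
    # should still match plain pandas.
    result = df.groupby("x").y.value_counts(split_out=True)
    assert_eq(result, pdf.groupby("x").y.value_counts())
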
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_shuffle.py 
new/dask-expr-1.1.20/dask_expr/_shuffle.py
--- old/dask-expr-1.1.19/dask_expr/_shuffle.py  2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_shuffle.py  2024-12-03 23:40:36.000000000 
+0100
@@ -592,7 +592,7 @@
             _barrier_key,
             p2p_barrier,
             token,
-            transfer_keys,
+            *transfer_keys,
             spec=DataFrameShuffleSpec(
                 id=shuffle_id,
                 npartitions=self.npartitions_out,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/_version.py 
new/dask-expr-1.1.20/dask_expr/_version.py
--- old/dask-expr-1.1.19/dask_expr/_version.py  2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/_version.py  2024-12-03 23:40:36.000000000 
+0100
@@ -26,9 +26,9 @@
     # setup.py/versioneer.py will grep for the variable names, so they must
     # each be defined on a line of their own. _version.py will just call
     # get_keywords().
-    git_refnames = " (tag: v1.1.19)"
-    git_full = "735fc8904832c723680c5c9ef5f89e01e622a076"
-    git_date = "2024-11-13 16:16:32 +0100"
+    git_refnames = " (tag: v1.1.20)"
+    git_full = "daef4a273acfb117ebcbbac610103372ebade7c6"
+    git_date = "2024-12-03 16:40:36 -0600"
     keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
     return keywords
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/_delayed.py 
new/dask-expr-1.1.20/dask_expr/io/_delayed.py
--- old/dask-expr-1.1.19/dask_expr/io/_delayed.py       2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/_delayed.py       2024-12-03 
23:40:36.000000000 +0100
@@ -4,13 +4,14 @@
 from collections.abc import Iterable
 from typing import TYPE_CHECKING
 
+import pandas as pd
 from dask._task_spec import Alias, Task, TaskRef
 from dask.dataframe.dispatch import make_meta
-from dask.dataframe.utils import check_meta
+from dask.dataframe.utils import check_meta, pyarrow_strings_enabled
 from dask.delayed import Delayed, delayed
 from dask.typing import Key
 
-from dask_expr._expr import DelayedsExpr, PartitionsFiltered
+from dask_expr._expr import ArrowStringConversion, DelayedsExpr, 
PartitionsFiltered
 from dask_expr._util import _tokenize_deterministic
 from dask_expr.io import BlockwiseIO
 
@@ -141,8 +142,12 @@
 
     from dask_expr._collection import new_collection
 
-    return new_collection(
-        FromDelayed(
-            DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, 
prefix
-        )
+    result = FromDelayed(
+        DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, 
prefix
     )
+    if pyarrow_strings_enabled() and any(
+        pd.api.types.is_object_dtype(dtype)
+        for dtype in (result.dtypes.values if result.ndim == 2 else 
[result.dtypes])
+    ):
+        return new_collection(ArrowStringConversion(result))
+    return new_collection(result)
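
from_delayed now applies ArrowStringConversion itself when pyarrow strings are enabled and the meta contains object-dtype columns. A small sketch of the effect, assuming pyarrow strings are enabled in the dask configuration (the tiny partitions and meta below are hypothetical):

    import pandas as pd
    from dask import delayed
    from dask_expr import from_delayed

    make = delayed(pd.DataFrame)
    parts = [make({"s": ["a", "b"]}), make({"s": ["c", "d"]})]
    meta = pd.DataFrame({"s": pd.Series([], dtype=object)})

    df = from_delayed(parts, meta=meta)
    # With pyarrow strings enabled, the object column is wrapped in
    # ArrowStringConversion and shows up as a pyarrow-backed string dtype.
    print(df.dtypes)
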
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/bag.py 
new/dask-expr-1.1.20/dask_expr/io/bag.py
--- old/dask-expr-1.1.19/dask_expr/io/bag.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/bag.py    2024-12-03 23:40:36.000000000 
+0100
@@ -1,4 +1,42 @@
+from dask.dataframe.io.io import _df_to_bag
+from dask.tokenize import tokenize
+
+from dask_expr import FrameBase
+
+
 def to_bag(df, index=False, format="tuple"):
-    from dask.dataframe.io import to_bag as _to_bag
+    """Create Dask Bag from a Dask DataFrame
+
+    Parameters
+    ----------
+    index : bool, optional
+        If True, the elements are tuples of ``(index, value)``, otherwise
+        they're just the ``value``.  Default is False.
+    format : {"tuple", "dict", "frame"}, optional
+        Whether to return a bag of tuples, dictionaries, or
+        dataframe-like objects. Default is "tuple". If "frame",
+        the original partitions of ``df`` will not be transformed
+        in any way.
+
+
+    Examples
+    --------
+    >>> bag = df.to_bag()  # doctest: +SKIP
+    """
+    from dask.bag.core import Bag
+
+    df = df.optimize()
 
-    return _to_bag(df.to_legacy_dataframe(), index=index, format=format)
+    if not isinstance(df, FrameBase):
+        raise TypeError("df must be either DataFrame or Series")
+    name = "to_bag-" + tokenize(df._name, index, format)
+    if format == "frame":
+        dsk = df.dask
+        name = df._name
+    else:
+        dsk = {
+            (name, i): (_df_to_bag, block, index, format)
+            for (i, block) in enumerate(df.__dask_keys__())
+        }
+        dsk.update(df.__dask_graph__())
+    return Bag(dsk, name, df.npartitions)
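
to_bag now builds the Bag graph directly on top of the optimized expression instead of converting to a legacy dataframe first; user-facing behaviour is unchanged. A short usage sketch with a hypothetical frame, mirroring the new test_to_bag test further down:

    import pandas as pd
    from dask_expr import from_pandas

    pdf = pd.DataFrame({"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]})
    df = from_pandas(pdf, npartitions=2)

    print(df.to_bag().compute())               # [('a', 2), ('b', 3), ...]
    print(df.to_bag(format="dict").compute())  # [{'x': 'a', 'y': 2}, ...]
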
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/csv.py 
new/dask-expr-1.1.20/dask_expr/io/csv.py
--- old/dask-expr-1.1.19/dask_expr/io/csv.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/csv.py    2024-12-03 23:40:36.000000000 
+0100
@@ -280,7 +280,7 @@
     from dask.dataframe.io.csv import to_csv as _to_csv
 
     return _to_csv(
-        df.to_legacy_dataframe(),
+        df.optimize(),
         filename,
         single_file=single_file,
         encoding=encoding,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/hdf.py 
new/dask-expr-1.1.20/dask_expr/io/hdf.py
--- old/dask-expr-1.1.19/dask_expr/io/hdf.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/hdf.py    2024-12-03 23:40:36.000000000 
+0100
@@ -1,6 +1,3 @@
-from dask_expr import from_legacy_dataframe
-
-
 def read_hdf(
     pattern,
     key,
@@ -14,7 +11,7 @@
 ):
     from dask.dataframe.io import read_hdf as _read_hdf
 
-    df = _read_hdf(
+    return _read_hdf(
         pattern,
         key,
         start=start,
@@ -25,7 +22,6 @@
         lock=lock,
         mode=mode,
     )
-    return from_legacy_dataframe(df)
 
 
 def to_hdf(
@@ -130,7 +126,7 @@
     from dask.dataframe.io import to_hdf as _to_hdf
 
     return _to_hdf(
-        df.to_legacy_dataframe(),
+        df.optimize(),
         path,
         key,
         mode=mode,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/io.py 
new/dask-expr-1.1.20/dask_expr/io/io.py
--- old/dask-expr-1.1.19/dask_expr/io/io.py     2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/io.py     2024-12-03 23:40:36.000000000 
+0100
@@ -6,7 +6,7 @@
 
 import numpy as np
 import pyarrow as pa
-from dask._task_spec import Task
+from dask._task_spec import List, Task
 from dask.dataframe import methods
 from dask.dataframe._pyarrow import to_pyarrow_string
 from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta
@@ -135,7 +135,7 @@
         bucket = self._fusion_buckets[index]
         # FIXME: This will likely require a wrapper
         return Task(
-            name, methods.concat, [expr._filtered_task(name, i) for i in 
bucket]
+            name, methods.concat, List(*(expr._filtered_task(name, i) for i in 
bucket))
         )
 
     @functools.cached_property
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/json.py 
new/dask-expr-1.1.20/dask_expr/io/json.py
--- old/dask-expr-1.1.19/dask_expr/io/json.py   2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/json.py   2024-12-03 23:40:36.000000000 
+0100
@@ -1,7 +1,6 @@
 import pandas as pd
 from dask.dataframe.utils import insert_meta_param_description
 
-from dask_expr import from_legacy_dataframe
 from dask_expr._backends import dataframe_creation_dispatch
 
 
@@ -98,7 +97,7 @@
     """
     from dask.dataframe.io.json import read_json
 
-    df = read_json(
+    return read_json(
         url_path,
         orient=orient,
         lines=lines,
@@ -114,7 +113,6 @@
         path_converter=path_converter,
         **kwargs,
     )
-    return from_legacy_dataframe(df)
 
 
 def to_json(
@@ -172,7 +170,7 @@
     from dask.dataframe.io.json import to_json
 
     return to_json(
-        df.to_legacy_dataframe(),
+        df,
         url_path,
         orient=orient,
         lines=lines,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/orc.py 
new/dask-expr-1.1.20/dask_expr/io/orc.py
--- old/dask-expr-1.1.19/dask_expr/io/orc.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/orc.py    2024-12-03 23:40:36.000000000 
+0100
@@ -1,4 +1,3 @@
-from dask_expr import from_legacy_dataframe
 from dask_expr._backends import dataframe_creation_dispatch
 
 
@@ -48,7 +47,7 @@
     """
     from dask.dataframe.io import read_orc as _read_orc
 
-    df = _read_orc(
+    return _read_orc(
         path,
         engine=engine,
         columns=columns,
@@ -57,7 +56,6 @@
         aggregate_files=aggregate_files,
         storage_options=storage_options,
     )
-    return from_legacy_dataframe(df)
 
 
 def to_orc(
@@ -72,7 +70,7 @@
     from dask.dataframe.io import to_orc as _to_orc
 
     return _to_orc(
-        df.to_legacy_dataframe(),
+        df.optimize(),
         path,
         engine=engine,
         write_index=write_index,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/parquet.py 
new/dask-expr-1.1.20/dask_expr/io/parquet.py
--- old/dask-expr-1.1.19/dask_expr/io/parquet.py        2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/parquet.py        2024-12-03 
23:40:36.000000000 +0100
@@ -593,7 +593,7 @@
     # Engine-specific initialization steps to write the dataset.
     # Possibly create parquet metadata, and load existing stuff if appending
     i_offset, fmd, metadata_file_exists, extra_write_kwargs = 
engine.initialize_write(
-        df.to_legacy_dataframe(),
+        df,
         fs,
         path,
         append=append,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/sql.py 
new/dask-expr-1.1.20/dask_expr/io/sql.py
--- old/dask-expr-1.1.19/dask_expr/io/sql.py    2024-11-13 16:16:32.000000000 
+0100
+++ new/dask-expr-1.1.20/dask_expr/io/sql.py    2024-12-03 23:40:36.000000000 
+0100
@@ -1,6 +1,3 @@
-from dask_expr import from_legacy_dataframe
-
-
 def read_sql(sql, con, index_col, **kwargs):
     """
     Read SQL query or database table into a DataFrame.
@@ -36,8 +33,7 @@
     """
     from dask.dataframe.io.sql import read_sql
 
-    df = read_sql(sql, con, index_col, **kwargs)
-    return from_legacy_dataframe(df)
+    return read_sql(sql, con, index_col, **kwargs)
 
 
 def read_sql_table(
@@ -122,7 +118,7 @@
     """
     from dask.dataframe.io.sql import read_sql_table as _read_sql_table
 
-    df = _read_sql_table(
+    return _read_sql_table(
         table_name,
         con,
         index_col,
@@ -137,7 +133,6 @@
         engine_kwargs=engine_kwargs,
         **kwargs,
     )
-    return from_legacy_dataframe(df)
 
 
 def read_sql_query(
@@ -210,7 +205,7 @@
     """
     from dask.dataframe.io.sql import read_sql_query as _read_sql_query
 
-    df = _read_sql_query(
+    return _read_sql_query(
         sql,
         con,
         index_col,
@@ -223,7 +218,6 @@
         engine_kwargs=engine_kwargs,
         **kwargs,
     )
-    return from_legacy_dataframe(df)
 
 
 def to_sql(
@@ -354,7 +348,7 @@
     from dask.dataframe.io.sql import to_sql as _to_sql
 
     return _to_sql(
-        df.to_legacy_dataframe(),
+        df,
         name=name,
         uri=uri,
         schema=schema,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py 
new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py
--- old/dask-expr-1.1.19/dask_expr/io/tests/test_distributed.py 2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/tests/test_distributed.py 2024-12-03 
23:40:36.000000000 +0100
@@ -63,4 +63,4 @@
     df = read_parquet(tmpdir, filesystem=filesystem)
     from distributed.protocol import dumps
 
-    assert len(b"".join(dumps(df.optimize().dask))) <= 9000
+    assert len(b"".join(dumps(df.optimize().dask))) <= 9100
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py 
new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py
--- old/dask-expr-1.1.19/dask_expr/io/tests/test_io.py  2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/io/tests/test_io.py  2024-12-03 
23:40:36.000000000 +0100
@@ -14,7 +14,6 @@
     DataFrame,
     from_array,
     from_dask_array,
-    from_dask_dataframe,
     from_dict,
     from_legacy_dataframe,
     from_map,
@@ -231,29 +230,21 @@
 @pytest.mark.parametrize("optimize", [True, False])
 def test_from_legacy_dataframe(optimize):
     ddf = dd.from_dict({"a": range(100)}, npartitions=10)
-    df = from_legacy_dataframe(ddf, optimize=optimize)
+    with pytest.warns(FutureWarning, match="is deprecated"):
+        df = from_legacy_dataframe(ddf, optimize=optimize)
     assert isinstance(df.expr, Expr)
     assert_eq(df, ddf)
 
-    # Check deprecated API
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df2 = from_dask_dataframe(ddf, optimize=optimize)
-        assert_eq(df, df2)
-
 
 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_legacy_dataframe(optimize):
     pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
     df = from_pandas(pdf, npartitions=2)
-    ddf = df.to_legacy_dataframe(optimize=optimize)
+    with pytest.warns(FutureWarning, match="is deprecated"):
+        ddf = df.to_legacy_dataframe(optimize=optimize)
     assert isinstance(ddf, dd.core.DataFrame)
     assert_eq(df, ddf)
 
-    # Check deprecated API
-    with pytest.warns(FutureWarning, match="deprecated"):
-        ddf2 = df.to_dask_dataframe(optimize=optimize)
-        assert_eq(ddf, ddf2)
-
 
 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_dask_array(optimize):
@@ -470,3 +461,41 @@
         obj = meta.schema
 
     assert normalizer(obj) == normalizer(obj)
+
+
+@pytest.mark.parametrize("lengths", [[2, 2], True])
+def test_to_records_with_lengths(lengths):
+    pytest.importorskip("dask.array")
+    from dask.array.utils import assert_eq
+
+    df = pd.DataFrame(
+        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
+        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
+    )
+    ddf = dd.from_pandas(df, 2)
+
+    result = ddf.to_records(lengths=lengths)
+    assert_eq(df.to_records(), result, check_type=False)  # TODO: make 
check_type pass
+
+    assert isinstance(result, da.Array)
+
+    expected_chunks = ((2, 2),)
+
+    assert result.chunks == expected_chunks
+
+
+def test_to_bag():
+    a = pd.DataFrame(
+        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
+        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
+    )
+    ddf = dd.from_pandas(a, 2)
+
+    assert ddf.to_bag().compute() == list(a.itertuples(False))
+    assert ddf.to_bag(True).compute() == list(a.itertuples(True))
+    assert ddf.to_bag(format="dict").compute() == [
+        {"x": "a", "y": 2},
+        {"x": "b", "y": 3},
+        {"x": "c", "y": 4},
+        {"x": "d", "y": 5},
+    ]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_collection.py 
new/dask-expr-1.1.20/dask_expr/tests/test_collection.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_collection.py     2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_collection.py     2024-12-03 
23:40:36.000000000 +0100
@@ -2651,6 +2651,44 @@
     assert_eq(ddf.melt(**kwargs), pdf.melt(**kwargs), check_index=False)
 
 
+def test_assign_projection_mix():
+    data = {
+        "date": [
+            "2024-03-22 18:13:36.801000",
+            "2024-03-22 18:14:11.457000",
+            "2024-04-02 06:05:01.658000",
+            "2024-04-02 06:05:04.870000",
+            "2024-04-03 06:11:30.202000",
+        ],
+        "code": [1.0, 3.0, 6.0, 6.0, 8.0],
+        "first": pd.NaT,
+        "first_2": pd.NaT,
+        "second": pd.NaT,
+        "second_2": pd.NaT,
+        "third": pd.NaT,
+    }
+    df = pd.DataFrame(data)
+    df["date"] = pd.to_datetime(df["date"])
+
+    df = from_pandas(df)
+
+    def apply_func(x):
+        return x
+
+    event_columns = {1: ["first", "first_2"], 2: ["second", "second_2"], 3: 
["third"]}
+
+    for event_code, columns in event_columns.items():
+        mask = df["code"] == event_code
+        df[columns[0]] = df[columns[0]].mask(cond=(mask), other=df["date"])
+        if len(columns) == 2:
+            df[columns[1]] = df[columns[1]].mask(cond=(mask), other=df["date"])
+            df[columns[1]] = df[columns[1]].apply(apply_func)
+
+    df = df.drop(columns=["code", "date"])
+    result = df.optimize(fuse=False)
+    assert result.expr._depth() == 13.0  # this grew exponentially previously
+
+
 def test_dropna_merge(df, pdf):
     dropped_na = df.dropna(subset=["x"])
     result = dropped_na.merge(dropped_na, on="x")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py 
new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_groupby.py        2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_groupby.py        2024-12-03 
23:40:36.000000000 +0100
@@ -157,6 +157,13 @@
     assert_eq(agg, expect)
 
 
+def test_value_counts_split_out(pdf):
+    df = from_pandas(pdf, npartitions=10)
+    result = df.groupby("x").y.value_counts(split_out=True)
+    expected = pdf.groupby("x").y.value_counts()
+    assert_eq(result, expected)
+
+
 def test_unique(df, pdf):
     result = df.groupby("x")["y"].unique()
     expected = pdf.groupby("x")["y"].unique()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py 
new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_indexing.py       2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_indexing.py       2024-12-03 
23:40:36.000000000 +0100
@@ -35,15 +35,6 @@
         df.iloc[(1, 2, 3)]
 
 
-def test_loc(df, pdf):
-    assert_eq(df.loc[:, "x"], pdf.loc[:, "x"])
-    assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]])
-    assert_eq(df.loc[:, []], pdf.loc[:, []])
-
-    assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"])
-    assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]])
-
-
 def test_loc_slice(pdf, df):
     pdf.columns = [10, 20]
     df.columns = [10, 20]
@@ -86,6 +77,12 @@
 
 
 def test_loc(df, pdf):
+    assert_eq(df.loc[:, "x"], pdf.loc[:, "x"])
+    assert_eq(df.loc[:, ["x"]], pdf.loc[:, ["x"]])
+    assert_eq(df.loc[:, []], pdf.loc[:, []])
+
+    assert_eq(df.loc[df.y == 20, "x"], pdf.loc[pdf.y == 20, "x"])
+    assert_eq(df.loc[df.y == 20, ["x"]], pdf.loc[pdf.y == 20, ["x"]])
     assert df.loc[3:8].divisions[0] == 3
     assert df.loc[3:8].divisions[-1] == 8
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py 
new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py
--- old/dask-expr-1.1.19/dask_expr/tests/test_reductions.py     2024-11-13 
16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/dask_expr/tests/test_reductions.py     2024-12-03 
23:40:36.000000000 +0100
@@ -146,10 +146,10 @@
 
 
 @pytest.mark.parametrize(
-    "split_every,expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)]
+    "split_every, expect_tasks", [(False, 53), (None, 57), (5, 57), (2, 73)]
 )
 def test_dataframe_mode_split_every(pdf, df, split_every, expect_tasks):
-    assert_eq(df.to_legacy_dataframe().mode(split_every=split_every), 
pdf.mode())
+    assert_eq(df.mode(split_every=split_every), pdf.mode())
     q = df.mode(split_every=split_every).optimize(fuse=False)
     assert len(q.__dask_graph__()) == expect_tasks
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-expr-1.1.19/pyproject.toml 
new/dask-expr-1.1.20/pyproject.toml
--- old/dask-expr-1.1.19/pyproject.toml 2024-11-13 16:16:32.000000000 +0100
+++ new/dask-expr-1.1.20/pyproject.toml 2024-12-03 23:40:36.000000000 +0100
@@ -26,7 +26,7 @@
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
-    "dask == 2024.11.2",
+    "dask == 2024.12.0",
     "pyarrow>=14.0.1",
     "pandas >= 2",
 ]
