This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 71c68ca041d Fix numeric_only logic in frames_test for Pandas 2 (#28422)
71c68ca041d is described below
commit 71c68ca041d15a53e481238ad2d6da11ee6a6820
Author: caneff <[email protected]>
AuthorDate: Mon Sep 18 18:02:10 2023 -0400
Fix numeric_only logic in frames_test for Pandas 2 (#28422)
---
sdks/python/apache_beam/dataframe/frame_base.py | 22 ++++--
.../apache_beam/dataframe/frame_base_test.py | 47 ++++++++++++-
sdks/python/apache_beam/dataframe/frames.py | 4 +-
sdks/python/apache_beam/dataframe/frames_test.py | 80 ++++++++++++++++------
4 files changed, 123 insertions(+), 30 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py
b/sdks/python/apache_beam/dataframe/frame_base.py
index 1da12ececff..48a4c29d058 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -500,6 +500,8 @@ def args_to_kwargs(base_type, removed_method=False,
removed_args=None):
removed_arg_names = removed_args if removed_args is not None else []
+ # We would need to add position only arguments if they ever become a thing
+ # in Pandas (as of 2.1 currently they aren't).
base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
@@ -514,6 +516,9 @@ def args_to_kwargs(base_type, removed_method=False,
removed_args=None):
@functools.wraps(func)
def wrapper(*args, **kwargs):
+ if len(args) > len(base_arg_names):
+ raise TypeError(f"{func.__name__} got too many positioned arguments.")
+
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
@@ -523,7 +528,7 @@ def args_to_kwargs(base_type, removed_method=False,
removed_args=None):
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
- if not name in kwargs:
+ if name not in kwargs:
kwargs[name] = None
return func(**kwargs)
@@ -646,13 +651,18 @@ def populate_defaults(base_type, removed_method=False,
removed_args=None):
return func
base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
- if not base_argspec.defaults:
+ if not base_argspec.defaults and not base_argspec.kwonlydefaults:
return func
- arg_to_default = dict(
- zip(
- base_argspec.args[-len(base_argspec.defaults):],
- base_argspec.defaults))
+ arg_to_default = {}
+ if base_argspec.defaults:
+ arg_to_default.update(
+ zip(
+ base_argspec.args[-len(base_argspec.defaults):],
+ base_argspec.defaults))
+
+ if base_argspec.kwonlydefaults:
+ arg_to_default.update(base_argspec.kwonlydefaults)
unwrapped_func = unwrap(func)
# args that do not have defaults in func, but do have defaults in base
diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py
b/sdks/python/apache_beam/dataframe/frame_base_test.py
index 2d16d02ba1e..b3077320720 100644
--- a/sdks/python/apache_beam/dataframe/frame_base_test.py
+++ b/sdks/python/apache_beam/dataframe/frame_base_test.py
@@ -72,7 +72,7 @@ class FrameBaseTest(unittest.TestCase):
def test_args_to_kwargs(self):
class Base(object):
- def func(self, a=1, b=2, c=3):
+ def func(self, a=1, b=2, c=3, *, kw_only=4):
pass
class Proxy(object):
@@ -87,6 +87,9 @@ class FrameBaseTest(unittest.TestCase):
self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
+ self.assertEqual(proxy.func(2, kw_only=20), {'a': 2, 'kw_only': 20})
+ with self.assertRaises(TypeError): # got too many positioned arguments
+ proxy.func(2, 4, 6, 8)
def test_args_to_kwargs_populates_defaults(self):
class Base(object):
@@ -129,6 +132,48 @@ class FrameBaseTest(unittest.TestCase):
proxy.func_removed_args()
self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100})
+ def test_args_to_kwargs_populates_default_handles_kw_only(self):
+ class Base(object):
+ def func(self, a, b=2, c=3, *, kw_only=4):
+ pass
+
+ class ProxyUsesKwOnly(object):
+ @frame_base.args_to_kwargs(Base)
+ @frame_base.populate_defaults(Base)
+ def func(self, a, kw_only, **kwargs):
+ return dict(kwargs, a=a, kw_only=kw_only)
+
+ proxy = ProxyUsesKwOnly()
+
+ # pylint: disable=too-many-function-args,no-value-for-parameter
+ with self.assertRaises(TypeError): # missing 1 required positional
argument
+ proxy.func()
+
+ self.assertEqual(proxy.func(100), {'a': 100, 'kw_only': 4})
+ self.assertEqual(
+ proxy.func(2, 4, 6, kw_only=8), {
+ 'a': 2, 'b': 4, 'c': 6, 'kw_only': 8
+ })
+ with self.assertRaises(TypeError):
+ proxy.func(2, 4, 6, 8) # got too many positioned arguments
+
+ class ProxyDoesntUseKwOnly(object):
+ @frame_base.args_to_kwargs(Base)
+ @frame_base.populate_defaults(Base)
+ def func(self, a, **kwargs):
+ return dict(kwargs, a=a)
+
+ proxy = ProxyDoesntUseKwOnly()
+
+ # pylint: disable=too-many-function-args,no-value-for-parameter
+ with self.assertRaises(TypeError): # missing 1 required positional
argument
+ proxy.func()
+ self.assertEqual(proxy.func(100), {'a': 100})
+ self.assertEqual(
+ proxy.func(2, 4, 6, kw_only=8), {
+ 'a': 2, 'b': 4, 'c': 6, 'kw_only': 8
+ })
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/dataframe/frames.py
b/sdks/python/apache_beam/dataframe/frames.py
index a0a99c12ecd..34fc17553b0 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -907,7 +907,7 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'sort_index',
- lambda df: df.sort_index(axis, **kwargs),
+ lambda df: df.sort_index(axis=axis, **kwargs),
[self._expr],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary(),
@@ -2689,7 +2689,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'set_axis',
- lambda df: df.set_axis(labels, axis, **kwargs),
+ lambda df: df.set_axis(labels, axis=axis, **kwargs),
[self._expr],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary()))
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py
b/sdks/python/apache_beam/dataframe/frames_test.py
index 4e59d1da5de..b8b8cb733ef 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -17,6 +17,7 @@
import re
import unittest
import warnings
+from typing import Dict
import numpy as np
import pandas as pd
@@ -1634,6 +1635,30 @@ class DeferredFrameTest(_AbstractFrameTest):
# https://github.com/pandas-dev/pandas/issues/40139
ALL_GROUPING_AGGREGATIONS = sorted(
set(frames.ALL_AGGREGATIONS) - set(('kurt', 'kurtosis')))
+AGGREGATIONS_WHERE_NUMERIC_ONLY_DEFAULTS_TO_TRUE_IN_PANDAS_1 = set(
+ frames.ALL_AGGREGATIONS) - set((
+ 'nunique',
+ 'size',
+ 'count',
+ 'idxmin',
+ 'idxmax',
+ 'mode',
+ 'rank',
+ 'all',
+ 'any',
+ 'describe'))
+
+
+def numeric_only_kwargs_for_pandas_2(agg_type: str) -> Dict[str, bool]:
+ """Get proper arguments for numeric_only.
+
+ Behavior for numeric_only in these methods changed in Pandas 2 to default
+ to False instead of True, so explicitly make it True in Pandas 2."""
+ if PD_VERSION >= (2, 0) and (
+ agg_type in
AGGREGATIONS_WHERE_NUMERIC_ONLY_DEFAULTS_TO_TRUE_IN_PANDAS_1):
+ return {'numeric_only': True}
+ else:
+ return {}
class GroupByTest(_AbstractFrameTest):
@@ -1650,8 +1675,9 @@ class GroupByTest(_AbstractFrameTest):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: df.groupby('group').agg(agg_type),
+ lambda df: df.groupby('group').agg(agg_type, **kwargs),
GROUPBY_DF,
check_proxy=False)
@@ -1661,8 +1687,10 @@ class GroupByTest(_AbstractFrameTest):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: getattr(df[df.foo > 30].groupby('group'), agg_type)(),
+ lambda df: getattr(df[df.foo > 30].groupby('group'), agg_type)
+ (**kwargs),
GROUPBY_DF,
check_proxy=False)
@@ -1673,8 +1701,9 @@ class GroupByTest(_AbstractFrameTest):
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: getattr(df.groupby('group'), agg_type)(),
+ lambda df: getattr(df.groupby('group'), agg_type)(**kwargs),
GROUPBY_DF,
check_proxy=False)
@@ -1685,8 +1714,10 @@ class GroupByTest(_AbstractFrameTest):
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: getattr(df[df.foo > 40].groupby(df.group), agg_type)(),
+ lambda df: getattr(df[df.foo > 40].groupby(df.group), agg_type)
+ (**kwargs),
GROUPBY_DF,
check_proxy=False)
@@ -1717,12 +1748,15 @@ class GroupByTest(_AbstractFrameTest):
"https://github.com/apache/beam/issues/20895: "
"SeriesGroupBy.{corr, cov} do not raise the expected error.")
- self._run_test(lambda df: getattr(df.groupby('group').foo, agg_type)(), df)
- self._run_test(lambda df: getattr(df.groupby('group').bar, agg_type)(), df)
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: getattr(df.groupby('group')['foo'], agg_type)(), df)
+ lambda df: getattr(df.groupby('group').foo, agg_type)(**kwargs), df)
self._run_test(
- lambda df: getattr(df.groupby('group')['bar'], agg_type)(), df)
+ lambda df: getattr(df.groupby('group').bar, agg_type)(**kwargs), df)
+ self._run_test(
+ lambda df: getattr(df.groupby('group')['foo'], agg_type)(**kwargs), df)
+ self._run_test(
+ lambda df: getattr(df.groupby('group')['bar'], agg_type)(**kwargs), df)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_project_dataframe(self, agg_type):
@@ -1730,8 +1764,10 @@ class GroupByTest(_AbstractFrameTest):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
- lambda df: getattr(df.groupby('group')[['bar', 'baz']], agg_type)(),
+ lambda df: getattr(df.groupby('group')[['bar', 'baz']], agg_type)
+ (**kwargs),
GROUPBY_DF,
check_proxy=False)
@@ -1760,9 +1796,10 @@ class GroupByTest(_AbstractFrameTest):
def test_groupby_callable(self):
df = GROUPBY_DF
-
- self._run_test(lambda df: df.groupby(lambda x: x % 2).foo.sum(), df)
- self._run_test(lambda df: df.groupby(lambda x: x % 5).median(), df)
+ kwargs = numeric_only_kwargs_for_pandas_2('sum')
+ self._run_test(lambda df: df.groupby(lambda x: x % 2).foo.sum(**kwargs),
df)
+ kwargs = numeric_only_kwargs_for_pandas_2('median')
+ self._run_test(lambda df: df.groupby(lambda x: x % 5).median(**kwargs), df)
def test_groupby_apply(self):
df = GROUPBY_DF
@@ -1817,8 +1854,9 @@ class GroupByTest(_AbstractFrameTest):
def test_groupby_pipe(self):
df = GROUPBY_DF
-
- self._run_test(lambda df: df.groupby('group').pipe(lambda x: x.sum()), df)
+ kwargs = numeric_only_kwargs_for_pandas_2('sum')
+ self._run_test(
+ lambda df: df.groupby('group').pipe(lambda x: x.sum(**kwargs)), df)
self._run_test(
lambda df: df.groupby('group')['bool'].pipe(lambda x: x.any()), df)
self._run_test(
@@ -1900,14 +1938,14 @@ class GroupByTest(_AbstractFrameTest):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
+
+ def agg(df, group_by):
+ kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
+ return df[df.foo > 40].groupby(group_by).agg(agg_type, **kwargs)
+
+ self._run_test(lambda df: agg(df, df.group), GROUPBY_DF, check_proxy=False)
self._run_test(
- lambda df: df[df.foo > 40].groupby(df.group).agg(agg_type),
- GROUPBY_DF,
- check_proxy=False)
- self._run_test(
- lambda df: df[df.foo > 40].groupby(df.foo % 3).agg(agg_type),
- GROUPBY_DF,
- check_proxy=False)
+ lambda df: agg(df, df.foo % 3), GROUPBY_DF, check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_series_groupby_series(self, agg_type):