This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch revert-15385-rollback-dataframes in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0766b5c05cab37f3b77af625507a7ffb4835ce53 Author: Chamikara Jayalath <[email protected]> AuthorDate: Wed Aug 25 20:42:24 2021 -0700 Revert "[BEAM-12764] Revert "Merge pull request #15165 from [BEAM-12593] Veri…" --- sdks/python/apache_beam/dataframe/frames.py | 21 +++++--- sdks/python/apache_beam/dataframe/frames_test.py | 46 +++++++++++++++-- .../apache_beam/dataframe/pandas_doctests_test.py | 58 ++++++++++++++++++---- sdks/python/setup.py | 2 +- 4 files changed, 105 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index b834d9c..45ae8c6 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -55,6 +55,9 @@ __all__ = [ 'DeferredDataFrame', ] +# Get major, minor version +PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2])) + def populate_not_implemented(pd_type): def wrapper(deferred_type): @@ -1932,7 +1935,7 @@ class DeferredSeries(DeferredDataFrameOrSeries): else: column = self - result = column.groupby(column).size() + result = column.groupby(column, dropna=dropna).size() # groupby.size() names the index, which we don't need result.index.name = None @@ -2392,8 +2395,8 @@ class DeferredDataFrame(DeferredDataFrameOrSeries): if func in ('quantile',): return getattr(self, func)(*args, axis=axis, **kwargs) - # Maps to a property, args are ignored - if func in ('size',): + # In pandas<1.3.0, maps to a property, args are ignored + if func in ('size',) and PD_VERSION < (1, 3): return getattr(self, func) # We also have specialized distributed implementations for these. They only @@ -3392,7 +3395,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries): @frame_base.with_docs_from(pd.DataFrame) def value_counts(self, subset=None, sort=False, normalize=False, - ascending=False): + ascending=False, dropna=True): """``sort`` is ``False`` by default, and ``sort=True`` is not supported because it imposes an ordering on the dataset which likely will not be preserved.""" @@ -3403,10 +3406,16 @@ class DeferredDataFrame(DeferredDataFrameOrSeries): "ordering on the dataset which likely will not be preserved.", reason="order-sensitive") columns = subset or list(self.columns) - result = self.groupby(columns).size() + + if dropna: + dropped = self.dropna() + else: + dropped = self + + result = dropped.groupby(columns, dropna=dropna).size() if normalize: - return result/self.dropna().length() + return result/dropped.length() else: return result diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index c3972ad..a2703d8 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -25,7 +25,8 @@ from apache_beam.dataframe import expressions from apache_beam.dataframe import frame_base from apache_beam.dataframe import frames -PD_VERSION = tuple(map(int, pd.__version__.split('.'))) +# Get major, minor version +PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2])) GROUPBY_DF = pd.DataFrame({ 'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)], @@ -235,6 +236,17 @@ class DeferredFrameTest(_AbstractFrameTest): self._run_test( lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2) + @unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3") + def test_value_counts_dropna_false(self): + df = pd.DataFrame({ + 'first_name': ['John', 'Anne', 'John', 'Beth'], + 'middle_name': ['Smith', pd.NA, pd.NA, 'Louise'] + }) + # TODO(BEAM-12495): Remove the assertRaises this when the underlying bug in + # https://github.com/pandas-dev/pandas/issues/36470 is fixed. + with self.assertRaises(NotImplementedError): + self._run_test(lambda df: df.value_counts(dropna=False), df) + def test_get_column(self): df = pd.DataFrame({ 'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'], @@ -369,10 +381,15 @@ class DeferredFrameTest(_AbstractFrameTest): nonparallel=True) def test_combine_Series(self): - with expressions.allow_non_parallel_operations(): - s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0}) - s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0}) - self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2) + s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0}) + s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0}) + self._run_test( + lambda s1, + s2: s1.combine(s2, max), + s1, + s2, + nonparallel=True, + check_proxy=False) def test_combine_first_dataframe(self): df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]}) @@ -587,8 +604,27 @@ class DeferredFrameTest(_AbstractFrameTest): self._run_test(lambda df: df.value_counts(), df) self._run_test(lambda df: df.value_counts(normalize=True), df) + if PD_VERSION >= (1, 3): + # dropna=False is new in pandas 1.3 + # TODO(BEAM-12495): Remove the assertRaises this when the underlying bug + # in https://github.com/pandas-dev/pandas/issues/36470 is fixed. + with self.assertRaises(NotImplementedError): + self._run_test(lambda df: df.value_counts(dropna=False), df) + + # Test the defaults. self._run_test(lambda df: df.num_wings.value_counts(), df) self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df) + self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df) + + # Test the combination interactions. + for normalize in (True, False): + for dropna in (True, False): + self._run_test( + lambda df, + dropna=dropna, + normalize=normalize: df.num_wings.value_counts( + dropna=dropna, normalize=normalize), + df) def test_value_counts_does_not_support_sort(self): df = pd.DataFrame({ diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py index edc42f1..755e4e5 100644 --- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py +++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py @@ -20,6 +20,7 @@ import unittest import pandas as pd from apache_beam.dataframe import doctests +from apache_beam.dataframe.frames import PD_VERSION from apache_beam.dataframe.pandas_top_level_functions import _is_top_level_function @@ -68,7 +69,8 @@ class DoctestTest(unittest.TestCase): "df.replace(regex={r'^ba.$': 'new', 'foo': 'xyz'})" ], 'pandas.core.generic.NDFrame.fillna': [ - "df.fillna(method='ffill')", + 'df.fillna(method=\'ffill\')', + 'df.fillna(method="ffill")', 'df.fillna(value=values, limit=1)', ], 'pandas.core.generic.NDFrame.sort_values': ['*'], @@ -164,7 +166,8 @@ class DoctestTest(unittest.TestCase): 'pandas.core.frame.DataFrame.cumprod': ['*'], 'pandas.core.frame.DataFrame.diff': ['*'], 'pandas.core.frame.DataFrame.fillna': [ - "df.fillna(method='ffill')", + 'df.fillna(method=\'ffill\')', + 'df.fillna(method="ffill")', 'df.fillna(value=values, limit=1)', ], 'pandas.core.frame.DataFrame.items': ['*'], @@ -237,6 +240,8 @@ class DoctestTest(unittest.TestCase): # reindex not supported 's2 = s.reindex([1, 0, 2, 3])', ], + 'pandas.core.frame.DataFrame.resample': ['*'], + 'pandas.core.frame.DataFrame.values': ['*'], }, not_implemented_ok={ 'pandas.core.frame.DataFrame.transform': [ @@ -244,6 +249,8 @@ class DoctestTest(unittest.TestCase): # frames_test.py::DeferredFrameTest::test_groupby_transform_sum "df.groupby('Date')['Data'].transform('sum')", ], + 'pandas.core.frame.DataFrame.swaplevel': ['*'], + 'pandas.core.frame.DataFrame.melt': ['*'], 'pandas.core.frame.DataFrame.reindex_axis': ['*'], 'pandas.core.frame.DataFrame.round': [ 'df.round(decimals)', @@ -267,6 +274,11 @@ class DoctestTest(unittest.TestCase): 'pandas.core.frame.DataFrame.set_index': [ "df.set_index([s, s**2])", ], + + # TODO(BEAM-12495) + 'pandas.core.frame.DataFrame.value_counts': [ + 'df.value_counts(dropna=False)' + ], }, skip={ # s2 created with reindex @@ -274,6 +286,8 @@ class DoctestTest(unittest.TestCase): 'df.dot(s2)', ], + 'pandas.core.frame.DataFrame.resample': ['df'], + 'pandas.core.frame.DataFrame.asfreq': ['*'], # Throws NotImplementedError when modifying df 'pandas.core.frame.DataFrame.axes': [ # Returns deferred index. @@ -302,6 +316,14 @@ class DoctestTest(unittest.TestCase): 'pandas.core.frame.DataFrame.to_markdown': ['*'], 'pandas.core.frame.DataFrame.to_parquet': ['*'], + # Raises right exception, but testing framework has matching issues. + # Tested in `frames_test.py`. + 'pandas.core.frame.DataFrame.insert': [ + 'df', + 'df.insert(1, "newcol", [99, 99])', + 'df.insert(0, "col1", [100, 100], allow_duplicates=True)' + ], + 'pandas.core.frame.DataFrame.to_records': [ 'df.index = df.index.rename("I")', 'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x @@ -385,7 +407,8 @@ class DoctestTest(unittest.TestCase): 's.dot(arr)', # non-deferred result ], 'pandas.core.series.Series.fillna': [ - "df.fillna(method='ffill')", + 'df.fillna(method=\'ffill\')', + 'df.fillna(method="ffill")', 'df.fillna(value=values, limit=1)', ], 'pandas.core.series.Series.items': ['*'], @@ -434,11 +457,11 @@ class DoctestTest(unittest.TestCase): 's.drop_duplicates()', "s.drop_duplicates(keep='last')", ], - 'pandas.core.series.Series.repeat': [ - 's.repeat([1, 2, 3])' - ], 'pandas.core.series.Series.reindex': ['*'], 'pandas.core.series.Series.autocorr': ['*'], + 'pandas.core.series.Series.repeat': ['s.repeat([1, 2, 3])'], + 'pandas.core.series.Series.resample': ['*'], + 'pandas.core.series.Series': ['ser.iloc[0] = 999'], }, not_implemented_ok={ 'pandas.core.series.Series.transform': [ @@ -451,8 +474,11 @@ class DoctestTest(unittest.TestCase): 'ser.groupby(["a", "b", "a", np.nan]).mean()', 'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()', ], + 'pandas.core.series.Series.swaplevel' :['*'] }, skip={ + # Relies on setting values with iloc + 'pandas.core.series.Series': ['ser', 'r'], 'pandas.core.series.Series.groupby': [ # TODO(BEAM-11393): This example requires aligning two series # with non-unique indexes. It only works in pandas because @@ -460,6 +486,7 @@ class DoctestTest(unittest.TestCase): # alignment. 'ser.groupby(ser > 100).mean()', ], + 'pandas.core.series.Series.asfreq': ['*'], # error formatting 'pandas.core.series.Series.append': [ 's1.append(s2, verify_integrity=True)', @@ -491,12 +518,12 @@ class DoctestTest(unittest.TestCase): # Inspection after modification. 's' ], + 'pandas.core.series.Series.resample': ['df'], }) self.assertEqual(result.failed, 0) def test_string_tests(self): - PD_VERSION = tuple(int(v) for v in pd.__version__.split('.')) - if PD_VERSION < (1, 2, 0): + if PD_VERSION < (1, 2): module = pd.core.strings else: # Definitions were moved to accessor in pandas 1.2.0 @@ -668,11 +695,13 @@ class DoctestTest(unittest.TestCase): 'pandas.core.groupby.generic.SeriesGroupBy.diff': ['*'], 'pandas.core.groupby.generic.DataFrameGroupBy.hist': ['*'], 'pandas.core.groupby.generic.DataFrameGroupBy.fillna': [ - "df.fillna(method='ffill')", + 'df.fillna(method=\'ffill\')', + 'df.fillna(method="ffill")', 'df.fillna(value=values, limit=1)', ], 'pandas.core.groupby.generic.SeriesGroupBy.fillna': [ - "df.fillna(method='ffill')", + 'df.fillna(method=\'ffill\')', + 'df.fillna(method="ffill")', 'df.fillna(value=values, limit=1)', ], }, @@ -682,6 +711,7 @@ class DoctestTest(unittest.TestCase): 'pandas.core.groupby.generic.SeriesGroupBy.transform': ['*'], 'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['*'], 'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['*'], + 'pandas.core.groupby.generic.SeriesGroupBy.apply': ['*'], }, skip={ 'pandas.core.groupby.generic.SeriesGroupBy.cov': [ @@ -698,6 +728,14 @@ class DoctestTest(unittest.TestCase): # These examples rely on grouping by a list 'pandas.core.groupby.generic.SeriesGroupBy.aggregate': ['*'], 'pandas.core.groupby.generic.DataFrameGroupBy.aggregate': ['*'], + 'pandas.core.groupby.generic.SeriesGroupBy.transform': [ + # Dropping invalid columns during a transform is unsupported. + 'grouped.transform(lambda x: (x - x.mean()) / x.std())' + ], + 'pandas.core.groupby.generic.DataFrameGroupBy.transform': [ + # Dropping invalid columns during a transform is unsupported. + 'grouped.transform(lambda x: (x - x.mean()) / x.std())' + ], }) self.assertEqual(result.failed, 0) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f4e02b8..338251d 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -166,7 +166,7 @@ if sys.platform == 'win32' and sys.maxsize <= 2**32: REQUIRED_TEST_PACKAGES = [ 'freezegun>=0.3.12', 'mock>=1.0.1,<3.0.0', - 'pandas>=1.0,<1.3.0', + 'pandas>=1.0,<1.4.0', 'parameterized>=0.7.1,<0.8.0', 'pyhamcrest>=1.9,!=1.10.0,<2.0.0', 'pyyaml>=3.12,<6.0.0',
