This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 42677d5  [BEAM-9547] Add support for `Series.cat` methods (#14934)
42677d5 is described below

commit 42677d565aacf4c6a7036387b33a8f144458c7d0
Author: Brian Hulette <[email protected]>
AuthorDate: Mon Jun 14 11:52:00 2021 -0700

    [BEAM-9547] Add support for `Series.cat` methods (#14934)
    
    * Add support for Series.cat methods
    
    * Disallow astype('category')
---
 sdks/python/apache_beam/dataframe/frames.py      | 107 ++++++++++++++++++++++-
 sdks/python/apache_beam/dataframe/frames_test.py |  46 ++++++++--
 2 files changed, 146 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py 
b/sdks/python/apache_beam/dataframe/frames.py
index 3e13dd7..64d529a 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -423,8 +423,52 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
         grouping_indexes=grouping_indexes)
 
   abs = frame_base._elementwise_method('abs', base=pd.core.generic.NDFrame)
-  astype = frame_base._elementwise_method(
-      'astype', base=pd.core.generic.NDFrame)
+
+  @frame_base.with_docs_from(pd.core.generic.NDFrame)
+  @frame_base.args_to_kwargs(pd.core.generic.NDFrame)
+  @frame_base.populate_defaults(pd.core.generic.NDFrame)
+  def astype(self, dtype, copy, errors):
+    """astype is not parallelizable when ``errors="ignore"`` is specified.
+
+    ``copy=False`` is not supported because it relies on memory-sharing
+    semantics.
+
+    ``dtype="category`` is not supported because the type of the output column
+    depends on the data. Please use ``pd.CategoricalDtype`` with explicit
+    categories instead.
+    """
+    requires = partitionings.Arbitrary()
+
+    if errors == "ignore":
+      # We need all data in order to ignore errors and propagate the original
+      # data.
+      requires = partitionings.Singleton(
+          reason=(
+              f"astype(errors={errors!r}) is currently not parallelizable, "
+              "because all data must be collected on one node to determine if "
+              "the original data should be propagated instead."))
+
+    if not copy:
+      raise frame_base.WontImplementError(
+          f"astype(copy={copy!r}) is not supported because it relies on "
+          "memory-sharing semantics that are not compatible with the Beam "
+          "model.")
+
+    if dtype == 'category':
+      raise frame_base.WontImplementError(
+          "astype(dtype='category') is not supported because the type of the "
+          "output column depends on the data. Please use pd.CategoricalDtype "
+          "with explicit categories instead.",
+          reason="non-deferred-columns")
+
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'astype',
+            lambda df: df.astype(dtype=dtype, copy=copy, errors=errors),
+            [self._expr],
+            requires_partition_by=requires,
+            preserves_partition_by=partitionings.Arbitrary()))
+
   copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
 
   @frame_base.with_docs_from(pd.DataFrame)
@@ -1557,6 +1601,11 @@ class DeferredSeries(DeferredDataFrameOrSeries):
   def str(self):
     return _DeferredStringMethods(self._expr)
 
+  @property  # type: ignore
+  @frame_base.with_docs_from(pd.Series)
+  def cat(self):
+    return _DeferredCategoricalMethods(self._expr)
+
   apply = frame_base._elementwise_method('apply', base=pd.Series)
   map = frame_base._elementwise_method('map', base=pd.Series)
   # TODO(BEAM-11636): Implement transform using type inference to determine the
@@ -3642,6 +3691,60 @@ for method in ELEMENTWISE_STRING_METHODS:
                                          name=method,
                                          base=pd.core.strings.StringMethods))
 
+
+def make_cat_func(method):
+  def func(df, *args, **kwargs):
+    return getattr(df.cat, method)(*args, **kwargs)
+
+  return func
+
+
+class _DeferredCategoricalMethods(frame_base.DeferredBase):
+  @property  # type: ignore
+  @frame_base.with_docs_from(pd.core.arrays.categorical.CategoricalAccessor)
+  def categories(self):
+    return self._expr.proxy().cat.categories
+
+  @property  # type: ignore
+  @frame_base.with_docs_from(pd.core.arrays.categorical.CategoricalAccessor)
+  def ordered(self):
+    return self._expr.proxy().cat.ordered
+
+  @property  # type: ignore
+  @frame_base.with_docs_from(pd.core.arrays.categorical.CategoricalAccessor)
+  def codes(self):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'codes',
+            lambda s: s.cat.codes,
+            [self._expr],
+            requires_partition_by=partitionings.Arbitrary(),
+            preserves_partition_by=partitionings.Arbitrary(),
+        )
+    )
+
+  remove_unused_categories = frame_base.wont_implement_method(
+      pd.core.arrays.categorical.CategoricalAccessor,
+      'remove_unused_categories', reason="non-deferred-columns")
+
+ELEMENTWISE_CATEGORICAL_METHODS = [
+    'add_categories',
+    'as_ordered',
+    'as_unordered',
+    'remove_categories',
+    'rename_categories',
+    'reorder_categories',
+    'set_categories',
+]
+
+for method in ELEMENTWISE_CATEGORICAL_METHODS:
+  setattr(_DeferredCategoricalMethods,
+          method,
+          frame_base._elementwise_method(
+              make_cat_func(method), name=method,
+              base=pd.core.arrays.categorical.CategoricalAccessor))
+
+
 for base in ['add',
              'sub',
              'mul',
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py 
b/sdks/python/apache_beam/dataframe/frames_test.py
index 8ddddfd..72a6fce 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -834,6 +834,22 @@ class DeferredFrameTest(_AbstractFrameTest):
     self._run_test(
         lambda df: df.sample(axis=1, n=10, random_state=3, replace=True), df)
 
+  def test_cat(self):
+    # Replicate the doctests from CategorigcalAccessor
+    # These tests don't translate into pandas_doctests_test.py because it
+    # tries to use astype("category") in Beam, which makes a non-deferred
+    # column type.
+    s = pd.Series(list("abbccc")).astype("category")
+
+    self._run_test(lambda s: s.cat.rename_categories(list("cba")), s)
+    self._run_test(lambda s: s.cat.reorder_categories(list("cba")), s)
+    self._run_test(lambda s: s.cat.add_categories(["d", "e"]), s)
+    self._run_test(lambda s: s.cat.remove_categories(["a", "c"]), s)
+    self._run_test(lambda s: s.cat.set_categories(list("abcde")), s)
+    self._run_test(lambda s: s.cat.as_ordered(), s)
+    self._run_test(lambda s: s.cat.as_unordered(), s)
+    self._run_test(lambda s: s.cat.codes, s)
+
 
 class GroupByTest(_AbstractFrameTest):
   """Tests for DataFrame/Series GroupBy operations."""
@@ -1641,15 +1657,26 @@ class AllowNonParallelTest(unittest.TestCase):
 class ConstructionTimeTest(unittest.TestCase):
   """Tests for operations that can be executed eagerly."""
   DF = pd.DataFrame({
-      'str_col': ['foo', 'bar'],
-      'int_col': [1, 2],
-      'flt_col': [1.1, 2.2],
+      'str_col': ['foo', 'bar'] * 3,
+      'int_col': [1, 2] * 3,
+      'flt_col': [1.1, 2.2] * 3,
+      'cat_col': pd.Series(list('aabbca'), dtype="category"),
   })
   DEFERRED_DF = frame_base.DeferredFrame.wrap(
-      expressions.PlaceholderExpression(DF))
+      expressions.PlaceholderExpression(DF.iloc[:0]))
 
   def _run_test(self, fn):
-    self.assertEqual(fn(self.DEFERRED_DF), fn(self.DF))
+    expected = fn(self.DF)
+    actual = fn(self.DEFERRED_DF)
+
+    if isinstance(expected, pd.Index):
+      pd.testing.assert_index_equal(expected, actual)
+    elif isinstance(expected, pd.Series):
+      pd.testing.assert_series_equal(expected, actual)
+    elif isinstance(expected, pd.DataFrame):
+      pd.testing.assert_frame_equal(expected, actual)
+    else:
+      self.assertEqual(expected, actual)
 
   @parameterized.expand(DF.columns)
   def test_series_name(self, col_name):
@@ -1666,6 +1693,12 @@ class ConstructionTimeTest(unittest.TestCase):
   def test_dataframe_dtypes(self):
     self._run_test(lambda df: list(df.dtypes))
 
+  def test_categories(self):
+    self._run_test(lambda df: df.cat_col.cat.categories)
+
+  def test_categorical_ordered(self):
+    self._run_test(lambda df: df.cat_col.cat.ordered)
+
 
 class DocstringTest(unittest.TestCase):
   @parameterized.expand([
@@ -1673,6 +1706,9 @@ class DocstringTest(unittest.TestCase):
       (frames.DeferredSeries, pd.Series),
       #(frames._DeferredIndex, pd.Index),
       (frames._DeferredStringMethods, pd.core.strings.StringMethods),
+      (
+          frames._DeferredCategoricalMethods,
+          pd.core.arrays.categorical.CategoricalAccessor),
       #(frames.DeferredGroupBy, pd.core.groupby.generic.DataFrameGroupBy),
       #(frames._DeferredGroupByCols, pd.core.groupby.generic.DataFrameGroupBy),
   ])

Reply via email to