TheNeuralBit commented on a change in pull request #17043:
URL: https://github.com/apache/beam/pull/17043#discussion_r835617914



##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -1295,6 +1295,114 @@ def s_times_shuffled(times, s):
     self._run_test(lambda s: s.pipe(s_times, 2), s)
     self._run_test(lambda s: s.pipe((s_times_shuffled, 's'), 2), s)
 
+  def test_pivot_non_categorical(self):
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    with self.assertRaisesRegex(
+        frame_base.WontImplementError,
+        r"pivot\(\) of non-categorical type is not supported"):
+      self._run_test(
+          lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
+
+  def test_pivot_pandas_example1(self):
+    # Simple test 1
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    df['bar'] = df['bar'].astype(
+        pd.CategoricalDtype(categories=['A', 'B', 'C']))
+    self._run_test(
+        lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
+
+  def test_pivot_pandas_example3(self):
+    # Multiple values
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    df['bar'] = df['bar'].astype(
+        pd.CategoricalDtype(categories=['A', 'B', 'C']))
+    self._run_test(
+        lambda df: df.pivot(index='foo', columns='bar', values=['baz', 'zoo']),
+        df)
+
+  def test_pivot_pandas_example4(self):
+    # Multiple columns
+    df = pd.DataFrame({
+        "lev1": [1, 1, 1, 2, 2, 2],
+        "lev2": [1, 1, 2, 1, 1, 2],
+        "lev3": [1, 2, 1, 2, 1, 2],
+        "lev4": [1, 2, 3, 4, 5, 6],
+        "values": [0, 1, 2, 3, 4, 5]
+    })
+    df['lev2'] = df['lev2'].astype(pd.CategoricalDtype(categories=[1, 2]))
+    df['lev3'] = df['lev3'].astype(pd.CategoricalDtype(categories=[1, 2]))
+    df['values'] = df['values'].astype('Int64')
+    self._run_test(
+        lambda df: df.pivot(
+            index="lev1", columns=["lev2", "lev3"], values="values"),
+        df)
+
+  @unittest.skipIf(
+      PD_VERSION < (1, 4), "Bug in DF.pivot with MultiIndex for pandas < 1.4")

Review comment:
       What is the bug?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3655,6 +3656,121 @@ def shift(self, axis, freq, **kwargs):
   describe = _agg_method(pd.DataFrame, 'describe')
   max = _agg_method(pd.DataFrame, 'max')
   min = _agg_method(pd.DataFrame, 'min')
+
+  @frame_base.with_docs_from(pd.DataFrame)
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def pivot(self, index=None, columns=None, values=None, **kwargs):
+    def verify_all_categorical(all_cols_are_categorical):
+      if not all_cols_are_categorical:
+        raise frame_base.WontImplementError(
+            "pivot() of non-categorical type is not supported because "
+            "the type of the output column depends on the data. Please use "
+            "pd.CategoricalDtype with explicit categories.",
+            reason="non-deferred-columns")

Review comment:
       nit: you might inline this method and break the message out into a
string constant instead.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3655,6 +3656,121 @@ def shift(self, axis, freq, **kwargs):
   describe = _agg_method(pd.DataFrame, 'describe')
   max = _agg_method(pd.DataFrame, 'max')
   min = _agg_method(pd.DataFrame, 'min')
+
+  @frame_base.with_docs_from(pd.DataFrame)
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def pivot(self, index=None, columns=None, values=None, **kwargs):
+    def verify_all_categorical(all_cols_are_categorical):
+      if not all_cols_are_categorical:
+        raise frame_base.WontImplementError(
+            "pivot() of non-categorical type is not supported because "
+            "the type of the output column depends on the data. Please use "
+            "pd.CategoricalDtype with explicit categories.",
+            reason="non-deferred-columns")
+
+    # Construct column index
+    if is_list_like(columns) and len(columns) <= 1:
+      columns = columns[0]
+    selected_cols = self._expr.proxy()[columns]
+    if isinstance(selected_cols, pd.Series):
+      all_cols_are_categorical = isinstance(
+        selected_cols.dtype, pd.CategoricalDtype
+      )
+      verify_all_categorical(all_cols_are_categorical)
+
+      # If values not provided, take all remaining columns of dataframe
+      if not values:
+        values = self._expr.proxy() \
+          .drop(index, axis=1).drop(columns, axis=1).columns.values
+
+      # Take the provided values
+      if is_list_like(values) and len(values) > 1:
+        values_in_col_index = values
+        names = [None, columns]
+        col_index = pd.MultiIndex.from_product(
+          [values_in_col_index,
+          selected_cols.dtypes.categories.astype('category')],
+          names=names
+        )
+      else:
+        col_index = pd.CategoricalIndex(
+          selected_cols.dtype.categories,
+          name=columns
+        )
+    else:
+      all_cols_are_categorical = all(
+        isinstance(c, pd.CategoricalDtype) for c in selected_cols.dtypes
+      )
+      verify_all_categorical(all_cols_are_categorical)
+
+      categories = [
+        c.categories.astype('category') for c in selected_cols.dtypes
+      ]
+      if is_list_like(columns) and len(columns) > 1:
+        col_index = pd.MultiIndex.from_product(categories, names=columns)
+      else:
+        col_index = pd.CategoricalIndex(
+            selected_cols.dtype.categories,
+            name=columns
+        )
+
+    # Construct row index
+    if index:
+      per_partition = expressions.ComputedExpression(
+          'pivot-per-partition',
+          lambda df: df.set_index(keys=index), [self._expr],
+          preserves_partition_by=partitionings.Singleton(),
+          requires_partition_by=partitionings.Arbitrary()
+      )
+      if is_list_like(index):

Review comment:
       What if index is a single-element list?

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -1295,6 +1295,114 @@ def s_times_shuffled(times, s):
     self._run_test(lambda s: s.pipe(s_times, 2), s)
     self._run_test(lambda s: s.pipe((s_times_shuffled, 's'), 2), s)
 
+  def test_pivot_non_categorical(self):
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    with self.assertRaisesRegex(
+        frame_base.WontImplementError,
+        r"pivot\(\) of non-categorical type is not supported"):
+      self._run_test(
+          lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
+
+  def test_pivot_pandas_example1(self):
+    # Simple test 1
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    df['bar'] = df['bar'].astype(
+        pd.CategoricalDtype(categories=['A', 'B', 'C']))
+    self._run_test(
+        lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
+
+  def test_pivot_pandas_example3(self):
+    # Multiple values
+    df = pd.DataFrame({
+        'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
+        'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
+        'baz': [1, 2, 3, 4, 5, 6],
+        'zoo': ['x', 'y', 'z', 'q', 'w', 't']
+    })
+    df['bar'] = df['bar'].astype(
+        pd.CategoricalDtype(categories=['A', 'B', 'C']))
+    self._run_test(
+        lambda df: df.pivot(index='foo', columns='bar', values=['baz', 'zoo']),

Review comment:
       You might also test this with `columns=["bar"]`; pandas sometimes does
different things in this case (e.g. `df['bar']` yields a Series, while
`df[['bar']]` yields a DataFrame).

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3655,6 +3656,121 @@ def shift(self, axis, freq, **kwargs):
   describe = _agg_method(pd.DataFrame, 'describe')
   max = _agg_method(pd.DataFrame, 'max')
   min = _agg_method(pd.DataFrame, 'min')
+
+  @frame_base.with_docs_from(pd.DataFrame)
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def pivot(self, index=None, columns=None, values=None, **kwargs):
+    def verify_all_categorical(all_cols_are_categorical):
+      if not all_cols_are_categorical:
+        raise frame_base.WontImplementError(
+            "pivot() of non-categorical type is not supported because "
+            "the type of the output column depends on the data. Please use "
+            "pd.CategoricalDtype with explicit categories.",
+            reason="non-deferred-columns")
+
+    # Construct column index
+    if is_list_like(columns) and len(columns) <= 1:
+      columns = columns[0]
+    selected_cols = self._expr.proxy()[columns]
+    if isinstance(selected_cols, pd.Series):
+      all_cols_are_categorical = isinstance(
+        selected_cols.dtype, pd.CategoricalDtype
+      )
+      verify_all_categorical(all_cols_are_categorical)
+
+      # If values not provided, take all remaining columns of dataframe
+      if not values:
+        values = self._expr.proxy() \
+          .drop(index, axis=1).drop(columns, axis=1).columns.values
+
+      # Take the provided values
+      if is_list_like(values) and len(values) > 1:
+        values_in_col_index = values
+        names = [None, columns]
+        col_index = pd.MultiIndex.from_product(
+          [values_in_col_index,
+          selected_cols.dtypes.categories.astype('category')],
+          names=names
+        )
+      else:
+        col_index = pd.CategoricalIndex(
+          selected_cols.dtype.categories,
+          name=columns
+        )
+    else:
+      all_cols_are_categorical = all(
+        isinstance(c, pd.CategoricalDtype) for c in selected_cols.dtypes
+      )
+      verify_all_categorical(all_cols_are_categorical)
+
+      categories = [
+        c.categories.astype('category') for c in selected_cols.dtypes
+      ]
+      if is_list_like(columns) and len(columns) > 1:

Review comment:
       I think this check is a no-op. Don't we already know it will be true
because we're in the else branch (i.e. `isinstance(selected_cols, pd.Series)`
is false)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to