[ 
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=500877&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-500877
 ]

ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Oct/20 22:42
            Start Date: 14/Oct/20 22:42
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#13082:
URL: https://github.com/apache/beam/pull/13082#discussion_r504989018



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -27,12 +28,118 @@
 from apache_beam.dataframe import partitionings
 
 
-@frame_base.DeferredFrame._register_for(pd.Series)
-class DeferredSeries(frame_base.DeferredFrame):
+class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
   def __array__(self, dtype=None):
     raise frame_base.WontImplementError(
         'Conversion to a non-deferred a numpy array.')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def droplevel(self, level, axis):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'droplevel',
+            lambda df: df.droplevel(level, axis=axis), [self._expr],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Index()
+            if axis in (1, 'column') else partitionings.Nothing()))
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):

Review comment:
       Could you update pandas_doctests_test?  It looks like the skipped 
DataFrame.groupby tests should pass now, and hopefully the Series.groupby ones 
as well.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1120,6 +1222,15 @@ def agg(self, fn):
             requires_partition_by=partitionings.Index(),
             preserves_partition_by=partitionings.Singleton()))
 
+  aggregate = agg
+
+  first = last = head = tail = frame_base.not_implemented_method(
+      'order sensitive')
+
+  __len__ = frame_base.wont_implement_method('non-deferred')

Review comment:
       Should we consider implementing this and `groups` for categorical 
grouping keys?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -27,12 +28,118 @@
 from apache_beam.dataframe import partitionings
 
 
-@frame_base.DeferredFrame._register_for(pd.Series)
-class DeferredSeries(frame_base.DeferredFrame):
+class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
   def __array__(self, dtype=None):
     raise frame_base.WontImplementError(
         'Conversion to a non-deferred a numpy array.')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def droplevel(self, level, axis):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'droplevel',
+            lambda df: df.droplevel(level, axis=axis), [self._expr],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Index()
+            if axis in (1, 'column') else partitionings.Nothing()))
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
+    if not as_index:
+      raise NotImplementedError('groupby(as_index=False)')
+    if not group_keys:
+      raise NotImplementedError('groupby(group_keys=False)')
+
+    if axis in (1, 'columns'):
+      return _DeferredGroupByCols(
+          expressions.ComputedExpression(
+              'groupbycols',
+              lambda df: df.groupby(by, axis=axis, **kwargs), [self._expr],
+              requires_partition_by=partitionings.Nothing(),
+              preserves_partition_by=partitionings.Index()))
+
+    if level is None and by is None:
+      raise TypeError("You have to supply one of 'by' and 'level'")
+
+    elif level is not None:
+      if isinstance(level, (list, tuple)):
+        levels = level
+      else:
+        levels = [level]
+      all_levels = self._expr.proxy().index.names
+      levels = [all_levels[i] if isinstance(i, int) else i for i in levels]
+      levels_to_drop = self._expr.proxy().index.names.difference(levels)
+      if levels_to_drop:
+        to_group = self.droplevel(levels_to_drop)._expr
+      else:
+        to_group = self._expr
+
+    elif callable(by):
+
+      def map_index(df):
+        df = df.copy()
+        df.index = df.index.map(by)
+        return df
+
+      to_group = expressions.ComputedExpression(
+          'map_index',
+          map_index, [self._expr],
+          requires_partition_by=partitionings.Nothing(),
+          preserves_partition_by=partitionings.Singleton())
+
+    elif isinstance(by, DeferredSeries):
+
+      if isinstance(self, DeferredSeries):
+
+        def set_index(s, by):
+          df = pd.DataFrame(s)
+          df, by = df.align(by, axis=0)
+          return df.set_index(by).iloc[:, 0]
+      else:
+
+        def set_index(df, by):
+          df, by = df.align(by, axis=0)
+          return df.set_index(by)
+
+      to_group = expressions.ComputedExpression(
+          'set_index',
+          set_index,  #
+          [self._expr, by._expr],
+          requires_partition_by=partitionings.Index(),
+          preserves_partition_by=partitionings.Singleton())
+
+    elif isinstance(by, np.ndarray):
+      raise frame_base.WontImplementError('order sensitive')
+
+    else:
+      if not isinstance(by, list):

Review comment:
       I was going to suggest you allow tuple here, but after looking into it I 
realized the pandas groupby intentionally does the same thing, since a tuple 
could be a field name. TIL
   
   
https://github.com/pandas-dev/pandas/blob/d7a5b838d8d6234f6bec5a30bfa33b24bd4afbd9/pandas/core/groupby/grouper.py#L713-L718




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 500877)
    Time Spent: 23h 40m  (was: 23.5h)

> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
>                 Key: BEAM-9547
>                 URL: https://issues.apache.org/jira/browse/BEAM-9547
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Robert Bradshaw
>            Priority: P2
>          Time Spent: 23h 40m
>  Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy 
> method. Everything that's not actually implemented should get a default 
> implementation that raises WontImplementError
> SeeĀ https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a 
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
>  consider requesting edit access if you'd like to help out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to