[ https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=463787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-463787 ]

ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jul/20 07:32
            Start Date: 29/Jul/20 07:32
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #11974:
URL: https://github.com/apache/beam/pull/11974#discussion_r461942645



##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -34,77 +34,54 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.T': ['*'],
             'pandas.core.frame.DataFrame.agg': ['*'],
             'pandas.core.frame.DataFrame.aggregate': ['*'],
-            'pandas.core.frame.DataFrame.all': ['*'],
-            'pandas.core.frame.DataFrame.any': ['*'],
             'pandas.core.frame.DataFrame.append': ['*'],
             'pandas.core.frame.DataFrame.apply': ['*'],
-            'pandas.core.frame.DataFrame.applymap': ['*'],
+            'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
             'pandas.core.frame.DataFrame.assign': ['*'],
             'pandas.core.frame.DataFrame.axes': ['*'],
             'pandas.core.frame.DataFrame.combine': ['*'],
             'pandas.core.frame.DataFrame.combine_first': ['*'],
             'pandas.core.frame.DataFrame.corr': ['*'],
             'pandas.core.frame.DataFrame.count': ['*'],
             'pandas.core.frame.DataFrame.cov': ['*'],
-            'pandas.core.frame.DataFrame.cummax': ['*'],
-            'pandas.core.frame.DataFrame.cummin': ['*'],
-            'pandas.core.frame.DataFrame.cumprod': ['*'],
-            'pandas.core.frame.DataFrame.cumsum': ['*'],
-            'pandas.core.frame.DataFrame.diff': ['*'],
             'pandas.core.frame.DataFrame.dot': ['*'],
             'pandas.core.frame.DataFrame.drop': ['*'],
-            'pandas.core.frame.DataFrame.dropna': ['*'],
             'pandas.core.frame.DataFrame.eval': ['*'],
             'pandas.core.frame.DataFrame.explode': ['*'],
             'pandas.core.frame.DataFrame.fillna': ['*'],
             'pandas.core.frame.DataFrame.info': ['*'],
             'pandas.core.frame.DataFrame.isin': ['*'],
-            'pandas.core.frame.DataFrame.isna': ['*'],
-            'pandas.core.frame.DataFrame.isnull': ['*'],
-            'pandas.core.frame.DataFrame.items': ['*'],
-            'pandas.core.frame.DataFrame.iteritems': ['*'],
-            'pandas.core.frame.DataFrame.iterrows': ['*'],
-            'pandas.core.frame.DataFrame.itertuples': ['*'],
+            'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
             'pandas.core.frame.DataFrame.join': ['*'],
-            'pandas.core.frame.DataFrame.max': ['*'],
             'pandas.core.frame.DataFrame.melt': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
             'pandas.core.frame.DataFrame.merge': ['*'],
-            'pandas.core.frame.DataFrame.min': ['*'],
-            'pandas.core.frame.DataFrame.mode': ['*'],
+            # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+            'pandas.core.frame.DataFrame.mode': [
+                "df.mode(axis='columns', numeric_only=True)"
+            ],
             'pandas.core.frame.DataFrame.nlargest': ['*'],
-            'pandas.core.frame.DataFrame.notna': ['*'],
-            'pandas.core.frame.DataFrame.notnull': ['*'],
             'pandas.core.frame.DataFrame.nsmallest': ['*'],
             'pandas.core.frame.DataFrame.nunique': ['*'],
             'pandas.core.frame.DataFrame.pivot': ['*'],
             'pandas.core.frame.DataFrame.pivot_table': ['*'],
-            'pandas.core.frame.DataFrame.prod': ['*'],
-            'pandas.core.frame.DataFrame.product': ['*'],
-            'pandas.core.frame.DataFrame.quantile': ['*'],
             'pandas.core.frame.DataFrame.query': ['*'],
             'pandas.core.frame.DataFrame.reindex': ['*'],
             'pandas.core.frame.DataFrame.reindex_axis': ['*'],
             'pandas.core.frame.DataFrame.rename': ['*'],
-            'pandas.core.frame.DataFrame.replace': ['*'],
-            'pandas.core.frame.DataFrame.reset_index': ['*'],
+            # Raises right exception, but testing framework has matching issues.
+            'pandas.core.frame.DataFrame.replace': [
+                "df.replace({'a string': 'new value', True: False})  # raises"
+            ],
+            # Uses unseeded np.random.
             'pandas.core.frame.DataFrame.round': ['*'],
-            'pandas.core.frame.DataFrame.select_dtypes': ['*'],
             'pandas.core.frame.DataFrame.set_index': ['*'],
-            'pandas.core.frame.DataFrame.shape': ['*'],
-            'pandas.core.frame.DataFrame.shift': ['*'],
-            'pandas.core.frame.DataFrame.sort_values': ['*'],
-            'pandas.core.frame.DataFrame.stack': ['*'],
-            'pandas.core.frame.DataFrame.sum': ['*'],
-            'pandas.core.frame.DataFrame.to_dict': ['*'],
-            'pandas.core.frame.DataFrame.to_numpy': ['*'],
+            'pandas.core.frame.DataFrame.transpose': [
+                'df1_transposed.dtypes', 'df2_transposed.dtypes'
+            ],
+            'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+            # Uses df.index

Review comment:
       Doesn't need to happen in this PR, but it could be preferable to have different types/modes of skip, e.g.:
   - allow_wont_implement: passes if WontImplementError is thrown.
   - allow_error: passes if any other exception is thrown.
   - allow_error_or_wrong_answer: passes if an exception is thrown or if Beam is producing the wrong answer.
   
   That way it'll be clearly documented which ops we don't intend to implement, and which ones can produce incorrect data and should be avoided.
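   A rough sketch of how such skip modes could be expressed (the names `SkipMode` and `doctest_passes` are hypothetical illustrations, not anything from this PR):

```python
import enum


class SkipMode(enum.Enum):
    """Hypothetical skip modes mirroring the three categories above."""
    ALLOW_WONT_IMPLEMENT = 1
    ALLOW_ERROR = 2
    ALLOW_ERROR_OR_WRONG_ANSWER = 3


class WontImplementError(NotImplementedError):
    """Stand-in for frame_base.WontImplementError."""


def doctest_passes(mode, error=None, wrong_answer=False):
    """Decide whether a skipped doctest counts as passing under `mode`."""
    if mode is SkipMode.ALLOW_WONT_IMPLEMENT:
        # Passes only if WontImplementError was thrown.
        return isinstance(error, WontImplementError)
    if mode is SkipMode.ALLOW_ERROR:
        # Passes if any exception other than WontImplementError was thrown.
        return error is not None and not isinstance(error, WontImplementError)
    if mode is SkipMode.ALLOW_ERROR_OR_WRONG_ANSWER:
        # Passes on any exception, or when Beam produces the wrong answer.
        return error is not None or wrong_answer
    return False
```

   The test's skip dictionary could then map each entry to a `SkipMode` instead of a bare pattern, making the intent of every skip self-documenting.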

##########
File path: sdks/python/apache_beam/dataframe/frame_base.py
##########
@@ -41,16 +42,34 @@ def wrapper(deferred_type):
 
   @classmethod
   def wrap(cls, expr):
-    return cls._pandas_type_map[type(expr.proxy())](expr)
+    proxy_type = type(expr.proxy())
+    if proxy_type in cls._pandas_type_map:
+      wrapper_type = cls._pandas_type_map[proxy_type]
+    else:
+      if expr.requires_partition_by() != partitionings.Singleton():
+        raise ValueError(
+            'Scalar expression %s partitioned by non-singleton %s' %
+            (expr, expr.requires_partition_by()))
+      wrapper_type = _DeferredScaler
+    return wrapper_type(expr)
 
   def _elementwise(self, func, name=None, other_args=(), inplace=False):
      return _elementwise_function(func, name, inplace=inplace)(self, *other_args)
 
+
+class DeferredFrame(DeferredBase):
   @property
   def dtypes(self):
     return self._expr.proxy().dtypes
 
 
+class _DeferredScaler(DeferredBase):

Review comment:
       nit: should this be scalar?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
   def loc(self):
     return _DeferredLoc(self)
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
-    if axis != 0:
-      raise NotImplementedError()
+  def aggregate(self, *args, **kwargs):
+    if 'axis' in kwargs and kwargs['axis'] is None:
+      return self.agg(*args, **dict(kwargs, axis=1)).agg(
+          *args, **dict(kwargs, axis=0))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(func, axis, *args, **kwargs),
+            lambda df: df.agg(*args, **kwargs),
             [self._expr],
             # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
 
   agg = aggregate
 
+  applymap = frame_base._elementwise_method('applymap')
+
+  def memory_usage(self):
+    raise frame_base.WontImplementError()
+
+  all = frame_base._associative_agg_method('all')
+  any = frame_base._associative_agg_method('any')
+
+  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+      'order-sensitive')
+  diff = frame_base.wont_implement_method('order-sensitive')
+
+  max = frame_base._associative_agg_method('max')
+  min = frame_base._associative_agg_method('min')
+  mode = frame_base._agg_method('mode')
+
+  def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+    # TODO(robertwb): This is a common pattern. Generalize?
+    if axis == 1:
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dropna',
+            lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+      'non-lazy')
+
+  isna = frame_base._elementwise_method('isna')
+  notnull = notna = frame_base._elementwise_method('notna')
+
+  prod = product = frame_base._associative_agg_method('prod')
+
+  def quantile(self, q=0.5, axis=0, *args, **kwargs):
+    if axis != 0:
+      raise frame_base.WontImplementError('non-deferred column values')
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'quantile',
+            lambda df: df.quantile(q, axis, *args, **kwargs),
+            [self._expr],
+            #TODO(robertwb): Approximate quantiles?
+            requires_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Singleton()))
+
+  query = frame_base._elementwise_method('query')
+
+  def replace(self, to_replace=None,
+      value=None,
+      inplace=False,
+      limit=None, *args, **kwargs):
+    if limit is None:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'replace',
+            lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+    if level is not None and not isinstance(level, (tuple, list)):
+      level = [level]
+    if level is None or len(level) == len(self._expr.proxy().index.levels):
+      # TODO: Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'reset_index',
+            lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  round = frame_base._elementwise_method('round')
+  select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+  def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+    if axis == 1:

Review comment:
       Looks like this can be 'columns' as well: https://pandas.pydata.org/pandas-docs/version/0.24.2/reference/api/pandas.DataFrame.shift.html?highlight=shift#pandas.DataFrame.shift
   We should check for that throughout so we don't end up using Singleton partitioning unnecessarily.
   ```suggestion
       if axis == 1 or axis == 'columns':
   ```
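   Since this check recurs in every axis-sensitive method, it could be factored into a small helper along these lines (the name `is_columns_axis` is hypothetical, not from the PR):

```python
def is_columns_axis(axis):
    """Return True when a pandas-style `axis` argument refers to columns.

    pandas accepts both the integer 1 and the string 'columns'
    (and 0 / 'index' for rows), so any partitioning decision keyed
    on the axis should treat the two spellings identically.
    """
    return axis in (1, 'columns')
```

   Each `if axis == 1:` in frames.py could then become `if is_columns_axis(axis):`, avoiding Singleton partitioning when a caller passes `axis='columns'`.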

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -34,77 +34,54 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.T': ['*'],
             'pandas.core.frame.DataFrame.agg': ['*'],
             'pandas.core.frame.DataFrame.aggregate': ['*'],

Review comment:
       ```suggestion
   ```
   These should be supported, right (also for Series)? Looks like the tests pass for me locally with these unskipped.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
   def loc(self):
     return _DeferredLoc(self)
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
-    if axis != 0:
-      raise NotImplementedError()
+  def aggregate(self, *args, **kwargs):
+    if 'axis' in kwargs and kwargs['axis'] is None:
+      return self.agg(*args, **dict(kwargs, axis=1)).agg(
+          *args, **dict(kwargs, axis=0))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(func, axis, *args, **kwargs),
+            lambda df: df.agg(*args, **kwargs),
             [self._expr],
             # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
 
   agg = aggregate
 
+  applymap = frame_base._elementwise_method('applymap')
+
+  def memory_usage(self):
+    raise frame_base.WontImplementError()
+
+  all = frame_base._associative_agg_method('all')
+  any = frame_base._associative_agg_method('any')
+
+  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+      'order-sensitive')
+  diff = frame_base.wont_implement_method('order-sensitive')
+
+  max = frame_base._associative_agg_method('max')
+  min = frame_base._associative_agg_method('min')
+  mode = frame_base._agg_method('mode')
+
+  def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+    # TODO(robertwb): This is a common pattern. Generalize?
+    if axis == 1:
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dropna',
+            lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+      'non-lazy')
+
+  isna = frame_base._elementwise_method('isna')
+  notnull = notna = frame_base._elementwise_method('notna')
+
+  prod = product = frame_base._associative_agg_method('prod')
+
+  def quantile(self, q=0.5, axis=0, *args, **kwargs):
+    if axis != 0:
+      raise frame_base.WontImplementError('non-deferred column values')
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'quantile',
+            lambda df: df.quantile(q, axis, *args, **kwargs),
+            [self._expr],
+            #TODO(robertwb): Approximate quantiles?
+            requires_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Singleton()))
+
+  query = frame_base._elementwise_method('query')
+
+  def replace(self, to_replace=None,
+      value=None,
+      inplace=False,
+      limit=None, *args, **kwargs):
+    if limit is None:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'replace',
+            lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+    if level is not None and not isinstance(level, (tuple, list)):
+      level = [level]
+    if level is None or len(level) == len(self._expr.proxy().index.levels):
+      # TODO: Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'reset_index',
+            lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  round = frame_base._elementwise_method('round')
+  select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+  def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+    if axis == 1:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'shift',
+            lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+
+  @property
+  def shape(self):
+    raise frame_base.WontImplementError('scalar value')
+
+  def sort_values(self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+    if axis == 1:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'sort_values',
+            lambda df: df.sort_values(by, axis, ascending, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  stack = frame_base._elementwise_method('stack')
+
+  sum = frame_base._associative_agg_method('sum')
+
+  def to_string(self, *args, **kwargs):
+    raise frame_base.WontImplementError('non-deferred value')
+
+  to_records = to_dict = to_numpy = to_string

Review comment:
       ```suggestion
     to_records = to_dict = to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')
   ```
   
   nit: I think we should prefer `wont_implement_method` instead of `def` and `raise`.
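   For context, the advantage of the factory style is that the returned stub accepts any call signature. A minimal sketch of what `wont_implement_method` does (the real helper lives in frame_base.py; this is an illustration, not its exact source):

```python
class WontImplementError(NotImplementedError):
    """Stand-in for frame_base.WontImplementError."""


def wont_implement_method(reason):
    """Return a stub method that raises WontImplementError when called.

    Taking *args/**kwargs lets one stub match any pandas method
    signature, which a zero-argument `def` + `raise` cannot do.
    """
    def wrapper(self, *args, **kwargs):
        raise WontImplementError(reason)
    return wrapper
```

   Assigning the same stub to several names (`to_records = to_dict = ... = wont_implement_method(...)`) then rejects every variant uniformly, regardless of the arguments callers pass.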

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
   def loc(self):
     return _DeferredLoc(self)
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
-    if axis != 0:
-      raise NotImplementedError()
+  def aggregate(self, *args, **kwargs):
+    if 'axis' in kwargs and kwargs['axis'] is None:
+      return self.agg(*args, **dict(kwargs, axis=1)).agg(
+          *args, **dict(kwargs, axis=0))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(func, axis, *args, **kwargs),
+            lambda df: df.agg(*args, **kwargs),
             [self._expr],
             # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
 
   agg = aggregate
 
+  applymap = frame_base._elementwise_method('applymap')
+
+  def memory_usage(self):
+    raise frame_base.WontImplementError()
+
+  all = frame_base._associative_agg_method('all')
+  any = frame_base._associative_agg_method('any')
+
+  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+      'order-sensitive')
+  diff = frame_base.wont_implement_method('order-sensitive')
+
+  max = frame_base._associative_agg_method('max')
+  min = frame_base._associative_agg_method('min')
+  mode = frame_base._agg_method('mode')
+
+  def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False, *args, **kwargs):
+    # TODO(robertwb): This is a common pattern. Generalize?
+    if axis == 1:
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dropna',
+            lambda df: df.dropna(axis, how, thresh, subset, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+      'non-lazy')
+
+  isna = frame_base._elementwise_method('isna')
+  notnull = notna = frame_base._elementwise_method('notna')
+
+  prod = product = frame_base._associative_agg_method('prod')
+
+  def quantile(self, q=0.5, axis=0, *args, **kwargs):
+    if axis != 0:
+      raise frame_base.WontImplementError('non-deferred column values')
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'quantile',
+            lambda df: df.quantile(q, axis, *args, **kwargs),
+            [self._expr],
+            #TODO(robertwb): Approximate quantiles?
+            requires_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Singleton()))
+
+  query = frame_base._elementwise_method('query')
+
+  def replace(self, to_replace=None,
+      value=None,
+      inplace=False,
+      limit=None, *args, **kwargs):
+    if limit is None:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'replace',
+            lambda df: df.replace(to_replace, value, False, limit, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+    if level is not None and not isinstance(level, (tuple, list)):
+      level = [level]
+    if level is None or len(level) == len(self._expr.proxy().index.levels):
+      # TODO: Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'reset_index',
+            lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  round = frame_base._elementwise_method('round')
+  select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+  def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+    if axis == 1:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'shift',
+            lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+
+  @property
+  def shape(self):
+    raise frame_base.WontImplementError('scalar value')
+
+  def sort_values(self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+    if axis == 1:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()

Review comment:
       Any thoughts on how we should communicate to the user that Singleton partitioning is happening and it's bad?
   
   Maybe we should refuse to apply operations that require singleton partitioning by default unless a user opts in?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
   def loc(self):
     return _DeferredLoc(self)
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
-    if axis != 0:
-      raise NotImplementedError()
+  def aggregate(self, *args, **kwargs):
+    if 'axis' in kwargs and kwargs['axis'] is None:
+      return self.agg(*args, **dict(kwargs, axis=1)).agg(
+          *args, **dict(kwargs, axis=0))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(func, axis, *args, **kwargs),
+            lambda df: df.agg(*args, **kwargs),
             [self._expr],
             # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
 
   agg = aggregate
 
+  applymap = frame_base._elementwise_method('applymap')
+
+  def memory_usage(self):
+    raise frame_base.WontImplementError()

Review comment:
       This should accept *args, **kwargs:
   ```suggestion
     memory_usage = frame_base.wont_implement_method('scalar value')
   ```
   
   It looks like this still can't be un-skipped after that change, though, because the test uses `df.head()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 463787)
    Time Spent: 0.5h  (was: 20m)

> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
>                 Key: BEAM-9547
>                 URL: https://issues.apache.org/jira/browse/BEAM-9547
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Robert Bradshaw
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy 
> method. Everything that's not actually implemented should get a default 
> implementation that raises WontImplementError.
> See https://github.com/apache/beam/pull/10757#discussion_r389132292



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
