[
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=463787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-463787
]
ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Jul/20 07:32
Start Date: 29/Jul/20 07:32
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request #11974:
URL: https://github.com/apache/beam/pull/11974#discussion_r461942645
##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -34,77 +34,54 @@ def test_dataframe_tests(self):
'pandas.core.frame.DataFrame.T': ['*'],
'pandas.core.frame.DataFrame.agg': ['*'],
'pandas.core.frame.DataFrame.aggregate': ['*'],
- 'pandas.core.frame.DataFrame.all': ['*'],
- 'pandas.core.frame.DataFrame.any': ['*'],
'pandas.core.frame.DataFrame.append': ['*'],
'pandas.core.frame.DataFrame.apply': ['*'],
- 'pandas.core.frame.DataFrame.applymap': ['*'],
+ 'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
'pandas.core.frame.DataFrame.assign': ['*'],
'pandas.core.frame.DataFrame.axes': ['*'],
'pandas.core.frame.DataFrame.combine': ['*'],
'pandas.core.frame.DataFrame.combine_first': ['*'],
'pandas.core.frame.DataFrame.corr': ['*'],
'pandas.core.frame.DataFrame.count': ['*'],
'pandas.core.frame.DataFrame.cov': ['*'],
- 'pandas.core.frame.DataFrame.cummax': ['*'],
- 'pandas.core.frame.DataFrame.cummin': ['*'],
- 'pandas.core.frame.DataFrame.cumprod': ['*'],
- 'pandas.core.frame.DataFrame.cumsum': ['*'],
- 'pandas.core.frame.DataFrame.diff': ['*'],
'pandas.core.frame.DataFrame.dot': ['*'],
'pandas.core.frame.DataFrame.drop': ['*'],
- 'pandas.core.frame.DataFrame.dropna': ['*'],
'pandas.core.frame.DataFrame.eval': ['*'],
'pandas.core.frame.DataFrame.explode': ['*'],
'pandas.core.frame.DataFrame.fillna': ['*'],
'pandas.core.frame.DataFrame.info': ['*'],
'pandas.core.frame.DataFrame.isin': ['*'],
- 'pandas.core.frame.DataFrame.isna': ['*'],
- 'pandas.core.frame.DataFrame.isnull': ['*'],
- 'pandas.core.frame.DataFrame.items': ['*'],
- 'pandas.core.frame.DataFrame.iteritems': ['*'],
- 'pandas.core.frame.DataFrame.iterrows': ['*'],
- 'pandas.core.frame.DataFrame.itertuples': ['*'],
+ 'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
'pandas.core.frame.DataFrame.join': ['*'],
- 'pandas.core.frame.DataFrame.max': ['*'],
'pandas.core.frame.DataFrame.melt': ['*'],
'pandas.core.frame.DataFrame.memory_usage': ['*'],
'pandas.core.frame.DataFrame.merge': ['*'],
- 'pandas.core.frame.DataFrame.min': ['*'],
- 'pandas.core.frame.DataFrame.mode': ['*'],
+ # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+ 'pandas.core.frame.DataFrame.mode': [
+ "df.mode(axis='columns', numeric_only=True)"
+ ],
'pandas.core.frame.DataFrame.nlargest': ['*'],
- 'pandas.core.frame.DataFrame.notna': ['*'],
- 'pandas.core.frame.DataFrame.notnull': ['*'],
'pandas.core.frame.DataFrame.nsmallest': ['*'],
'pandas.core.frame.DataFrame.nunique': ['*'],
'pandas.core.frame.DataFrame.pivot': ['*'],
'pandas.core.frame.DataFrame.pivot_table': ['*'],
- 'pandas.core.frame.DataFrame.prod': ['*'],
- 'pandas.core.frame.DataFrame.product': ['*'],
- 'pandas.core.frame.DataFrame.quantile': ['*'],
'pandas.core.frame.DataFrame.query': ['*'],
'pandas.core.frame.DataFrame.reindex': ['*'],
'pandas.core.frame.DataFrame.reindex_axis': ['*'],
'pandas.core.frame.DataFrame.rename': ['*'],
- 'pandas.core.frame.DataFrame.replace': ['*'],
- 'pandas.core.frame.DataFrame.reset_index': ['*'],
+ # Raises the right exception, but the testing framework has matching issues.
+ 'pandas.core.frame.DataFrame.replace': [
+ "df.replace({'a string': 'new value', True: False}) # raises"
+ ],
+ # Uses unseeded np.random.
'pandas.core.frame.DataFrame.round': ['*'],
- 'pandas.core.frame.DataFrame.select_dtypes': ['*'],
'pandas.core.frame.DataFrame.set_index': ['*'],
- 'pandas.core.frame.DataFrame.shape': ['*'],
- 'pandas.core.frame.DataFrame.shift': ['*'],
- 'pandas.core.frame.DataFrame.sort_values': ['*'],
- 'pandas.core.frame.DataFrame.stack': ['*'],
- 'pandas.core.frame.DataFrame.sum': ['*'],
- 'pandas.core.frame.DataFrame.to_dict': ['*'],
- 'pandas.core.frame.DataFrame.to_numpy': ['*'],
+ 'pandas.core.frame.DataFrame.transpose': [
+ 'df1_transposed.dtypes', 'df2_transposed.dtypes'
+ ],
+ 'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+ # Uses df.index
Review comment:
Doesn't need to happen in this PR, but it would be preferable to have
different types/modes of skip, e.g.:
- allow_wont_implement: passes if WontImplementError is thrown.
- allow_error: passes if any other exception is thrown.
- allow_error_or_wrong_answer: passes if an exception is thrown or if
  Beam is producing the wrong answer.
That way it'll be clearly documented which ops we don't intend to implement,
and which ones can produce incorrect data and should be avoided.
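A rough sketch of how those modes might plug into the skip lists (the
`Skip` enum and the dict shape here are hypothetical, not the current
test API):
```python
import enum


class Skip(enum.Enum):
  ALLOW_WONT_IMPLEMENT = 1  # passes only if WontImplementError is raised
  ALLOW_ERROR = 2  # passes if any other exception is raised
  ALLOW_ERROR_OR_WRONG_ANSWER = 3  # exception raised, or wrong answer


# Skip entries could then carry a mode instead of being a bare list:
skips = {
    'pandas.core.frame.DataFrame.cummax': {'*': Skip.ALLOW_WONT_IMPLEMENT},
    'pandas.core.frame.DataFrame.round': {
        '*': Skip.ALLOW_ERROR_OR_WRONG_ANSWER
    },
}
```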
##########
File path: sdks/python/apache_beam/dataframe/frame_base.py
##########
@@ -41,16 +42,34 @@ def wrapper(deferred_type):
@classmethod
def wrap(cls, expr):
- return cls._pandas_type_map[type(expr.proxy())](expr)
+ proxy_type = type(expr.proxy())
+ if proxy_type in cls._pandas_type_map:
+ wrapper_type = cls._pandas_type_map[proxy_type]
+ else:
+ if expr.requires_partition_by() != partitionings.Singleton():
+ raise ValueError(
+ 'Scalar expression %s partitioned by non-singleton %s' %
+ (expr, expr.requires_partition_by()))
+ wrapper_type = _DeferredScaler
+ return wrapper_type(expr)
def _elementwise(self, func, name=None, other_args=(), inplace=False):
return _elementwise_function(func, name, inplace=inplace)(self,
*other_args)
+
+class DeferredFrame(DeferredBase):
@property
def dtypes(self):
return self._expr.proxy().dtypes
+class _DeferredScaler(DeferredBase):
Review comment:
nit: should this be scalar?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False,
+     *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args,
+     **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
Review comment:
Looks like this can be 'columns' as well:
https://pandas.pydata.org/pandas-docs/version/0.24.2/reference/api/pandas.DataFrame.shift.html?highlight=shift#pandas.DataFrame.shift
We should check for that throughout so we don't end up using Singleton
partitioning unnecessarily.
```suggestion
if axis == 1 or axis == 'columns':
```
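To avoid repeating that check at every call site, a tiny helper could
normalize the axis argument first (a sketch; `_normalize_axis` is a
hypothetical helper, not existing Beam code):
```python
def _normalize_axis(axis):
  """Map pandas' string axis aliases ('index'/'rows'/'columns') onto
  their integer equivalents; integers pass through unchanged."""
  return {'index': 0, 'rows': 0, 'columns': 1}.get(axis, axis)


# Call sites would then only compare integers, e.g.:
#   if _normalize_axis(axis) == 1:
#     requires_partition_by = partitionings.Nothing()
```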
##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -34,77 +34,54 @@ def test_dataframe_tests(self):
'pandas.core.frame.DataFrame.T': ['*'],
'pandas.core.frame.DataFrame.agg': ['*'],
'pandas.core.frame.DataFrame.aggregate': ['*'],
Review comment:
```suggestion
```
These should be supported, right (also for Series)? The tests pass for
me locally with these unskipped.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False,
+     *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args,
+     **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'shift',
+ lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+
+ @property
+ def shape(self):
+ raise frame_base.WontImplementError('scalar value')
+
+ def sort_values(self, by, axis=0, ascending=True, inplace=False, *args,
+     **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'sort_values',
+ lambda df: df.sort_values(by, axis, ascending, False, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ stack = frame_base._elementwise_method('stack')
+
+ sum = frame_base._associative_agg_method('sum')
+
+ def to_string(self, *args, **kwargs):
+ raise frame_base.WontImplementError('non-deferred value')
+
+ to_records = to_dict = to_numpy = to_string
Review comment:
```suggestion
to_records = to_dict = to_numpy = to_string = frame_base.wont_implement_method(
    'non-deferred value')
```
nit: I think we should prefer `wont_implement_method` instead of `def` and
`raise`
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
+
+ all = frame_base._associative_agg_method('all')
+ any = frame_base._associative_agg_method('any')
+
+ cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+ 'order-sensitive')
+ diff = frame_base.wont_implement_method('order-sensitive')
+
+ max = frame_base._associative_agg_method('max')
+ min = frame_base._associative_agg_method('min')
+ mode = frame_base._agg_method('mode')
+
+ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False,
+     *args, **kwargs):
+ # TODO(robertwb): This is a common pattern. Generalize?
+ if axis == 1:
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'dropna',
+ lambda df: df.dropna(axis, how, thresh, subset, False, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+ 'non-lazy')
+
+ isna = frame_base._elementwise_method('isna')
+ notnull = notna = frame_base._elementwise_method('notna')
+
+ prod = product = frame_base._associative_agg_method('prod')
+
+ def quantile(self, q=0.5, axis=0, *args, **kwargs):
+ if axis != 0:
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'quantile',
+ lambda df: df.quantile(q, axis, *args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate quantiles?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
+
+ query = frame_base._elementwise_method('query')
+
+ def replace(self, to_replace=None,
+ value=None,
+ inplace=False,
+ limit=None, *args, **kwargs):
+ if limit is None:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'replace',
+ lambda df: df.replace(to_replace, value, False, limit, *args,
+     **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ def reset_index(self, level=None, drop=False, inplace=False, *args,
+     **kwargs):
+ if level is not None and not isinstance(level, (tuple, list)):
+ level = [level]
+ if level is None or len(level) == len(self._expr.proxy().index.levels):
+ # TODO: Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton()
+ else:
+ requires_partition_by = partitionings.Nothing()
+ result = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'reset_index',
+ lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+ if inplace:
+ self._expr = result._expr
+ else:
+ return result
+
+ round = frame_base._elementwise_method('round')
+ select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+ def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'shift',
+ lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+ [self._expr],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires_partition_by))
+
+ @property
+ def shape(self):
+ raise frame_base.WontImplementError('scalar value')
+
+ def sort_values(self, by, axis=0, ascending=True, inplace=False, *args,
+     **kwargs):
+ if axis == 1:
+ requires_partition_by = partitionings.Nothing()
+ else:
+ requires_partition_by = partitionings.Singleton()
Review comment:
Any thoughts on how we should communicate to the user that Singleton
partitioning is happening and it's bad?
Maybe we should refuse to apply operations that require singleton
partitioning by default unless a user opts in?
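For example, something along these lines (the exception name and the
opt-in flag are hypothetical, purely to illustrate the idea):
```python
class NonParallelOperation(Exception):
  """Raised when an operation would collect all data on one worker."""


def check_non_parallel(op_name, requires_singleton, allow_non_parallel=False):
  # Refuse Singleton-partitioned operations unless the user opted in.
  if requires_singleton and not allow_non_parallel:
    raise NonParallelOperation(
        '%s requires collecting all data on a single node; pass '
        'allow_non_parallel=True to run it anyway.' % op_name)
```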
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -111,19 +164,181 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)
- def aggregate(self, func, axis=0, *args, **kwargs):
- if axis != 0:
- raise NotImplementedError()
+ def aggregate(self, *args, **kwargs):
+ if 'axis' in kwargs and kwargs['axis'] is None:
+ return self.agg(*args, **dict(kwargs, axis=1)).agg(
+ *args, **dict(kwargs, axis=0))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(func, axis, *args, **kwargs),
+ lambda df: df.agg(*args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
agg = aggregate
+ applymap = frame_base._elementwise_method('applymap')
+
+ def memory_usage(self):
+ raise frame_base.WontImplementError()
Review comment:
This should accept *args, **kwargs:
```suggestion
memory_usage = frame_base.wont_implement_method('scalar value')
```
It looks like this still can't be un-skipped after that change though
because the test uses `df.head()`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 463787)
Time Spent: 0.5h (was: 20m)
> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
> Key: BEAM-9547
> URL: https://issues.apache.org/jira/browse/BEAM-9547
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Robert Bradshaw
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy
> method. Everything that's not actually implemented should get a default
> implementation that raises WontImplementError.
> See https://github.com/apache/beam/pull/10757#discussion_r389132292
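>
> A minimal sketch of one way to provide such a default (hypothetical
> helper, not the actual Beam mechanism): decorate each deferred class
> with the pandas type it mirrors, and stub out every public method the
> class doesn't define itself.
>
> ```python
> class WontImplementError(NotImplementedError):
>   pass
>
>
> def populate_not_implemented(pandas_type):
>   """Class decorator: stub every public method of pandas_type that the
>   deferred class does not define, so it raises WontImplementError."""
>   def wrap(deferred_type):
>     for name in dir(pandas_type):
>       if name.startswith('_') or hasattr(deferred_type, name):
>         continue
>
>       def stub(self, *args, _name=name, **kwargs):
>         raise WontImplementError(_name)
>
>       setattr(deferred_type, name, stub)
>     return deferred_type
>   return wrap
> ```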
--
This message was sent by Atlassian Jira
(v8.3.4#803005)