TheNeuralBit commented on a change in pull request #13141:
URL: https://github.com/apache/beam/pull/13141#discussion_r512962676
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -575,6 +575,8 @@ def __setitem__(self, key, value):
     else:
       raise NotImplementedError(key)
+  align = frame_base._elementwise_method('align')
Review comment:
Is this safe to implement with `elementwise_method`? (I need to justify
this to myself.)
I think it will work for most cases. The resulting expression will require
`partitionings.Index`, since we require that whenever any arg is a DeferredBase,
and `other` should always be one. That should handle axis='index'. All of the
join modes should also work as intended within Index partitions.
Actually, I think for `axis='columns'` Index partitioning is _too_
restrictive; we could do that without any partitioning. Is that right?
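The claim that every join mode behaves correctly within Index partitions can be checked with a small pure-Python model. This is a sketch, not Beam code: `align_on_index`, `hash_partition` and `align_partitioned` are hypothetical helpers, and plain dicts (index label to value) stand in for Series. It aligns the whole dataset once, then aligns each hash partition on its own, and the union of the per-partition results should match the global result.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for a Series: index label -> value (None marks missing,
# where pandas would use NaN).
Frame = Dict[int, Optional[float]]


def align_on_index(left: Frame, right: Frame, join: str) -> Tuple[Frame, Frame]:
    """Pandas-like alignment of two (partial) frames on their index labels."""
    if join == 'inner':
        keys = [k for k in left if k in right]
    elif join == 'outer':
        keys = sorted(set(left) | set(right))
    elif join == 'left':
        keys = list(left)
    elif join == 'right':
        keys = list(right)
    else:
        raise ValueError('Unknown join: %r' % join)
    return ({k: left.get(k) for k in keys}, {k: right.get(k) for k in keys})


def hash_partition(frame: Frame, n: int) -> List[Frame]:
    """Route each row to a partition by its index label (Index partitioning)."""
    parts: List[Frame] = [{} for _ in range(n)]
    for key, value in frame.items():
        parts[hash(key) % n][key] = value
    return parts


def align_partitioned(left: Frame, right: Frame, join: str,
                      n: int = 3) -> Tuple[Frame, Frame]:
    """Align each pair of co-located partitions independently, then union."""
    out_left: Frame = {}
    out_right: Frame = {}
    for l_part, r_part in zip(hash_partition(left, n), hash_partition(right, n)):
        a, b = align_on_index(l_part, r_part, join)
        out_left.update(a)
        out_right.update(b)
    return out_left, out_right


left = {1: 1.0, 2: 2.0, 4: 4.0, 7: 7.0}
right = {2: 20.0, 3: 30.0, 7: 70.0}
for join in ('inner', 'outer', 'left', 'right'):
    # Dict equality ignores order, so only the label/value pairs are compared.
    assert align_partitioned(left, right, join) == align_on_index(left, right, join)
```

This works because rows with equal labels always land in the same partition, so each label's join decision is local. An order-sensitive option such as `method='ffill'` would break the equivalence, since filling a gap needs the preceding row, which may live in another partition.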
Looking at the remaining args:
- `copy`: I think copy=False won't work, since we can't predict whether the
operation happens in place or not; that depends on the data.
- `fill_value`: Works trivially
- `method`: non-default options are order-sensitive
- `limit`: I don't think we can support this correctly right now without
Singleton partitioning
- `level`, `fill_axis`, `broadcast_axis`: I'm not actually sure what these
are doing.
Can we reject the options that won't work?
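One way to reject the options that won't work is to validate the arguments before delegating to the elementwise implementation. The sketch below is hypothetical: `check_align_args` and its messages are assumptions, not Beam's API. Its defaults are meant to mirror the pandas 1.x `DataFrame.align` signature, and it encodes the assessment above, conservatively rejecting `level`, `fill_axis` and `broadcast_axis` unless they are left at their defaults.

```python
def check_align_args(join='outer', axis=None, level=None, copy=True,
                     fill_value=None, method=None, limit=None, fill_axis=0,
                     broadcast_axis=None):
    """Hypothetical pre-check for a deferred `align`; raises on unsafe options."""
    if join not in ('outer', 'inner', 'left', 'right'):
        raise ValueError('Unknown join: %r' % join)
    if not copy:
        # Whether pandas returns a copy depends on the data in each partition.
        raise NotImplementedError('align(copy=False)')
    if method is not None:
        # Fill methods (ffill/bfill) depend on row order across partitions.
        raise NotImplementedError('align(method=%r)' % method)
    if limit is not None:
        # `limit` caps filling, which is order-sensitive as well.
        raise NotImplementedError('align(limit=%r)' % limit)
    for name, value, default in (('level', level, None),
                                 ('fill_axis', fill_axis, 0),
                                 ('broadcast_axis', broadcast_axis, None)):
        if value != default:
            # Semantics not analyzed yet, so reject non-defaults conservatively.
            raise NotImplementedError('align(%s=%r)' % (name, value))
    # `fill_value` is applied elementwise, and `axis` only affects which
    # partitioning is required, so both pass through unchanged.


check_align_args(join='inner', fill_value=0)  # accepted
```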
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
requires_partition_by=partitionings.Singleton(),
proxy=proxy))
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
Review comment:
It looks like `method` is actually the only `kwarg` that overlaps
between corr and corrwith. We should probably pass that explicitly, and handle
the remaining arg, `drop`, here.
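For reference, pandas' `DataFrame.corrwith` takes `other`, `axis`, `drop` and `method`, while `Series.corr` takes `other`, `method` and `min_periods`, so forwarding `**kwargs` wholesale hands `drop` to a `corr` that does not accept it. A pure-Python sketch of the split, under stated assumptions: dicts of already-aligned column lists stand in for the frames, only Pearson is implemented, `series_corr` and `corrwith_sketch` are hypothetical names, and `drop=False` is modelled as keeping labels present on only one side with a NaN correlation.

```python
import math
from typing import Dict, List, Sequence

Columns = Dict[str, List[float]]  # hypothetical stand-in: column -> aligned values


def series_corr(xs: Sequence[float], ys: Sequence[float],
                method: str = 'pearson') -> float:
    """Per-column correlation; like Series.corr, it has no `drop` argument."""
    if method != 'pearson':
        raise NotImplementedError('method=%r' % method)
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


def corrwith_sketch(left: Columns, right: Columns, drop: bool = False,
                    method: str = 'pearson') -> Dict[str, float]:
    # `method` is the only argument shared with series_corr, so it is the only
    # one forwarded; `drop` is consumed here instead of being passed along.
    shared = [col for col in left if col in right]
    result = {col: series_corr(left[col], right[col], method=method)
              for col in shared}
    if not drop:
        # Keep labels present on only one side, with NaN as their correlation.
        for col in list(left) + list(right):
            result.setdefault(col, math.nan)
    return result
```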
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
requires_partition_by=partitionings.Singleton(),
proxy=proxy))
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'fill_dataframe',
+                fill_dataframe,
+                [corr._expr for corr in corrs],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(valid_cols, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'fill_dataframe',
+                fill_dataframe,
+                [corr._expr for corr in corrs],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=proxy))
Review comment:
nit: The two branches here are almost identical; apart from `other` vs.
`other[col]` in the `corr` call, the only difference is `valid_cols` vs.
`proxy.index`. You might consider reworking this so the rest of the logic is
shared, though that might just make it more confusing. Up to you.
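A sketch of that refactor, written in plain Python rather than Beam expressions so it runs standalone: only the choice of columns and of each column's partner differs between the Series and DataFrame cases, and everything after that is shared. `corrwith_shared`, the list/dict stand-ins and the injected `corr` callable are all hypothetical.

```python
import math
from typing import Callable, Dict, List, Sequence, Union

Series = List[float]            # hypothetical stand-in for a DeferredSeries
Frame = Dict[str, List[float]]  # hypothetical stand-in for a DeferredDataFrame


def corrwith_shared(frame: Frame, other: Union[Series, Frame],
                    result_labels: Sequence[str],
                    corr: Callable[[Series, Series], float]) -> Dict[str, float]:
    # Branch-specific part: which columns to compute, and each one's partner.
    if isinstance(other, dict):
        cols = [c for c in result_labels if c in frame and c in other]
        partners = [other[c] for c in cols]
    else:
        cols = list(result_labels)
        partners = [other] * len(cols)

    # Shared part 1: one correlation per column.
    corrs = [corr(frame[c], p) for c, p in zip(cols, partners)]

    # Shared part 2: start from a NaN template (the role the proxy plays) and
    # fill in the computed values, pairing them with `cols` by position.
    result = {label: math.nan for label in result_labels}
    for col, value in zip(cols, corrs):
        result[col] = value
    return result
```

Passing `corr` in as a callable keeps the sketch independent of any particular correlation method; in the real code the shared tail would be the `fill_dataframe`/`ComputedExpression` block.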
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -771,6 +773,62 @@ def fill_matrix(*args):
requires_partition_by=partitionings.Singleton(),
proxy=proxy))
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def corrwith(self, other, axis, **kwargs):
+    if axis not in (0, 'index'):
+      raise NotImplementedError('corrwith(axis=%r)' % axis)
+    if not isinstance(other, frame_base.DeferredFrame):
+      other = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(other))
+
+    if isinstance(other, DeferredSeries):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      corrs = [self[col].corr(other, **kwargs) for col in proxy.index]
+      def fill_dataframe(*args):
+        result = proxy.copy(deep=True)
+        for col, value in zip(proxy.index, args):
+          result[col] = value
+        return result
+      with expressions.allow_non_parallel_operations(True):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'fill_dataframe',
+                fill_dataframe,
+                [corr._expr for corr in corrs],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=proxy))
+
+    elif isinstance(other, DeferredDataFrame):
+      proxy = self._expr.proxy().corrwith(other._expr.proxy())
+      self, other = self.align(other, axis=0, join='inner')
+      valid_cols = list(
+          set(self.columns)
+          .intersection(other.columns)
+          .intersection(proxy.index))
+      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
+      def fill_dataframe(*args):
Review comment:
```suggestion
      # Generate expressions to compute the actual correlations
      corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols]
      # Combine the results
      def fill_dataframe(*args):
```
It took me a while to realize what was going on here; hopefully these
comments will help future readers get there faster.
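The structure those comments describe (generate one expression per column, then combine them all in a single step) can be modelled outside Beam. In this hypothetical sketch, zero-argument callables stand in for the deferred `corr` expressions and a thread pool stands in for parallel execution. The point it illustrates is that the combine step receives values positionally, so the column list has to be materialized once, as the `list(...)` around `valid_cols` does, and reused for both steps.

```python
import math
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Sequence


def fan_out_fan_in(result_labels: Sequence[str], cols: List[str],
                   thunks: List[Callable[[], float]]) -> Dict[str, float]:
    """Run one deferred computation per column, then combine them in one step.

    `thunks[i]` must compute the value for `cols[i]`: the combine step gets
    the results positionally, just like fill_dataframe(*args).
    """
    # Evaluate: each correlation is independent of the others, so they can run
    # concurrently. Executor.map yields results in input order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        values = list(pool.map(lambda thunk: thunk(), thunks))

    # Combine: a single function sees every result (the Singleton step).
    def fill(*args: float) -> Dict[str, float]:
        result = {label: math.nan for label in result_labels}
        for col, value in zip(cols, args):
            result[col] = value
        return result

    return fill(*values)
```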
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]