apilloud commented on a change in pull request #12990:
URL: https://github.com/apache/beam/pull/12990#discussion_r499020168
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
Review comment:
nit: this formatting makes it really unclear where the `lambda` ends. I
think it ends on this line? Is this auto-formatted? Can the `lambda` be put
on a single line, or indented, to make this clearer?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -415,6 +453,37 @@ def aggregate(self, func, axis=0, *args, **kwargs):
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
+ def dot(self, other):
+ # We want to broadcast the right hand side to all partitions of the left.
+ # This is OK, as its index must be the same size as the columns set of self,
+ # so cannot be too large.
+ class AsScalar(object):
+ def __init__(self, value):
+ self.value = value
+
+ if isinstance(other, frame_base.DeferredFrame):
+ proxy = other._expr.proxy()
+ with expressions.allow_non_parallel_operations():
+ side = expressions.ComputedExpression(
+ 'as_scalar',
+ lambda df: AsScalar(df),
+ [other._expr],
+ requires_partition_by=partitionings.Singleton())
+ else:
Review comment:
Should this have an `isinstance` check as well?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
+ [left, right],
+ requires_partition_by=partitionings.Index())
+ with expressions.allow_non_parallel_operations(True):
+ sums = expressions.ComputedExpression(
+ 'sum',
+ lambda dots: dots.sum(), [dots],
+ requires_partition_by=partitionings.Singleton())
+
+ if right_is_series:
+ result = expressions.ComputedExpression(
+ 'extract',
+ lambda df: df[0], [sums],
Review comment:
nit: same issue here, in reverse: the `lambda` shares a line with the next
argument. Can there be a newline after the `lambda`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]