[
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=494129&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-494129
]
ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/20 20:12
Start Date: 02/Oct/20 20:12
Worklog Time Spent: 10m
Work Description: apilloud commented on a change in pull request #12990:
URL: https://github.com/apache/beam/pull/12990#discussion_r499020168
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
Review comment:
nit: this formatting makes it really unclear where the `lambda` ends. I
think this is it? Is this auto formatted? Can the `lambda` be put on a single
line or indented to make this clearer?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -415,6 +453,37 @@ def aggregate(self, func, axis=0, *args, **kwargs):
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
+ def dot(self, other):
+ # We want to broadcast the right hand side to all partitions of the left.
+ # This is OK, as its index must be the same size as the columns set of
self,
+ # so cannot be too large.
+ class AsScalar(object):
+ def __init__(self, value):
+ self.value = value
+
+ if isinstance(other, frame_base.DeferredFrame):
+ proxy = other._expr.proxy()
+ with expressions.allow_non_parallel_operations():
+ side = expressions.ComputedExpression(
+ 'as_scalar',
+ lambda df: AsScalar(df),
+ [other._expr],
+ requires_partition_by=partitionings.Singleton())
+ else:
Review comment:
Should this have a isinstance check as well?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
+ [left, right],
+ requires_partition_by=partitionings.Index())
+ with expressions.allow_non_parallel_operations(True):
+ sums = expressions.ComputedExpression(
+ 'sum',
+ lambda dots: dots.sum(), [dots],
+ requires_partition_by=partitionings.Singleton())
+
+ if right_is_series:
+ result = expressions.ComputedExpression(
+ 'extract',
+ lambda df: df[0], [sums],
Review comment:
nit: same thing here, in reverse. Can there be a newline here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 494129)
Time Spent: 19.5h (was: 19h 20m)
> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
> Key: BEAM-9547
> URL: https://issues.apache.org/jira/browse/BEAM-9547
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Robert Bradshaw
> Priority: P2
> Time Spent: 19.5h
> Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy
> method. Everything that's not actually implemented should get a default
> implementation that raises WontImplementError
> SeeĀ https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
> consider requesting edit access if you'd like to help out.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)