[
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=494161&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-494161
]
ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/20 21:37
Start Date: 02/Oct/20 21:37
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #12990:
URL: https://github.com/apache/beam/pull/12990#discussion_r499060394
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
Review comment:
Yes, this is the auto-formatter. I'll play around with this.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +34,45 @@ def __array__(self, dtype=None):
between = frame_base._elementwise_method('between')
+ def dot(self, other):
+ left = self._expr
+ if isinstance(other, DeferredSeries):
+ right = expressions.ComputedExpression(
+ 'to_dataframe',
+ pd.DataFrame, [other._expr],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Index())
+ right_is_series = True
+ elif isinstance(other, DeferredDataFrame):
+ right = other._expr
+ right_is_series = False
+ else:
+ raise frame_base.WontImplementError('non-deferred result')
+
+ dots = expressions.ComputedExpression(
+ 'dot',
+ # Transpose so we can sum across rows.
+ lambda left,
+ right: pd.DataFrame(left @ right).T,
+ [left, right],
+ requires_partition_by=partitionings.Index())
+ with expressions.allow_non_parallel_operations(True):
+ sums = expressions.ComputedExpression(
+ 'sum',
+ lambda dots: dots.sum(), [dots],
+ requires_partition_by=partitionings.Singleton())
+
+ if right_is_series:
+ result = expressions.ComputedExpression(
+ 'extract',
+ lambda df: df[0], [sums],
Review comment:
Same.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -415,6 +453,37 @@ def aggregate(self, func, axis=0, *args, **kwargs):
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
+ def dot(self, other):
+ # We want to broadcast the right hand side to all partitions of the left.
+ # This is OK, as its index must be the same size as the columns set of
self,
+ # so cannot be too large.
+ class AsScalar(object):
+ def __init__(self, value):
+ self.value = value
+
+ if isinstance(other, frame_base.DeferredFrame):
+ proxy = other._expr.proxy()
+ with expressions.allow_non_parallel_operations():
+ side = expressions.ComputedExpression(
+ 'as_scalar',
+ lambda df: AsScalar(df),
+ [other._expr],
+ requires_partition_by=partitionings.Singleton())
+ else:
Review comment:
This side is all the constant types that pandas may accept.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 494161)
Time Spent: 19h 40m (was: 19.5h)
> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
> Key: BEAM-9547
> URL: https://issues.apache.org/jira/browse/BEAM-9547
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Robert Bradshaw
> Priority: P2
> Time Spent: 19h 40m
> Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy
> method. Everything that's not actually implemented should get a default
> implementation that raises WontImplementError
> SeeĀ https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
> consider requesting edit access if you'd like to help out.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)