[
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=575172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-575172
]
ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------
Author: ASF GitHub Bot
Created on: 31/Mar/21 21:51
Start Date: 31/Mar/21 21:51
Worklog Time Spent: 10m
Work Description: pabloem commented on a change in pull request #14274:
URL: https://github.com/apache/beam/pull/14274#discussion_r605241660
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def where(self, cond, other, errors, **kwargs):
+ requires = partitionings.Arbitrary()
+ deferred_args = {}
+ actual_args = {}
+
+ # TODO(bhulette): This is very similar to the logic in
+ # frame_base.elementwise_method, can we unify it?
+ if isinstance(cond, frame_base.DeferredFrame):
+ deferred_args['cond'] = cond
+ requires = partitionings.Index()
+ else:
+ actual_args['cond'] = cond
+
+ if isinstance(other, frame_base.DeferredFrame):
+ deferred_args['other'] = other
+ requires = partitionings.Index()
+ else:
+ actual_args['other'] = other
+
+ if errors == "ignore":
+ # We need all data in order to ignore errors and propagate the original
+ # data.
+ requires = partitionings.Singleton()
Review comment:
Are errors related to the whole DataFrame or Series? Can there be errors
related to individual elements in the series that are ignored individually?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
Review comment:
It's not a must for now, but I'd be curious to see more pydoc on the
elements from the framework so that I can wrap my head around them more easily
: )
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def where(self, cond, other, errors, **kwargs):
+ requires = partitionings.Arbitrary()
+ deferred_args = {}
+ actual_args = {}
+
+ # TODO(bhulette): This is very similar to the logic in
+ # frame_base.elementwise_method, can we unify it?
+ if isinstance(cond, frame_base.DeferredFrame):
+ deferred_args['cond'] = cond
+ requires = partitionings.Index()
+ else:
+ actual_args['cond'] = cond
+
+ if isinstance(other, frame_base.DeferredFrame):
+ deferred_args['other'] = other
+ requires = partitionings.Index()
+ else:
+ actual_args['other'] = other
+
+ if errors == "ignore":
+ # We need all data in order to ignore errors and propagate the original
+ # data.
+ requires = partitionings.Singleton()
+
+ actual_args['errors'] = errors
+
+ def where(self, *args):
+ runtime_values = {
+ name: value
+ for (name, value) in zip(deferred_args.keys(), args)
+ }
+ return self.where(**runtime_values, **actual_args, **kwargs)
Review comment:
I'm a little confused. What is `self` in this inner `where` function call? Is
this called at pipeline run time by a DataFrame/Series?
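For illustration only, here is a minimal pure-Python sketch of the closure pattern the question is about. It assumes the inner function is invoked later with a concrete object as its first argument, so the inner `self` shadows the outer one. `EagerFrame`, `DeferredSketch`, and `evaluate` are hypothetical stand-ins, not Beam or pandas APIs.

```python
class EagerFrame:
    """Hypothetical stand-in for the concrete object available at run time."""

    def __init__(self, values):
        self.values = values

    def where(self, cond, other):
        # Keep each value where cond is true, otherwise substitute `other`.
        return EagerFrame(
            [v if c else other for v, c in zip(self.values, cond)])


class DeferredSketch:
    """Hypothetical stand-in for a deferred frame; it only records a function."""

    def __init__(self, fn=None):
        self.fn = fn

    def where(self, cond, other):
        # Outer `self`: the deferred object, at construction time.
        actual_args = {'cond': cond, 'other': other}

        def where(self):
            # Inner `self` shadows the outer one: it is whatever concrete
            # object is passed in when the recorded function is evaluated.
            return self.where(**actual_args)

        return DeferredSketch(where)

    def evaluate(self, concrete):
        # Assumption: evaluation calls the recorded function on real data.
        return self.fn(concrete)


deferred = DeferredSketch().where(cond=[True, False, True], other=0)
result = deferred.evaluate(EagerFrame([1, 2, 3]))
print(result.values)  # [1, 0, 3]
```

In this sketch the outer `where` never touches data; only the inner one does, and only when `evaluate` hands it a concrete object.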
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def where(self, cond, other, errors, **kwargs):
+ requires = partitionings.Arbitrary()
+ deferred_args = {}
+ actual_args = {}
+
+ # TODO(bhulette): This is very similar to the logic in
+ # frame_base.elementwise_method, can we unify it?
+ if isinstance(cond, frame_base.DeferredFrame):
+ deferred_args['cond'] = cond
+ requires = partitionings.Index()
+ else:
+ actual_args['cond'] = cond
+
+ if isinstance(other, frame_base.DeferredFrame):
+ deferred_args['other'] = other
+ requires = partitionings.Index()
+ else:
+ actual_args['other'] = other
+
+ if errors == "ignore":
+ # We need all data in order to ignore errors and propagate the original
+ # data.
+ requires = partitionings.Singleton()
Review comment:
I guess my question is - what is an example of an error here?
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
+ @frame_base.maybe_inplace
+ def where(self, cond, other, errors, **kwargs):
+ requires = partitionings.Arbitrary()
+ deferred_args = {}
+ actual_args = {}
+
+ # TODO(bhulette): This is very similar to the logic in
+ # frame_base.elementwise_method, can we unify it?
+ if isinstance(cond, frame_base.DeferredFrame):
+ deferred_args['cond'] = cond
+ requires = partitionings.Index()
+ else:
+ actual_args['cond'] = cond
+
+ if isinstance(other, frame_base.DeferredFrame):
+ deferred_args['other'] = other
+ requires = partitionings.Index()
+ else:
+ actual_args['other'] = other
+
+ if errors == "ignore":
+ # We need all data in order to ignore errors and propagate the original
+ # data.
+ requires = partitionings.Singleton()
+
+ actual_args['errors'] = errors
+
+ def where(self, *args):
+ runtime_values = {
+ name: value
+ for (name, value) in zip(deferred_args.keys(), args)
+ }
+ return self.where(**runtime_values, **actual_args, **kwargs)
+
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ "where",
+ where,
+ [self._expr] + [df._expr for df in deferred_args.values()],
+ requires_partition_by=requires,
+ preserves_partition_by=partitionings.Index(),
Review comment:
In this case, IIUC, the output partitioning will be by Index (or by
Singleton for `errors='ignore'`), right?
Am I correct in understanding that Singleton partitioning is a special case of
Index partitioning, and is therefore included?
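As a rough illustration of why index-based co-location matters for an elementwise `where`, here is a pure-Python sketch. It assumes rows are matched by index label and uses a hypothetical hash partitioner; `where_by_index` and `partition_by_index` are made-up helpers, and nothing here is a statement about how Beam's `partitionings` classes are actually related.

```python
def where_by_index(data, cond, other):
    """Keep data[k] where cond[k] is true, else use `other` (matched by label)."""
    return {k: (v if cond[k] else other) for k, v in data.items()}


def partition_by_index(rows, n):
    """Hypothetical partitioner: equal labels always land in the same partition."""
    parts = [{} for _ in range(n)]
    for k, v in rows.items():
        parts[hash(k) % n][k] = v
    return parts


data = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
cond = {'a': True, 'b': False, 'c': True, 'd': False}

# Result computed over all of the data at once.
whole = where_by_index(data, cond, other=0)

# Result computed per partition, then merged.
merged = {}
for d_part, c_part in zip(partition_by_index(data, 3),
                          partition_by_index(cond, 3)):
    merged.update(where_by_index(d_part, c_part, other=0))

assert merged == whole
```

With `n=1` the same code puts everything in a single partition, which trivially keeps equal labels together; in this sketch, that is the sense in which a single partition also satisfies the by-index requirement.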
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 575172)
Time Spent: 59.5h (was: 59h 20m)
> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
> Key: BEAM-9547
> URL: https://issues.apache.org/jira/browse/BEAM-9547
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Robert Bradshaw
> Priority: P2
> Labels: dataframe-api
> Time Spent: 59.5h
> Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy
> method. Everything that's not possible to implement should get a default
> implementation that raises WontImplementError
> See https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
> consider requesting edit access if you'd like to help out.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)