[ 
https://issues.apache.org/jira/browse/BEAM-9547?focusedWorklogId=575591&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-575591
 ]

ASF GitHub Bot logged work on BEAM-9547:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Apr/21 16:16
            Start Date: 01/Apr/21 16:16
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#14274:
URL: https://github.com/apache/beam/pull/14274#discussion_r605768355



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
               requires_partition_by=partitionings.Singleton(),
               preserves_partition_by=partitionings.Singleton()))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def where(self, cond, other, errors, **kwargs):
+    requires = partitionings.Arbitrary()
+    deferred_args = {}
+    actual_args = {}
+
+    # TODO(bhulette): This is very similar to the logic in
+    # frame_base.elementwise_method, can we unify it?
+    if isinstance(cond, frame_base.DeferredFrame):
+      deferred_args['cond'] = cond
+      requires = partitionings.Index()
+    else:
+      actual_args['cond'] = cond
+
+    if isinstance(other, frame_base.DeferredFrame):
+      deferred_args['other'] = other
+      requires = partitionings.Index()
+    else:
+      actual_args['other'] = other
+
+    if errors == "ignore":
+      # We need all data in order to ignore errors and propagate the original
+      # data.
+      requires = partitionings.Singleton()

Review comment:
       I think the intention is that if you have `try_cast=True`, this 
parameter will determine what to do if we encounter an element that can't be 
casted. It does not seem to affect "construction time" errors, like passing in 
a DataFrame with non-matching columns.
   
   Based on the [pandas 
docs](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.where.html) 
it seems this parameter doesn't actually affect anything right now since "the 
results and will always coerce to a suitable dtype", but I figure we may as 
well handle it this way to future-proof.
   

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
               requires_partition_by=partitionings.Singleton(),
               preserves_partition_by=partitionings.Singleton()))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def where(self, cond, other, errors, **kwargs):
+    requires = partitionings.Arbitrary()
+    deferred_args = {}
+    actual_args = {}
+
+    # TODO(bhulette): This is very similar to the logic in
+    # frame_base.elementwise_method, can we unify it?
+    if isinstance(cond, frame_base.DeferredFrame):
+      deferred_args['cond'] = cond
+      requires = partitionings.Index()
+    else:
+      actual_args['cond'] = cond
+
+    if isinstance(other, frame_base.DeferredFrame):
+      deferred_args['other'] = other
+      requires = partitionings.Index()
+    else:
+      actual_args['other'] = other
+
+    if errors == "ignore":
+      # We need all data in order to ignore errors and propagate the original
+      # data.
+      requires = partitionings.Singleton()
+
+    actual_args['errors'] = errors
+
+    def where(self, *args):
+      runtime_values = {
+          name: value
+          for (name, value) in zip(deferred_args.keys(), args)
+      }
+      return self.where(**runtime_values, **actual_args, **kwargs)

Review comment:
       Yeah your confusion is understandable, I just pushed a commit to clarify 
the naming a bit. You're right this is the function that will be used at 
execution time, it's called with the outputs from the input expressions:
   
   ```
   [self._expr] + [df._expr for df in deferred_args.values()],
   ```
   
   So in this function, the first argument, `self`, will be a pandas DataFrame 
(the execution time result of `self._expr`)

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
               requires_partition_by=partitionings.Singleton(),
               preserves_partition_by=partitionings.Singleton()))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def where(self, cond, other, errors, **kwargs):
+    requires = partitionings.Arbitrary()
+    deferred_args = {}
+    actual_args = {}
+
+    # TODO(bhulette): This is very similar to the logic in
+    # frame_base.elementwise_method, can we unify it?
+    if isinstance(cond, frame_base.DeferredFrame):
+      deferred_args['cond'] = cond
+      requires = partitionings.Index()
+    else:
+      actual_args['cond'] = cond
+
+    if isinstance(other, frame_base.DeferredFrame):
+      deferred_args['other'] = other
+      requires = partitionings.Index()
+    else:
+      actual_args['other'] = other
+
+    if errors == "ignore":
+      # We need all data in order to ignore errors and propagate the original
+      # data.
+      requires = partitionings.Singleton()
+
+    actual_args['errors'] = errors
+
+    def where(self, *args):
+      runtime_values = {
+          name: value
+          for (name, value) in zip(deferred_args.keys(), args)
+      }
+      return self.where(**runtime_values, **actual_args, **kwargs)
+
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            "where",
+            where,
+            [self._expr] + [df._expr for df in deferred_args.values()],
+            requires_partition_by=requires,
+            preserves_partition_by=partitionings.Index(),

Review comment:
       Yes you have that right. It's also possible that an upstream expression 
will require `Singleton` partitioning, which could propagate into this one.
   
   I tried to document these semantics with some pictures over in 
expressions.py: 
https://github.com/apache/beam/blob/7963cd3329f5349cb3ad93e0bbdebebdeeb3b86f/sdks/python/apache_beam/dataframe/expressions.py#L180-L188
   
   If the input is any partitioning that's "less than" the `preserves` spec, 
the output will have that partitioning. Otherwise we make no guarantees.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -344,6 +344,57 @@ def equals(self, other):
               requires_partition_by=partitionings.Singleton(),
               preserves_partition_by=partitionings.Singleton()))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)

Review comment:
       Good point, I will add these :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 575591)
    Time Spent: 59h 50m  (was: 59h 40m)

> Implement all pandas operations (or raise WontImplementError)
> -------------------------------------------------------------
>
>                 Key: BEAM-9547
>                 URL: https://issues.apache.org/jira/browse/BEAM-9547
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Robert Bradshaw
>            Priority: P2
>              Labels: dataframe-api
>          Time Spent: 59h 50m
>  Remaining Estimate: 0h
>
> We should have an implementation for every DataFrame, Series, and GroupBy 
> method. Everything that's not possible to implement should get a default 
> implementation that raises WontImplementError
> SeeĀ https://github.com/apache/beam/pull/10757#discussion_r389132292
> Progress at the individual operation level is tracked in a 
> [spreadsheet|https://docs.google.com/spreadsheets/d/1hHAaJ0n0k2tw465ORs5tfdy4Lg0DnGWIQ53cLjAhel0/edit],
>  consider requesting edit access if you'd like to help out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to