TheNeuralBit commented on a change in pull request #14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r591956347



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
                 self.outputs))
 
     # First define some helper functions.
-    def output_is_partitioned_by(expr, stage, partitioning):
-      if partitioning == partitionings.Nothing():
-        # Always satisfied.
-        return True
-      elif stage.partitioning == partitionings.Singleton():
-        # Within a stage, the singleton partitioning is trivially preserved.
-        return True
-      elif expr in stage.inputs:
+    def output_partitioning_in_stage(expr, stage):
+      """ Return the output partitioning of expr when computed in stage,
+      or returns None if the expression cannot be computed in this stage.
+      """
+      if expr in stage.inputs or expr in inputs:
         # Inputs are all partitioned by stage.partitioning.
-        return stage.partitioning.is_subpartitioning_of(partitioning)
-      elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
-        # Here expr preserves at least the requested partitioning; its outputs
-        # will also have this partitioning iff its inputs do.
-        if expr.requires_partition_by().is_subpartitioning_of(partitioning):
-          # If expr requires at least this partitioning, we will arrange such
-          # that its inputs satisfy this.
-          return True
-        else:
-          # Otherwise, recursively check all the inputs.
-          return all(
-              output_is_partitioned_by(arg, stage, partitioning)
-              for arg in expr.args())
-      else:
-        return False
+        return stage.partitioning
+
+      # Anything that's not an input must have arguments
+      assert len(expr.args())
+
+      arg_partitionings = set(
+          output_partitioning_in_stage(arg, stage) for arg in expr.args()
+          if not is_scalar(arg))
+
+      # TODO: what does it mean for all args to be 0? only scalar arguments? 
the

Review comment:
       No we didn't run into this case. I think this makes sense based on our 
discussion yesterday - there's currently no way to create an expression that 
operates on only scalar inputs. The only thing you can do with a 
`DeferredScalar` is use it in arithmetic with a `DeferredFrame`. If/when we 
support arithmetic on DeferredScalars I think we'd hit this. Probably we should 
just return expr.preserves_partition_by() in this case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to