[ 
https://issues.apache.org/jira/browse/BEAM-11881?focusedWorklogId=564154&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-564154
 ]

ASF GitHub Bot logged work on BEAM-11881:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Mar/21 23:36
            Start Date: 10/Mar/21 23:36
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r591956347



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
                 self.outputs))
 
     # First define some helper functions.
-    def output_is_partitioned_by(expr, stage, partitioning):
-      if partitioning == partitionings.Nothing():
-        # Always satisfied.
-        return True
-      elif stage.partitioning == partitionings.Singleton():
-        # Within a stage, the singleton partitioning is trivially preserved.
-        return True
-      elif expr in stage.inputs:
+    def output_partitioning_in_stage(expr, stage):
+      """ Return the output partitioning of expr when computed in stage,
+      or returns None if the expression cannot be computed in this stage.
+      """
+      if expr in stage.inputs or expr in inputs:
         # Inputs are all partitioned by stage.partitioning.
-        return stage.partitioning.is_subpartitioning_of(partitioning)
-      elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
-        # Here expr preserves at least the requested partitioning; its outputs
-        # will also have this partitioning iff its inputs do.
-        if expr.requires_partition_by().is_subpartitioning_of(partitioning):
-          # If expr requires at least this partitioning, we will arrange such
-          # that its inputs satisfy this.
-          return True
-        else:
-          # Otherwise, recursively check all the inputs.
-          return all(
-              output_is_partitioned_by(arg, stage, partitioning)
-              for arg in expr.args())
-      else:
-        return False
+        return stage.partitioning
+
+      # Anything that's not an input must have arguments
+      assert len(expr.args())
+
+      arg_partitionings = set(
+          output_partitioning_in_stage(arg, stage) for arg in expr.args()
+          if not is_scalar(arg))
+
+      # TODO: what does it mean for all args to be 0? only scalar arguments? the

Review comment:
       No, we didn't run into this case. I think this makes sense based on our 
discussion yesterday - there's currently no way to create an expression that 
operates on only scalar inputs. The only thing you can do with a 
`DeferredScalar` is use it in arithmetic with a `DeferredFrame`. If/when we 
support arithmetic on `DeferredScalar`s, I think we'd hit this. Probably we 
should just return `expr.preserves_partition_by()` in this case.
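
A minimal, self-contained sketch of that suggested fallback. The ScalarExpr and
FrameExpr stand-ins and the string partitionings below are invented for
illustration; they are not the real apache_beam.dataframe expression or
partitioning classes, and the final reconciliation step is elided.

    # Toy sketch of the scalar-only fallback discussed above. Everything here
    # is simplified: expressions are stand-in classes, partitionings are
    # plain strings.
    class ScalarExpr(object):
      """Stand-in for a scalar-producing expression (like a DeferredScalar)."""
      def args(self):
        return ()

    class FrameExpr(object):
      """Stand-in for a frame-producing expression."""
      def __init__(self, args, preserves):
        self._args = tuple(args)
        self._preserves = preserves

      def args(self):
        return self._args

      def preserves_partition_by(self):
        return self._preserves

    def is_scalar(expr):
      return isinstance(expr, ScalarExpr)

    def output_partitioning_in_stage(expr, stage_inputs, stage_partitioning):
      """Return the output partitioning of expr when computed in a stage."""
      if expr in stage_inputs:
        # Inputs are all partitioned by the stage's partitioning.
        return stage_partitioning

      arg_partitionings = set(
          output_partitioning_in_stage(arg, stage_inputs, stage_partitioning)
          for arg in expr.args() if not is_scalar(arg))

      if not arg_partitionings:
        # Only scalar arguments: fall back to whatever the expression itself
        # declares it preserves, as suggested in the comment above.
        return expr.preserves_partition_by()

      # The real implementation goes on to reconcile arg_partitionings with
      # the expression's required/preserved partitionings; elided here.
      return next(iter(arg_partitionings))

    # An expression built only from scalars takes the fallback branch.
    scalar_only = FrameExpr([ScalarExpr(), ScalarExpr()], preserves='Nothing()')
    assert output_partitioning_in_stage(scalar_only, set(), 'Index([i])') == 'Nothing()'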




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 564154)
    Time Spent: 3h 10m  (was: 3h)

> DataFrame subpartitioning order is incorrect
> --------------------------------------------
>
>                 Key: BEAM-11881
>                 URL: https://issues.apache.org/jira/browse/BEAM-11881
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Priority: P2
>              Labels: dataframe-api
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Currently we've defined
> Nothing() < Index([i]) < Index([i,j]) < .. < Index() < Singleton()
> s.t. Singleton() is a subpartitioning of Index(), which is a subpartitioning 
> of Index([i,j]), but this is incorrect. The order should be 
> Singleton() < Index([i]) < Index([i,j]) < .. < Index() < Nothing()
> s.t. every other partitioning is a subpartitioning of Singleton(). This is 
> logical, since Singleton() will collect the largest amount of data on a single 
> node, partitioning by a single index will be a little more distributed, and 
> partitioning by the full Index() will be the most distributed.
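
A toy sketch of the corrected ordering described above. The rank helper and the
tuple encoding are invented for illustration; the real classes live in
apache_beam.dataframe.partitionings.

    # Toy illustration of the corrected order
    #   Singleton() < Index([i]) < Index([i, j]) < ... < Index() < Nothing()
    # following the description above, where every other partitioning is a
    # subpartitioning of Singleton().
    def rank(partitioning):
      kind, columns = partitioning
      if kind == 'Singleton':
        return (0, 0)
      if kind == 'Index':
        # An Index over an explicit column list ranks by how many columns it
        # uses; the full Index() (columns=None) ranks above any explicit list.
        return (1, len(columns)) if columns is not None else (2, 0)
      if kind == 'Nothing':
        return (3, 0)
      raise ValueError('unknown partitioning: %r' % (kind, ))

    def is_subpartitioning_of(a, b):
      # Anything higher in the order counts as a subpartitioning of anything
      # lower, so every partitioning is a subpartitioning of Singleton().
      return rank(a) >= rank(b)

    assert is_subpartitioning_of(('Index', ['i', 'j']), ('Singleton', None))
    assert is_subpartitioning_of(('Index', None), ('Index', ['i']))
    assert not is_subpartitioning_of(('Singleton', None), ('Nothing', None))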



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
