TheNeuralBit commented on a change in pull request #14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r591984531



##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(
+        'add',
+        lambda a,

Review comment:
       Converted these lambdas into named functions so that yapf doesn't mangle their formatting.

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(

Review comment:
       Done

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(
+        'add',
+        lambda a,
+        b: a + b, [a, b],
+        requires_partition_by=partitionings.Nothing(),
+        preserves_partition_by=partitionings.Singleton())
+
+    for partitioning in (partitionings.Singleton(), ):
+      self.assertEqual(
+          expressions.output_partitioning(preserves_singleton, partitioning),
+          partitioning,
+          f"Should preserve {partitioning}")
+
+    for partitioning in (partitionings.Index([0]),
+                         partitionings.Index(),
+                         partitionings.Nothing()):
+      self.assertEqual(
+          expressions.output_partitioning(preserves_singleton, partitioning),
+          partitionings.Nothing(),
+          f"Should NOT preserve {partitioning}")
+
+  def test_output_partitioning_preserves_index(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_index = expressions.ComputedExpression(

Review comment:
       Done

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
                 self.outputs))
 
     # First define some helper functions.
-    def output_is_partitioned_by(expr, stage, partitioning):
-      if partitioning == partitionings.Nothing():
-        # Always satisfied.
-        return True
-      elif stage.partitioning == partitionings.Singleton():
-        # Within a stage, the singleton partitioning is trivially preserved.
-        return True
-      elif expr in stage.inputs:
+    def output_partitioning_in_stage(expr, stage):
+      """ Return the output partitioning of expr when computed in stage,

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to