[
https://issues.apache.org/jira/browse/BEAM-11881?focusedWorklogId=564187&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-564187
]
ASF GitHub Bot logged work on BEAM-11881:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Mar/21 00:45
Start Date: 11/Mar/21 00:45
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request
#14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r591984531
##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
with self.assertRaises(TypeError):
expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
+ def test_output_partitioning_preserves_singleton(self):
+ a = expressions.PlaceholderExpression(1)
+ b = expressions.PlaceholderExpression(2)
+
+ preserves_singleton = expressions.ComputedExpression(
+ 'add',
+ lambda a,
Review comment:
Made these into real functions that yapf doesn't mess up.
##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
with self.assertRaises(TypeError):
expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
+ def test_output_partitioning_preserves_singleton(self):
+ a = expressions.PlaceholderExpression(1)
+ b = expressions.PlaceholderExpression(2)
+
+ preserves_singleton = expressions.ComputedExpression(
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
with self.assertRaises(TypeError):
expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
+ def test_output_partitioning_preserves_singleton(self):
+ a = expressions.PlaceholderExpression(1)
+ b = expressions.PlaceholderExpression(2)
+
+ preserves_singleton = expressions.ComputedExpression(
+ 'add',
+ lambda a,
+ b: a + b, [a, b],
+ requires_partition_by=partitionings.Nothing(),
+ preserves_partition_by=partitionings.Singleton())
+
+ for partitioning in (partitionings.Singleton(), ):
+ self.assertEqual(
+ expressions.output_partitioning(preserves_singleton, partitioning),
+ partitioning,
+ f"Should preserve {partitioning}")
+
+ for partitioning in (partitionings.Index([0]),
+ partitionings.Index(),
+ partitionings.Nothing()):
+ self.assertEqual(
+ expressions.output_partitioning(preserves_singleton, partitioning),
+ partitionings.Nothing(),
+ f"Should NOT preserve {partitioning}")
+
+ def test_output_partitioning_preserves_index(self):
+ a = expressions.PlaceholderExpression(1)
+ b = expressions.PlaceholderExpression(2)
+
+ preserves_index = expressions.ComputedExpression(
Review comment:
Done
##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
self.outputs))
# First define some helper functions.
- def output_is_partitioned_by(expr, stage, partitioning):
- if partitioning == partitionings.Nothing():
- # Always satisfied.
- return True
- elif stage.partitioning == partitionings.Singleton():
- # Within a stage, the singleton partitioning is trivially preserved.
- return True
- elif expr in stage.inputs:
+ def output_partitioning_in_stage(expr, stage):
+ """ Return the output partitioning of expr when computed in stage,
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 564187)
Time Spent: 3h 50m (was: 3h 40m)
> DataFrame subpartitioning order is incorrect
> --------------------------------------------
>
> Key: BEAM-11881
> URL: https://issues.apache.org/jira/browse/BEAM-11881
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Brian Hulette
> Priority: P2
> Labels: dataframe-api
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> Currently we've defined
> Nothing() < Index([i]) < Index([i,j]) < .. < Index() < Singleton()
> s.t. Singleton is a subpartitioning of Index, which is a subpartitioning of
> Index([i,j]), but this is incorrect. The order should be
> Singleton() < Index([i]) < Index([i,j]) < .. < Index() < Nothing()
> s.t. every other partitioning is a subpartitioning of Singleton. This is
> logical, since Singleton will collect the largest amount of data on a single
> node, partitioning by a single index will be a little more distributed, and
> partitioning by the full Index() will be the most distributed.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)