This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0972bc0170b Propagate correct boundedness when using multiple outputs (#29506)
0972bc0170b is described below
commit 0972bc0170b852ece0e8cccfec2f1d58dd438955
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Nov 21 11:39:32 2023 -0500
Propagate correct boundedness when using multiple outputs (#29506)
* Add failing test
* boundedness fix
---
sdks/python/apache_beam/pvalue.py | 7 ++++++-
sdks/python/apache_beam/transforms/core_test.py | 28 +++++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 90882651d0b..b783d61f95c 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -304,7 +304,12 @@ class DoOutputsTuple(object):
assert self.producer is not None
if tag is not None:
self._transform.output_tags.add(tag)
- pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
+ is_bounded = all(i.is_bounded for i in self.producer.main_inputs.values())
+ pcoll = PCollection(
+ self._pipeline,
+ tag=tag,
+ element_type=typehints.Any,
+ is_bounded=is_bounded)
# Transfer the producer from the DoOutputsTuple to the resulting
# PCollection.
pcoll.producer = self.producer.parts[0]
diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py
index 0fba2826613..a60974ceb70 100644
--- a/sdks/python/apache_beam/transforms/core_test.py
+++ b/sdks/python/apache_beam/transforms/core_test.py
@@ -108,6 +108,34 @@ class CreateTest(unittest.TestCase):
assert warning_text in self._caplog.text
+class PartitionTest(unittest.TestCase):
+ def test_partition_boundedness(self):
+ def partition_fn(val, num_partitions):
+ return val % num_partitions
+
+ class UnboundedDoFn(beam.DoFn):
+ @beam.DoFn.unbounded_per_element()
+ def process(self, element):
+ yield element
+
+ with beam.testing.test_pipeline.TestPipeline() as p:
+ source = p | beam.Create([1, 2, 3, 4, 5])
+ p1, p2, p3 = source | "bounded" >> beam.Partition(partition_fn, 3)
+
+ self.assertEqual(source.is_bounded, True)
+ self.assertEqual(p1.is_bounded, True)
+ self.assertEqual(p2.is_bounded, True)
+ self.assertEqual(p3.is_bounded, True)
+
+ unbounded = source | beam.ParDo(UnboundedDoFn())
+ p4, p5, p6 = unbounded | "unbounded" >> beam.Partition(partition_fn, 3)
+
+ self.assertEqual(unbounded.is_bounded, False)
+ self.assertEqual(p4.is_bounded, False)
+ self.assertEqual(p5.is_bounded, False)
+ self.assertEqual(p6.is_bounded, False)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()