damccorm commented on code in PR #35675:
URL: https://github.com/apache/beam/pull/35675#discussion_r2228922888
##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -217,6 +217,104 @@ def test_implicit_flatten(self):
providers=TEST_PROVIDERS)
assert_that(result, equal_to([1, 4, 9, 10000, 40000]))
+ def test_flatten_different_schemas_error(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ with self.assertRaisesRegex(
+ Exception, r"Cannot flatten PCollections with different schemas"):
+ _ = p | YamlTransform(
+ '''
+ type: composite
+ transforms:
+ - type: Create
+ name: Create1
+ config:
+ elements:
+ - {'ride_id': '1', 'passenger_count': 1}
+ - {'ride_id': '2', 'passenger_count': 2}
+ - type: Create
+ name: Create2
+ config:
+ elements:
+ - {'ride_id': '3'}
+ - {'ride_id': '4'}
+ - type: Flatten
+ name: Flatten1
+ input:
+ - Create1
+ - Create2
+ output: Flatten1
+ ''',
+ providers=TEST_PROVIDERS)
+
+ def test_flatten_compatible_schemas_success(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ result = p | YamlTransform(
+ '''
+ type: composite
+ transforms:
+ - type: Create
+ name: Create1
+ config:
+ elements:
+ - {'ride_id': '1', 'passenger_count': 1}
+ - {'ride_id': '2', 'passenger_count': 2}
+ - type: Create
+ name: Create2
+ config:
+ elements:
+ - {'ride_id': '3', 'passenger_count': 3}
+ - {'ride_id': '4', 'passenger_count': 4}
+ - type: Flatten
+ name: Flatten1
+ input:
+ - Create1
+ - Create2
+ output: Flatten1
+ ''',
+ providers=TEST_PROVIDERS)
+ # This should not raise an error since the schemas are identical
+ assert_that(
+ result,
+ equal_to([
+ beam.Row(ride_id='1', passenger_count=1),
+ beam.Row(ride_id='2', passenger_count=2),
+ beam.Row(ride_id='3', passenger_count=3),
+ beam.Row(ride_id='4', passenger_count=4)
+ ]))
+
+ def test_flatten_with_null_values_error(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ # This should raise an error because null values create different schema types
+ # (nullable logical type vs INT64)
+ with self.assertRaisesRegex(
Review Comment:
Oh, interesting — one option would be to define our own deep-equality
function here (basically a consistency check), or we could try to move forward
with https://github.com/apache/beam/pull/35672.
I don't think we can move forward with this change if it regresses these use
cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]