damccorm commented on code in PR #35728: URL: https://github.com/apache/beam/pull/35728#discussion_r2245432723
########## sdks/python/apache_beam/yaml/yaml_transform_test.py: ########## @@ -477,6 +477,238 @@ def test_composite_resource_hints(self): b'1000000000', proto) + def test_flatten_unifies_schemas(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {ride_id: '1', passenger_count: 1} + - {ride_id: '2', passenger_count: 2} + - type: Create + name: Create2 + config: + elements: + - {ride_id: '3'} + - {ride_id: '4'} + - type: Flatten + input: [Create1, Create2] + - type: AssertEqual + input: Flatten + config: + elements: + - {ride_id: '1', passenger_count: 1} + - {ride_id: '2', passenger_count: 2} + - {ride_id: '3'} + - {ride_id: '4'} + ''') + + def test_flatten_unifies_optional_fields(self): + """Test that Flatten correctly unifies schemas with optional fields.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {id: '1', name: 'Alice', age: 30} + - {id: '2', name: 'Bob', age: 25} + - type: Create + name: Create2 + config: + elements: + - {id: '3', name: 'Charlie'} + - {id: '4', name: 'Diana'} + - type: Flatten + input: [Create1, Create2] + - type: AssertEqual + input: Flatten + config: + elements: + - {id: '1', name: 'Alice', age: 30} + - {id: '2', name: 'Bob', age: 25} + - {id: '3', name: 'Charlie'} + - {id: '4', name: 'Diana'} + ''') + + def test_flatten_unifies_different_types(self): + """Test that Flatten correctly unifies schemas with different + field types.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {id: 1, value: 100} + - {id: 2, value: 200} + - type: Create + name: Create2 + config: + elements: + - {id: '3', value: 'text'} + - {id: '4', value: 'data'} + - type: Flatten + input: [Create1, Create2] + - type: AssertEqual + input: Flatten + config: + elements: + - {id: 1, value: 100} + - {id: 2, value: 200} + - {id: '3', value: 'text'} + - {id: '4', value: 'data'} + ''') + + def test_flatten_unifies_list_fields(self): + """Test that Flatten correctly unifies schemas with list fields.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {id: '1', tags: ['red', 'blue']} + - {id: '2', tags: ['green']} + - type: Create + name: Create2 + config: + elements: + - {id: '3', tags: ['yellow', 'purple', 'orange']} + - {id: '4', tags: []} + - type: Flatten + input: [Create1, Create2] + - type: AssertEqual + input: Flatten + config: + elements: + - {id: '1', tags: ['red', 'blue']} + - {id: '2', tags: ['green']} + - {id: '3', tags: ['yellow', 'purple', 'orange']} + - {id: '4', tags: []} + ''') + + def test_flatten_unifies_with_missing_fields(self): + """Test that Flatten correctly unifies schemas when some inputs have + missing fields.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {id: '1', name: 'Alice', department: 'Engineering', + salary: 75000} + - {id: '2', name: 'Bob', department: 'Marketing', + salary: 65000} + - type: Create + name: Create2 + config: + elements: + - {id: '3', name: 'Charlie', department: 'Sales'} + - {id: '4', name: 'Diana'} + - type: Flatten + input: [Create1, Create2] + - type: AssertEqual + input: Flatten + config: + elements: + - {id: '1', name: 'Alice', department: 'Engineering', + salary: 75000} + - {id: '2', name: 'Bob', department: 'Marketing', + salary: 65000} + - {id: '3', name: 'Charlie', department: 'Sales'} + - {id: '4', name: 'Diana'} + ''') + + def test_flatten_unifies_complex_mixed_schemas(self): + """Test that Flatten correctly unifies complex mixed + schemas.""" + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + result = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create1 + config: + elements: + - {id: 1, name: 'Product A', price: 29.99, + categories: ['electronics', 'gadgets']} + - {id: 2, name: 'Product B', price: 15.50, + categories: ['books']} + - type: Create + name: Create2 + config: + elements: + - {id: 3, name: 'Product C', categories: ['clothing']} + - {id: 4, name: 'Product D', price: 99.99} + - type: Create + name: Create3 + config: + elements: + - {id: 5, name: 'Product E', price: 5.00, + categories: []} + - type: Flatten + input: [Create1, Create2, Create3] + output: Flatten + ''') + + # Verify that the result contains all expected elements + # with proper schema unification + def check_result(actual): Review Comment: Any reason we can't just use `AssertEqual` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org