liferoad commented on issue #35666:
URL: https://github.com/apache/beam/issues/35666#issuecomment-3110132293
Using the Python SDK works well:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    """
    This function defines and runs the Apache Beam pipeline.
    """
    # Define the pipeline options.
    # Using DirectRunner to run the pipeline locally.
    options = PipelineOptions()

    # The pipeline is created using a 'with' block, which ensures that the
    # pipeline's run() method is called and any necessary cleanup is performed.
    with beam.Pipeline(options=options) as p:
        # Transform 1: Create the first PCollection from a list of dictionaries.
        # This corresponds to the 'Create1' transform in the YAML file.
        create1 = p | 'Create1' >> beam.Create([
            {'ride_id': '1', 'passenger_count': 1},
            {'ride_id': '2', 'passenger_count': 2}
        ])

        # Transform 2: Create the second PCollection.
        # This corresponds to the 'Create2' transform in the YAML file.
        create2 = p | 'Create2' >> beam.Create([
            {'ride_id': '3'},
            {'ride_id': '4'}
        ])

        # Transform 3: Flatten the two PCollections into a single one.
        # The input PCollections are provided as a tuple to the Flatten transform.
        # This corresponds to the 'Flatten1' transform in the YAML file.
        flattened_pcollection = (create1, create2) | 'Flatten1' >> beam.Flatten()

        # Transform 4: Log the elements of the flattened PCollection.
        # This is a simple way to inspect the contents of a PCollection,
        # similar to the 'LogForTesting' transform in the YAML.
        flattened_pcollection | 'LogForTesting' >> beam.Map(print)


if __name__ == '__main__':
    # This is the entry point of the script.
    # It calls the run() function to execute the pipeline.
    run()
```
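
For comparison, here is a rough Beam-free sketch of what the pipeline above should print. It assumes Flatten behaves as a plain union of its inputs (Beam does not guarantee element order, so only the set of rows matters). `expected_flatten_output` is a hypothetical helper written for illustration; it is not part of the Beam API.

```python
from itertools import chain


def expected_flatten_output(*collections):
    """Model Flatten as a concatenation of its inputs (hypothetical helper)."""
    return list(chain.from_iterable(collections))


# Same literal inputs as 'Create1' and 'Create2' in the pipeline above.
create1 = [
    {'ride_id': '1', 'passenger_count': 1},
    {'ride_id': '2', 'passenger_count': 2},
]
create2 = [
    {'ride_id': '3'},
    {'ride_id': '4'},
]

# Rows with and without 'passenger_count' pass through unchanged,
# because this model treats each element as an opaque Python dict.
expected = expected_flatten_output(create1, create2)
for row in expected:
    print(row)
```

The output from `beam.Map(print)` in the real pipeline should contain the same four rows, possibly in a different order.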