liferoad commented on issue #35666: URL: https://github.com/apache/beam/issues/35666#issuecomment-3110132293
Using the Python SDK works well:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    """
    This function defines and runs the Apache Beam pipeline.
    """
    # Define the pipeline options.
    # No runner is set, so Beam falls back to the DirectRunner and runs locally.
    options = PipelineOptions()

    # The pipeline is created using a 'with' block, which ensures that the
    # pipeline's run() method is called and any necessary cleanup is performed.
    with beam.Pipeline(options=options) as p:
        # Transform 1: Create the first PCollection from a list of dictionaries.
        # This corresponds to the 'Create1' transform in the YAML file.
        create1 = p | 'Create1' >> beam.Create([
            {'ride_id': '1', 'passenger_count': 1},
            {'ride_id': '2', 'passenger_count': 2}
        ])

        # Transform 2: Create the second PCollection.
        # These elements have no 'passenger_count' field.
        # This corresponds to the 'Create2' transform in the YAML file.
        create2 = p | 'Create2' >> beam.Create([
            {'ride_id': '3'},
            {'ride_id': '4'}
        ])

        # Transform 3: Flatten the two PCollections into a single one.
        # The input PCollections are provided as a tuple to the Flatten transform.
        # This corresponds to the 'Flatten1' transform in the YAML file.
        flattened_pcollection = (create1, create2) | 'Flatten1' >> beam.Flatten()

        # Transform 4: Log the elements of the flattened PCollection.
        # This is a simple way to inspect the contents of a PCollection,
        # similar to the 'LogForTesting' transform in the YAML.
        flattened_pcollection | 'LogForTesting' >> beam.Map(print)


if __name__ == '__main__':
    # This is the entry point of the script.
    # It calls the run() function to execute the pipeline.
    run()
```