Polber commented on code in PR #32288: URL: https://github.com/apache/beam/pull/32288#discussion_r1727723306
########## sdks/python/apache_beam/yaml/examples/spanner_read.yaml: ##########
@@ -0,0 +1,63 @@
+pipeline:
+  transforms:
+
+    # Reading data from a Spanner database. The table used here has the following columns:
+    # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
+    # ReadFromSpanner transform is called using project_id, instance_id, database_id and a query
+    # A table with a list of columns can also be specified instead of a query
+    - type: ReadFromSpanner
+      name: ReadShipments
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'shipment-test'
+        database_id: 'shipment'

Review Comment:
@damccorm Are Spanner resources (specifically these) exposed to the general public? i.e., would anyone be able to run this YAML as-is?

########## sdks/python/apache_beam/yaml/examples/spanner_read.yaml: ##########

Review Comment:
Let's create a new folder `examples/io` and put these files in there

########## sdks/python/apache_beam/yaml/examples/spanner_read.yaml: ##########
@@ -0,0 +1,63 @@
+pipeline:
+  transforms:
+
+    # Reading data from a Spanner database. The table used here has the following columns:
+    # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
+    # ReadFromSpanner transform is called using project_id, instance_id, database_id and a query
+    # A table with a list of columns can also be specified instead of a query
+    - type: ReadFromSpanner
+      name: ReadShipments
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'shipment-test'
+        database_id: 'shipment'
+        query: 'SELECT * FROM shipments'
+
+    # Filtering the data based on a specific condition
+    # Here, the condition is used to keep only the rows where the customer_id is 'C1'
+    - type: Filter
+      name: FilterShipments
+      input: ReadShipments
+      config:
+        language: python
+        keep: "customer_id == 'C1'"
+
+    # Mapping the data fields and applying transformations
+    # A new field 'shipment_cost_category' is added with a custom transformation
+    # A callable is defined to categorize shipment cost
+    - type: MapToFields
+      name: MapFieldsForSpanner
+      input: FilterShipments
+      config:
+        language: python
+        fields:
+          shipment_id: shipment_id
+          customer_id: customer_id
+          shipment_date: shipment_date
+          shipment_cost: shipment_cost
+          customer_name: customer_name
+          customer_email: customer_email
+          shipment_cost_category:
+            callable: |
+              def categorize_cost(row):
+                cost = float(row[3])
+                if cost < 50:
+                  return 'Low Cost'
+                elif cost < 200:
+                  return 'Medium Cost'
+                else:
+                  return 'High Cost'
+
+    # Writing the transformed data to a CSV file
+    - type: WriteToCsv
+      name: WriteBig
+      input: MapFieldsForSpanner
+      config:
+        path: shipments.csv
+
+  # On executing the above pipeline, a new CSV file is created with the following records
+
+  # shipment_id,customer_id,shipment_date,shipment_cost,customer_name,customer_email,shipment_cost_category
+  # S1,C1,2023-05-01,150.0,Alice,[email protected],Medium Cost
+  # S3,C1,2023-05-10,20.0,Alice,[email protected],Low Cost

Review Comment:
You will notice all the other example YAML files have a block like:
```
# Expected:
#  Row(...)
#  Row(...)
#  Row(...)
```
This is actually used by the testing framework, which runs the YAML and validates it against this Expected output. So, it not only provides a way for the user to see expected output, but also allows us to incorporate automated testing.

You should be able to refactor to something like:
```
# Expected:
#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Medium Cost')
#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Low Cost')
```
You will also need to add a test suite in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/testing/examples_test.py#L208:
```
IOTest = YamlExamplesTestSuite(
    'AggregationExamplesTest', os.path.join(YAML_DOCS_DIR, '../io/*.yaml')).run()
```

########## sdks/python/apache_beam/yaml/examples/spanner_write.yaml: ##########
@@ -0,0 +1,41 @@
+pipeline:
+  transforms:
+
+    # Step 1: Creating rows to be written to Spanner
+    # The element names correspond to the column names in the Spanner table
+    - type: Create
+      name: CreateRows
+      config:
+        elements:
+          - shipment_id: "S5"
+            customer_id: "C5"
+            shipment_date: "2023-05-09"
+            shipment_cost: "300"
+            customer_name: "Erin"
+            customer_email: "[email protected]"
+
+    # Step 2: Writing the created rows to a Spanner database
+    # We require the project ID, instance ID, database ID and table ID to connect to Spanner
+    # Error handling can be specified optionally to ensure any failed operations aren't lost
+    # The failed data is passed on in the pipeline and can be handled
+    - type: WriteToSpanner
+      name: WriteSpanner
+      input: CreateRows
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'shipment-test'
+        database_id: 'shipment'
+        table_id: 'shipments'
+        error_handling:
+          output: my_error_output
+
+    # Step 3: Writing the failed records to a JSON file
+    - type: WriteToJson
+      input: WriteSpanner.my_error_output
+      config:
+        path: errors.json
+
+  # The above pipeline on execution creates a JSON file where the following failed record is stored:
+  # {"error_message":"INSERT_OR_UPDATE operation failed at instance: shipment-test, database: shipment, table: shipments","failed_row":{"shipment_id":"S5","customer_id":"C5","shipment_date":"2023-05-09","shipment_cost":300,"customer_name":"Erin","customer_email":"[email protected]"}}
+  # The operation fails as "shipment_cost" requires a float value, whereas a string value is provided in the CreateRows transform
+  # Changing the value of "shipment_cost" from "300" to 300.0 will result in a successful write operation

Review Comment:
Same Expected block as other file

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
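
Editor's note: the cost-categorization logic in the `MapToFields` callable can be exercised as plain Python, which is useful when adjusting the thresholds. The sketch below is hypothetical, not part of the PR; note also that the PR's callable reads `row[3]` positionally, whereas Beam YAML callables typically receive rows whose fields are accessible by name, so this sketch assumes attribute access via `row.shipment_cost`.

```python
# Standalone sketch of the categorize_cost callable from spanner_read.yaml.
# Assumption: rows expose fields by name (row.shipment_cost), unlike the
# positional row[3] access in the PR diff.
from types import SimpleNamespace


def categorize_cost(row):
    """Bucket a shipment into a cost category by its shipment_cost field."""
    cost = float(row.shipment_cost)
    if cost < 50:
        return 'Low Cost'
    elif cost < 200:
        return 'Medium Cost'
    else:
        return 'High Cost'


# Rows mirroring the two records shown in the expected CSV output.
row_s1 = SimpleNamespace(shipment_cost=150.0)
row_s3 = SimpleNamespace(shipment_cost=20.0)
print(categorize_cost(row_s1))  # Medium Cost
print(categorize_cost(row_s3))  # Low Cost
```

The thresholds (50 and 200) come directly from the PR diff; only the field-access style differs.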
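
Editor's note: the review comment explains that the `# Expected:` block doubles as machine-readable test data. As a rough illustration of that idea (a hypothetical helper, not the actual framework in `examples_test.py`), the Expected rows could be scraped from an example file like this:

```python
# Hypothetical sketch: extract the '# Row(...)' lines that follow a
# '# Expected:' marker in a YAML example, the way the review comment
# describes the testing framework using them.
import re


def parse_expected_rows(yaml_text):
    rows = []
    in_expected = False
    for line in yaml_text.splitlines():
        stripped = line.strip()
        if stripped.startswith('# Expected:'):
            in_expected = True
            continue
        if in_expected:
            match = re.match(r'#\s*(Row\(.*\))\s*$', stripped)
            if match:
                rows.append(match.group(1))
            else:
                in_expected = False  # block ends at the first non-Row line
    return rows


example = """\
# Expected:
#  Row(shipment_id='S1', shipment_cost_category='Medium Cost')
#  Row(shipment_id='S3', shipment_cost_category='Low Cost')
"""
print(parse_expected_rows(example))
```

The real suite is registered via `YamlExamplesTestSuite` as shown in the review comment; this sketch only shows why the comment block is machine-checkable.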
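
Editor's note: the failure demonstrated in `spanner_write.yaml` is a type mismatch: the Spanner column `shipment_cost` is FLOAT64, but the `Create` element supplies the string `"300"`. A minimal sketch of that check (a hypothetical validator, not Beam or Spanner code) makes the fix in the diff's last comment concrete:

```python
# Hypothetical pre-write validation sketch: EXPECTED_TYPES stands in for the
# Spanner table schema described in the diff (shipment_cost is FLOAT64).
EXPECTED_TYPES = {'shipment_id': str, 'shipment_cost': float}


def check_element(element):
    """Return the names of fields whose Python type disagrees with the schema."""
    mismatches = []
    for field, expected in EXPECTED_TYPES.items():
        if field in element and not isinstance(element[field], expected):
            mismatches.append(field)
    return mismatches


bad = {'shipment_id': 'S5', 'shipment_cost': '300'}   # as written in the PR diff
good = {'shipment_id': 'S5', 'shipment_cost': 300.0}  # the fix the diff suggests
print(check_element(bad))   # ['shipment_cost']
print(check_element(good))  # []
```

This mirrors why the failed row is routed to `my_error_output` and lands in `errors.json`, and why changing `"300"` to `300.0` makes the write succeed.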
