Polber commented on code in PR #32289:
URL: https://github.com/apache/beam/pull/32289#discussion_r1727793074


##########
sdks/python/apache_beam/yaml/examples/bigtable_enrichment.yaml:
##########
@@ -0,0 +1,35 @@
+pipeline:
+  type: chain
+  transforms:
+
+  # Step 1: Creating a collection of elements that needs
+  # to be enriched. Here we are simulating sales data
+    - type: Create
+      config:
+        elements:
+        - sale_id: 1
+          customer_id: 1
+          product_id: 1
+          quantity: 1
+
+  # Step 2: Enriching the data with Bigtable
+  # This specific bigtable stores product data in the below format
+  # product:product_id, product:product_name, product:product_stock
+    - type: Enrichment
+      config:
+        enrichment_handler: 'BigTable'
+        handler_config:
+          project_id: 'apache-beam-testing'
+          instance_id: 'beam-test'
+          table_id: 'bigtable-enrichment-test'
+          row_key: 'product_id'
+        timeout: 30
+
+  # Step 3: Logging for testing
+  # This is a simple way to view the enriched data
+  # We can also store it somewhere like a json file
+    - type: LogForTesting
+
+  # The logs will show the below output
+  # {"sale_id": 1, "customer_id": 1, "product_id": 1, "quantity": 1, "product": {"product_id": "1", "product_name": "pixel 5", "product_stock": "2"}}
+  # We can see that the original collection we created has been enriched from the product data from bigtable

Review Comment:
   See my comment on your other PR: 
https://github.com/apache/beam/pull/32288#discussion_r1727733220
   
   (These changes are needed to make the GitHub Actions tests pass.)



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -351,8 +351,8 @@ class BigQueryWrapper(object):
 
   HISTOGRAM_METRIC_LOGGER = MetricLogger()
 
-  def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
-    self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
+  def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None, project_id = "apache-beam-testing"):
+    self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions(project = project_id))

Review Comment:
   I don't think this is needed anymore. If it is, set the default to `None` 
and pass "apache-beam-testing" from the caller.
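
A minimal sketch of that suggestion, under stated assumptions: `WrapperSketch` and `make_client` are hypothetical stand-ins for `BigQueryWrapper` and `BigQueryWrapper._bigquery_client`, and options are modeled as a plain dict rather than `PipelineOptions`. It only illustrates keeping the default at `None` and letting the caller supply the project.

```python
from typing import Any, Dict, Optional


def make_client(options: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for BigQueryWrapper._bigquery_client."""
    return {"options": options}


class WrapperSketch:
    """Illustrative only; not the real BigQueryWrapper."""

    def __init__(self,
                 client: Optional[Any] = None,
                 project_id: Optional[str] = None) -> None:
        # Forward the project only when the caller supplied one, so the
        # library code carries no test-specific default.
        options = {} if project_id is None else {"project": project_id}
        self.client = client or make_client(options)


# The caller (for example, a test) chooses the project explicitly.
default_wrapper = WrapperSketch()
test_wrapper = WrapperSketch(project_id="apache-beam-testing")
```

With this shape, existing call sites that pass no project keep their current behavior, and only the test code names "apache-beam-testing".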



##########
sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml:
##########
@@ -0,0 +1,79 @@
+pipeline:
+  transforms:
+    # Step 1: Read orders details from Spanner
+    - type: ReadFromSpanner
+      name: ReadOrders
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'orders-test'
+        database_id: 'order-database'
+        query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'
+
+    # Step 2: Enrich order details with customers details from BigQuery
+    - type: Enrichment
+      name: Enriched
+      input: ReadOrders
+      config:
+        enrichment_handler: 'BigQuery'
+        handler_config:
+          project: "apache-beam-testing"
+          table_name: "apache-beam-testing.ALL_TEST.customers"
+          row_restriction_template: "customer_id = 1001 or customer_id = 1003"
+          fields: ["customer_id"]
+
+    # Step 3: Map enriched values to fields

Review Comment:
   Maybe also add a `TODO` that mentions this should be removed once schema'd 
enrichment is supported



##########
sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml:
##########
@@ -0,0 +1,79 @@
+pipeline:
+  transforms:
+    # Step 1: Read orders details from Spanner
+    - type: ReadFromSpanner
+      name: ReadOrders
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'orders-test'
+        database_id: 'order-database'
+        query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'
+
+    # Step 2: Enrich order details with customers details from BigQuery
+    - type: Enrichment
+      name: Enriched
+      input: ReadOrders
+      config:
+        enrichment_handler: 'BigQuery'
+        handler_config:
+          project: "apache-beam-testing"
+          table_name: "apache-beam-testing.ALL_TEST.customers"
+          row_restriction_template: "customer_id = 1001 or customer_id = 1003"
+          fields: ["customer_id"]
+
+    # Step 3: Map enriched values to fields
+    - type: MapToFields
+      name: MapEnrichedValues
+      input: Enriched
+      config:
+        language: python
+        fields:
+          customer_id:
+            callable: 'lambda x: x.customer_id'
+            output_type: integer
+          customer_name:
+            callable: 'lambda x: x.customer_name'
+            output_type: string
+          customer_email:
+            callable: 'lambda x: x.customer_email'
+            output_type: string 
+          product_id:
+            callable: 'lambda x: x.product_id'
+            output_type: integer
+          order_date:
+            callable: 'lambda x: x.order_date'
+            output_type: string
+          order_amount:
+            callable: 'lambda x: x.order_amount'
+            output_type: integer
+
+    # Step 4: Filter orders with amount greater than 100
+    - type: Filter
+      name: FilterHighValueOrders
+      input: MapEnrichedValues
+      config:
+        keep: "order_amount > 100"
+        language: "python"
+
+
+    # Step 6: Write processed order to another spanner table
+    - type: WriteToSpanner
+      name: WriteProcessedOrders
+      input: FilterHighValueOrders
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'orders-test'
+        database_id: 'order-database'
+        table_id: 'orders_with_customers'
+        error_handling:
+          output: my_error_output
+
+    # Step 7: Handle write errors by writing to JSON
+    - type: WriteToJson
+      name: WriteErrorsToJson
+      input: WriteProcessedOrders.my_error_output
+      config:
+        path: 'errors.json'
+options:
+  project_id: "apache-beam-testing"

Review Comment:
   Is this necessary? `project_id` is already defined in all the transforms, 
so I'm not sure why it would be required here.



##########
sdks/python/apache_beam/yaml/examples/enrich_spanner_with_bigquery.yaml:
##########
@@ -0,0 +1,79 @@
+pipeline:
+  transforms:
+    # Step 1: Read orders details from Spanner
+    - type: ReadFromSpanner
+      name: ReadOrders
+      config:
+        project_id: 'apache-beam-testing'
+        instance_id: 'orders-test'
+        database_id: 'order-database'
+        query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'
+
+    # Step 2: Enrich order details with customers details from BigQuery
+    - type: Enrichment
+      name: Enriched
+      input: ReadOrders
+      config:
+        enrichment_handler: 'BigQuery'
+        handler_config:
+          project: "apache-beam-testing"
+          table_name: "apache-beam-testing.ALL_TEST.customers"
+          row_restriction_template: "customer_id = 1001 or customer_id = 1003"
+          fields: ["customer_id"]
+
+    # Step 3: Map enriched values to fields

Review Comment:
   nit:
   ```suggestion
       # Step 3: Map enriched values to Beam schema
   ```



