Polber commented on code in PR #32289:
URL: https://github.com/apache/beam/pull/32289#discussion_r1876183455


##########
sdks/python/apache_beam/yaml/examples/testing/examples_test.py:
##########
@@ -265,14 +387,42 @@ def _spanner_io_read_test_preprocessor(
             k: v
             for k, v in config.items() if k.startswith('__')
         }
-        transform['config']['elements'] = INPUT_TABLES[(
-            str(instance), str(database), str(table))]
+        elements = INPUT_TABLES[(str(instance), str(database), str(table))]
+        if config.get('query', None):
+          config['query'].replace('select ',
+                                  'SELECT ').replace(' from ', ' FROM ')
+          columns = set(
+              ''.join(config['query'].split('SELECT ')[1:]).split(
+                  ' FROM', maxsplit=1)[0])

Review Comment:
   ```suggestion
                     ' FROM', maxsplit=1)[0].split(', '))
   ```



##########
sdks/python/apache_beam/yaml/examples/testing/examples_test.py:
##########
@@ -34,11 +36,63 @@
 from apache_beam.examples.snippets.util import assert_matches_stdout
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
 from apache_beam.yaml.readme_test import TestEnvironment
 from apache_beam.yaml.readme_test import replace_recursive
 
 
+# Used to simulate Enrichment transform during tests
+# The GitHub action that invokes these tests does not
+# have gcp dependencies installed which is a prerequisite
+# to apache_beam.transforms.enrichment.Enrichment as a top-level
+# import.
+@beam.ptransform.ptransform_fn
+def test_enrichment(
+    pcoll,
+    enrichment_handler: str,
+    handler_config: Dict[str, Any],
+    timeout: Optional[float] = 30):
+  if enrichment_handler == 'BigTable':
+    row_key = handler_config['row_key']
+    bt_data = INPUT_TABLES[(
+        'BigTable', handler_config['instance_id'], handler_config['table_id'])]
+    products = {str(data[row_key]): data for data in bt_data}
+
+    def _fn(row):
+      left = row._asdict()
+      right = products[str(row[row_key])]
+      left['product'] = left.get('product', None) or right
+      return beam.Row(**row)

Review Comment:
   ```suggestion
       def _fn(row):
         left = row._asdict()
         right = products[str(left[row_key])]
         left['product'] = left.get('product', None) or right
         return beam.Row(**left)
   ```



##########
sdks/python/apache_beam/yaml/examples/testing/examples_test.py:
##########
@@ -34,11 +36,63 @@
 from apache_beam.examples.snippets.util import assert_matches_stdout
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
 from apache_beam.yaml.readme_test import TestEnvironment
 from apache_beam.yaml.readme_test import replace_recursive
 
 
+# Used to simulate Enrichment transform during tests
+# The GitHub action that invokes these tests does not
+# have gcp dependencies installed which is a prerequisite
+# to apache_beam.transforms.enrichment.Enrichment as a top-level
+# import.
+@beam.ptransform.ptransform_fn
+def test_enrichment(
+    pcoll,
+    enrichment_handler: str,
+    handler_config: Dict[str, Any],
+    timeout: Optional[float] = 30):
+  if enrichment_handler == 'BigTable':
+    row_key = handler_config['row_key']
+    bt_data = INPUT_TABLES[(
+        'BigTable', handler_config['instance_id'], handler_config['table_id'])]
+    products = {str(data[row_key]): data for data in bt_data}
+
+    def _fn(row):
+      left = row._asdict()
+      right = products[str(row[row_key])]
+      left['product'] = left.get('product', None) or right
+      return beam.Row(**row)
+  elif enrichment_handler == 'BigQuery':
+    row_key = handler_config['fields']
+    dataset, table = handler_config['table_name'].split('.')[-2:]
+    bq_data = INPUT_TABLES[('BigQuery', str(dataset), str(table))]
+    products = {
+        tuple(str(data[key]) for key in row_key): data
+        for data in bq_data
+    }
+
+    def _fn(row):
+      left = row._asdict()
+      right = products[tuple(str(left[k]) for k in row_key)]
+      row = {
+          key: left.get(key, None) or right[key]
+          for key in {*left.keys(), *right.keys()}
+      }
+      return beam.Row(**row)

Review Comment:
   ```suggestion
       bq_data = {
           tuple(str(data[key]) for key in row_key): data
           for data in bq_data
       }
   
       def _fn(row):
         left = row._asdict()
         right = bq_data[tuple(str(left[k]) for k in row_key)]
         row = {
             key: left.get(key, None) or right[key]
             for key in {*left.keys(), *right.keys()}
         }
         return beam.Row(**row)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to