Polber commented on code in PR #32286:
URL: https://github.com/apache/beam/pull/32286#discussion_r1729475058


##########
sdks/python/apache_beam/yaml/yaml_enrichment.py:
##########
@@ -0,0 +1,55 @@
+from typing import Any, Dict
+import apache_beam as beam
+from apache_beam.transforms.enrichment_handlers.bigquery import 
BigQueryEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.bigtable import 
BigTableEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.feast_feature_store import 
FeastFeatureStoreEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import 
VertexAIFeatureStoreEnrichmentHandler
+from apache_beam.transforms.enrichment import Enrichment
+from typing import Optional
+
+@beam.ptransform_fn
+def enrichment_transform(pcoll, enrichment_handler: str, handler_config: 
Dict[str, Any], timeout: Optional[float] = 30):
+    """
+    The Enrichment transform allows you to dynamically enhance elements in a 
pipeline 
+    by performing key-value lookups against external services like APIs or 
databases. 
+
+    Args:
+        enrichment_handler: Specifies the source from where data needs to be 
extracted
+            into the pipeline for enriching data. It can be a string value in 
["BigQuery", 
+            "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"].
+        handler_config: Specifies the parameters for the respective 
enrichment_handler in a dictionary format. 
+            BigQuery:  project, table_name, row_restriction_template, fields, 
column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size
+            BigTable: project_id, instance_id, table_id row_key, row_filter, 
app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp
+            FeastFeatureStore: feature_store_yaml_path, feature_names, 
feature_service_name, full_feature_names, entity_row_fn, exception_level
+            VertexAIFeatureStore: project, location, api_endpoint, 
feature_store_name:, feature_view_name, row_key, exception_level

Review Comment:
   Could be convenient to link to PyDocs for the different handlers:
   
   For example:
   
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html
   
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html
   
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.feast_feature_store.html
   
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html



##########
sdks/python/apache_beam/yaml/yaml_enrichment_test.py:
##########
@@ -0,0 +1,62 @@
+import unittest
+import logging
+import mock
+import apache_beam as beam
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import Map
+from apache_beam.yaml.yaml_enrichment import enrichment_transform
+from apache_beam import Row
+from unittest.mock import patch
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+class FakeEnrichmentTransform:
+    def __init__(self, enrichment_handler, handler_config, timeout = 30):
+        self._enrichment_handler = enrichment_handler
+        self._handler_config = handler_config
+        self._timeout = timeout
+
+    def __call__(self, enrichment_handler, *, handler_config, timeout = 30):
+        assert enrichment_handler == self._enrichment_handler
+        assert handler_config == self._handler_config
+        assert timeout == self._timeout
+        return beam.Map(lambda x: beam.Row(**x._asdict()))
+
+
+class EnrichmentTransformTest(unittest.TestCase):
+
+    @patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', 
FakeEnrichmentTransform)

Review Comment:
   You are already calling patch explicitly in the test
   ```suggestion
   ```



##########
sdks/python/apache_beam/yaml/yaml_enrichment.py:
##########
@@ -0,0 +1,55 @@
+from typing import Any, Dict
+import apache_beam as beam
+from apache_beam.transforms.enrichment_handlers.bigquery import 
BigQueryEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.bigtable import 
BigTableEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.feast_feature_store import 
FeastFeatureStoreEnrichmentHandler
+from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import 
VertexAIFeatureStoreEnrichmentHandler
+from apache_beam.transforms.enrichment import Enrichment
+from typing import Optional
+
+@beam.ptransform_fn
+def enrichment_transform(pcoll, enrichment_handler: str, handler_config: 
Dict[str, Any], timeout: Optional[float] = 30):
+    """
+    The Enrichment transform allows you to dynamically enhance elements in a 
pipeline 
+    by performing key-value lookups against external services like APIs or 
databases. 
+
+    Args:
+        enrichment_handler: Specifies the source from where data needs to be 
extracted
+            into the pipeline for enriching data. It can be a string value in 
["BigQuery", 
+            "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"].
+        handler_config: Specifies the parameters for the respective 
enrichment_handler in a dictionary format. 
+            BigQuery:  project, table_name, row_restriction_template, fields, 
column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size
+            BigTable: project_id, instance_id, table_id row_key, row_filter, 
app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp
+            FeastFeatureStore: feature_store_yaml_path, feature_names, 
feature_service_name, full_feature_names, entity_row_fn, exception_level
+            VertexAIFeatureStore: project, location, api_endpoint, 
feature_store_name:, feature_view_name, row_key, exception_level
+
+    Example Usage:
+    
+        - type: Enrichment
+          config:
+            enrichment_handler: 'BigTable'
+            handler_config:
+                project_id: 'apache-beam-testing'
+                instance_id: 'beam-test'
+                table_id: 'bigtable-enrichment-test'
+                row_key: 'product_id'
+            timeout: 30
+
+    """
+    if enrichment_handler is None:
+        raise ValueError("Missing 'source' in enrichment spec.")
+    if handler_config is None:
+        raise ValueError("Missing 'handler_config' in enrichment spec.")

Review Comment:
   I don't think it is possible to hit these, since Python will raise a 
`TypeError` for the missing required arguments before executing any code 
in the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to