Polber commented on code in PR #32286: URL: https://github.com/apache/beam/pull/32286#discussion_r1729475058
########## sdks/python/apache_beam/yaml/yaml_enrichment.py: ########## @@ -0,0 +1,55 @@ +from typing import Any, Dict +import apache_beam as beam +from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment import Enrichment +from typing import Optional + +@beam.ptransform.ptransform_fn +def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[str, Any], timeout: Optional[float] = 30): + """ + The Enrichment transform allows you to dynamically enhance elements in a pipeline + by performing key-value lookups against external services like APIs or databases. + + Args: + enrichment_handler: Specifies the source from where data needs to be extracted + into the pipeline for enriching data. It can be a string value in ["BigQuery", + "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"]. + handler_config: Specifies the parameters for the respective enrichment_handler in a dictionary format. 
+ BigQuery: project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size + BigTable: project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp + FeastFeatureStore: feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level + VertexAIFeatureStore: project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level Review Comment: Could be convenient to link to PyDocs for the different handlers: For example: https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.feast_feature_store.html https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html ########## sdks/python/apache_beam/yaml/yaml_enrichment_test.py: ########## @@ -0,0 +1,62 @@ +import unittest +import logging +import mock +import apache_beam as beam +from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.transforms import Map +from apache_beam.yaml.yaml_enrichment import enrichment_transform +from apache_beam import Row +from unittest.mock import patch +from apache_beam.yaml.yaml_transform import YamlTransform + +class FakeEnrichmentTransform: + def __init__(self, enrichment_handler, handler_config, timeout = 30): + self._enrichment_handler = enrichment_handler + self._handler_config = handler_config + self._timeout = timeout + + def __call__(self, enrichment_handler, *, handler_config, timeout = 30): + assert enrichment_handler == self._enrichment_handler + assert handler_config == self._handler_config + assert 
+ timeout == self._timeout + return beam.Map(lambda x: beam.Row(**x._asdict())) + + +class EnrichmentTransformTest(unittest.TestCase): + + @patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', FakeEnrichmentTransform) Review Comment: You are already calling patch explicitly in the test ```suggestion ``` ########## sdks/python/apache_beam/yaml/yaml_enrichment.py: ########## @@ -0,0 +1,55 @@ +from typing import Any, Dict +import apache_beam as beam +from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler +from apache_beam.transforms.enrichment import Enrichment +from typing import Optional + +@beam.ptransform.ptransform_fn +def enrichment_transform(pcoll, enrichment_handler: str, handler_config: Dict[str, Any], timeout: Optional[float] = 30): + """ + The Enrichment transform allows you to dynamically enhance elements in a pipeline + by performing key-value lookups against external services like APIs or databases. + + Args: + enrichment_handler: Specifies the source from where data needs to be extracted + into the pipeline for enriching data. It can be a string value in ["BigQuery", + "BigTable", "FeastFeatureStore", "VertexAIFeatureStore"]. + handler_config: Specifies the parameters for the respective enrichment_handler in a dictionary format. 
+ BigQuery: project, table_name, row_restriction_template, fields, column_names, condition_value_fn, query_fn, min_batch_size, max_batch_size + BigTable: project_id, instance_id, table_id row_key, row_filter, app_profile_id, encoding, ow_key_fn, exception_level, include_timestamp + FeastFeatureStore: feature_store_yaml_path, feature_names, feature_service_name, full_feature_names, entity_row_fn, exception_level + VertexAIFeatureStore: project, location, api_endpoint, feature_store_name:, feature_view_name, row_key, exception_level + + Example Usage: + + - type: Enrichment + config: + enrichment_handler: 'BigTable' + handler_config: + project_id: 'apache-beam-testing' + instance_id: 'beam-test' + table_id: 'bigtable-enrichment-test' + row_key: 'product_id' + timeout: 30 + + """ + if enrichment_handler is None: + raise ValueError("Missing 'source' in enrichment spec.") + if handler_config is None: + raise ValueError("Missing 'handler_config' in enrichment spec.") Review Comment: I don't think it is possible to hit these since the compiler will complain that the method did not receive required parameters before executing the code in the method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
