This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 50f33cd786d [Python] Check feature store existence at pipeline construction time (#30668)
50f33cd786d is described below

commit 50f33cd786dc63463688315a1c73b1cf4ef18807
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Tue Mar 19 10:01:35 2024 -0400

    [Python] Check feature store existence at pipeline construction time (#30668)
    
    * check feature store existence at construction time
    
    * postcommit
---
 .github/trigger_files/beam_PostCommit_Python.json  |  0
 .../enrichment_handlers/vertex_ai_feature_store.py | 49 ++++++++++++++++------
 .../vertex_ai_feature_store_it_test.py             | 45 ++++++++++++++------
 3 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py
index b135739ef59..753b04e1793 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py
@@ -110,6 +110,32 @@ class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row,
     else:
       self.kwargs['client_options'] = {"api_endpoint": self.api_endpoint}
 
+    # check if the feature store exists
+    try:
+      admin_client = aiplatform.gapic.FeatureOnlineStoreAdminServiceClient(
+          **self.kwargs)
+    except Exception:
+      _LOGGER.warning(
+          'Due to insufficient admin permission, could not verify '
+          'the existence of feature store. If the `exception_level` '
+          'is set to WARN then make sure the feature store exists '
+          'otherwise the data enrichment will not happen without '
+          'throwing an error.')
+    else:
+      location_path = admin_client.common_location_path(
+          project=self.project, location=self.location)
+      feature_store_path = admin_client.feature_online_store_path(
+          project=self.project,
+          location=self.location,
+          feature_online_store=self.feature_store_name)
+      feature_store = admin_client.get_feature_online_store(
+          name=feature_store_path)
+
+      if not feature_store:
+        raise NotFound(
+            'Vertex AI Feature Store %s does not exists in %s' %
+            (self.feature_store_name, location_path))
+
   def __enter__(self):
     """Connect with the Vertex AI Feature Store."""
     self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient(
@@ -228,26 +254,25 @@ class VertexAIFeatureStoreLegacyEnrichmentHandler(EnrichmentSourceHandler):
     else:
       self.kwargs['client_options'] = {"api_endpoint": self.api_endpoint}
 
-  def __enter__(self):
-    """Connect with the Vertex AI Feature Store (Legacy)."""
+    # checks if feature store exists
     try:
-      # checks if feature store exists
       _ = aiplatform.Featurestore(
           featurestore_name=self.feature_store_id,
           project=self.project,
           location=self.location,
           credentials=self.kwargs.get('credentials'),
       )
-      self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient(
-          **self.kwargs)
-      self.entity_type_path = self.client.entity_type_path(
-          self.project,
-          self.location,
-          self.feature_store_id,
-          self.entity_type_id)
     except NotFound:
-      raise ValueError(
-          'Vertex AI Feature Store %s does not exist' % self.feature_store_id)
+      raise NotFound(
+          'Vertex AI Feature Store (Legacy) %s does not exist' %
+          self.feature_store_id)
+
+  def __enter__(self):
+    """Connect with the Vertex AI Feature Store (Legacy)."""
+    self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient(
+        **self.kwargs)
+    self.entity_type_path = self.client.entity_type_path(
+        self.project, self.location, self.feature_store_id, self.entity_type_id)
 
   def __call__(self, request: beam.Row, *args, **kwargs):
     """Fetches feature value for an entity-id from
diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py
index d4224be060e..066553decf2 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py
@@ -27,6 +27,7 @@ from apache_beam.testing.util import BeamAssertException
 
 # pylint: disable=ungrouped-imports
 try:
+  from google.api_core.exceptions import NotFound
   from testcontainers.redis import RedisContainer
   from apache_beam.transforms.enrichment import Enrichment
   from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel
@@ -71,7 +72,7 @@ class TestVertexAIFeatureStoreHandler(unittest.TestCase):
     self.entity_type_name = "entity_id"
     self.api_endpoint = "us-central1-aiplatform.googleapis.com"
     self.feature_ids = ['title', 'genres']
-
+    self.retries = 3
     self._start_container()
 
   def _start_container(self):
@@ -124,6 +125,26 @@ class TestVertexAIFeatureStoreHandler(unittest.TestCase):
           | Enrichment(handler)
           | beam.ParDo(ValidateResponse(expected_fields)))
 
+  def test_vertex_ai_feature_store_wrong_name(self):
+    requests = [
+        beam.Row(entity_id="847", name='cardigan jacket'),
+        beam.Row(entity_id="16050", name='stripe t-shirt'),
+    ]
+
+    with self.assertRaises(NotFound):
+      handler = VertexAIFeatureStoreEnrichmentHandler(
+          project=self.project,
+          location=self.location,
+          api_endpoint=self.api_endpoint,
+          feature_store_name="incorrect_name",
+          feature_view_name=self.feature_view_name,
+          row_key=self.entity_type_name,
+      )
+      test_pipeline = beam.Pipeline()
+      _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      res = test_pipeline.run()
+      res.wait_until_finish()
+
   def test_vertex_ai_feature_store_bigtable_serving_enrichment_bad(self):
     requests = [
         beam.Row(entity_id="ui", name="fred perry men\'s sharp stripe t-shirt")
@@ -203,18 +224,18 @@ class TestVertexAIFeatureStoreHandler(unittest.TestCase):
     ]
     feature_store_id = "invalid_name"
     entity_type_id = "movies"
-    handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
-        project=self.project,
-        location=self.location,
-        api_endpoint=self.api_endpoint,
-        feature_store_id=feature_store_id,
-        entity_type_id=entity_type_id,
-        feature_ids=self.feature_ids,
-        row_key=self.entity_type_name,
-        exception_level=ExceptionLevel.RAISE,
-    )
 
-    with self.assertRaises(ValueError):
+    with self.assertRaises(NotFound):
+      handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
+          project=self.project,
+          location=self.location,
+          api_endpoint=self.api_endpoint,
+          feature_store_id=feature_store_id,
+          entity_type_id=entity_type_id,
+          feature_ids=self.feature_ids,
+          row_key=self.entity_type_name,
+          exception_level=ExceptionLevel.RAISE,
+      )
       test_pipeline = beam.Pipeline()
       _ = (
           test_pipeline

Reply via email to