This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d138b750102 Add throttling metrics and retries to vertex embeddings 
(#33311)
d138b750102 is described below

commit d138b7501022fb63b9dde427c42eeabddd5c2243
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Dec 6 16:20:50 2024 -0500

    Add throttling metrics and retries to vertex embeddings (#33311)
    
    * Add throttling metrics and retries to vertex embeddings
    
    * Format + run postcommits
    
    * fix + lint
---
 .github/trigger_files/beam_PostCommit_Python.json  |   2 +-
 .../ml/transforms/embeddings/vertex_ai.py          | 103 ++++++++++++++++++++-
 2 files changed, 101 insertions(+), 4 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index 00bd9e03564..9c7a70ceed7 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 6
+  "modification": 7
 }
 
diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py 
b/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py
index 6fe8320e758..6df505508ae 100644
--- a/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py
+++ b/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py
@@ -19,20 +19,27 @@
 # Follow 
https://cloud.google.com/vertex-ai/docs/python-sdk/use-vertex-ai-python-sdk # 
pylint: disable=line-too-long
 # to install Vertex AI Python SDK.
 
+import logging
+import time
 from collections.abc import Iterable
 from collections.abc import Sequence
 from typing import Any
 from typing import Optional
 
+from google.api_core.exceptions import ServerError
+from google.api_core.exceptions import TooManyRequests
 from google.auth.credentials import Credentials
 
 import apache_beam as beam
 import vertexai
+from apache_beam.io.components.adaptive_throttler import AdaptiveThrottler
+from apache_beam.metrics.metric import Metrics
 from apache_beam.ml.inference.base import ModelHandler
 from apache_beam.ml.inference.base import RunInference
 from apache_beam.ml.transforms.base import EmbeddingsManager
 from apache_beam.ml.transforms.base import _ImageEmbeddingHandler
 from apache_beam.ml.transforms.base import _TextEmbeddingHandler
+from apache_beam.utils import retry
 from vertexai.language_models import TextEmbeddingInput
 from vertexai.language_models import TextEmbeddingModel
 from vertexai.vision_models import Image
@@ -51,6 +58,26 @@ TASK_TYPE_INPUTS = [
     "CLUSTERING"
 ]
 _BATCH_SIZE = 5  # Vertex AI limits requests to 5 at a time.
+_MSEC_TO_SEC = 1000
+
+LOGGER = logging.getLogger("VertexAIEmbeddings")
+
+
+def _retry_on_appropriate_gcp_error(exception):
+  """
+  Retry filter that returns True if a returned HTTP error code is 5xx or 429.
+  This is used to retry remote requests that fail, most notably 429
+  (TooManyRequests.)
+
+  Args:
+    exception: the returned exception encountered during the request/response
+      loop.
+
+  Returns:
+    boolean indication whether or not the exception is a Server Error (5xx) or
+      a TooManyRequests (429) error.
+  """
+  return isinstance(exception, (TooManyRequests, ServerError))
 
 
 class _VertexAITextEmbeddingHandler(ModelHandler):
@@ -74,6 +101,41 @@ class _VertexAITextEmbeddingHandler(ModelHandler):
     self.task_type = task_type
     self.title = title
 
+    # Configure AdaptiveThrottler and throttling metrics for client-side
+    # throttling behavior.
+    # See 
https://docs.google.com/document/d/1ePorJGZnLbNCmLD9mR7iFYOdPsyDA1rDnTpYnbdrzSU/edit?usp=sharing
+    # for more details.
+    self.throttled_secs = Metrics.counter(
+        VertexAIImageEmbeddings, "cumulativeThrottlingSeconds")
+    self.throttler = AdaptiveThrottler(
+        window_ms=1, bucket_ms=1, overload_ratio=2)
+
+  @retry.with_exponential_backoff(
+      num_retries=5, retry_filter=_retry_on_appropriate_gcp_error)
+  def get_request(
+      self,
+      text_batch: Sequence[TextEmbeddingInput],
+      model: MultiModalEmbeddingModel,
+      throttle_delay_secs: int):
+    while self.throttler.throttle_request(time.time() * _MSEC_TO_SEC):
+      LOGGER.info(
+          "Delaying request for %d seconds due to previous failures",
+          throttle_delay_secs)
+      time.sleep(throttle_delay_secs)
+      self.throttled_secs.inc(throttle_delay_secs)
+
+    try:
+      req_time = time.time()
+      prediction = model.get_embeddings(text_batch)
+      self.throttler.successful_request(req_time * _MSEC_TO_SEC)
+      return prediction
+    except TooManyRequests as e:
+      LOGGER.warning("request was limited by the service with code %i", e.code)
+      raise
+    except Exception as e:
+      LOGGER.error("unexpected exception raised as part of request, got %s", e)
+      raise
+
   def run_inference(
       self,
       batch: Sequence[str],
@@ -89,7 +151,8 @@ class _VertexAITextEmbeddingHandler(ModelHandler):
               text=text, title=self.title, task_type=self.task_type)
           for text in text_batch
       ]
-      embeddings_batch = model.get_embeddings(text_batch)
+      embeddings_batch = self.get_request(
+          text_batch=text_batch, model=model, throttle_delay_secs=5)
       embeddings.extend([el.values for el in embeddings_batch])
     return embeddings
 
@@ -173,6 +236,41 @@ class _VertexAIImageEmbeddingHandler(ModelHandler):
     self.model_name = model_name
     self.dimension = dimension
 
+    # Configure AdaptiveThrottler and throttling metrics for client-side
+    # throttling behavior.
+    # See 
https://docs.google.com/document/d/1ePorJGZnLbNCmLD9mR7iFYOdPsyDA1rDnTpYnbdrzSU/edit?usp=sharing
+    # for more details.
+    self.throttled_secs = Metrics.counter(
+        VertexAIImageEmbeddings, "cumulativeThrottlingSeconds")
+    self.throttler = AdaptiveThrottler(
+        window_ms=1, bucket_ms=1, overload_ratio=2)
+
+  @retry.with_exponential_backoff(
+      num_retries=5, retry_filter=_retry_on_appropriate_gcp_error)
+  def get_request(
+      self,
+      img: Image,
+      model: MultiModalEmbeddingModel,
+      throttle_delay_secs: int):
+    while self.throttler.throttle_request(time.time() * _MSEC_TO_SEC):
+      LOGGER.info(
+          "Delaying request for %d seconds due to previous failures",
+          throttle_delay_secs)
+      time.sleep(throttle_delay_secs)
+      self.throttled_secs.inc(throttle_delay_secs)
+
+    try:
+      req_time = time.time()
+      prediction = model.get_embeddings(image=img, dimension=self.dimension)
+      self.throttler.successful_request(req_time * _MSEC_TO_SEC)
+      return prediction
+    except TooManyRequests as e:
+      LOGGER.warning("request was limited by the service with code %i", e.code)
+      raise
+    except Exception as e:
+      LOGGER.error("unexpected exception raised as part of request, got %s", e)
+      raise
+
   def run_inference(
       self,
       batch: Sequence[Image],
@@ -182,8 +280,7 @@ class _VertexAIImageEmbeddingHandler(ModelHandler):
     embeddings = []
     # Maximum request size for muli-model embedding models is 1.
     for img in batch:
-      embedding_response = model.get_embeddings(
-          image=img, dimension=self.dimension)
+      embedding_response = self.get_request(img, model, throttle_delay_secs=5)
       embeddings.append(embedding_response.image_embedding)
     return embeddings
 

Reply via email to