This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/timeout-cb
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/users/damccorm/timeout-cb by this push:
     new eb3da370760 Feedback
     new 12ecc04f0f7 Merge branch 'users/damccorm/timeout-cb' of https://github.com/apache/beam into users/damccorm/timeout-cb
eb3da370760 is described below

commit eb3da370760bc96f893dba7df0a111a405217764
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Aug 21 16:07:41 2024 +0100

    Feedback
---
 sdks/python/apache_beam/ml/inference/base.py | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 150d334077c..e055bb1e129 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -1316,7 +1316,8 @@ class RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
           return
         model_metadata = load_model_status(model_tag, share_across_processes)
         model_metadata.try_mark_current_model_invalid(timeout)
-        logging.warning("Operation timed out, etc…….")
+        logging.warning("Inference failed with a timeout, marking the current "
+                        + "model for garbage collection")
 
       callback = None
       if (self._timeout is not None and
@@ -1351,7 +1352,7 @@ class RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
       exc_class=Exception,
       use_subprocess=False,
       threshold=1,
-      timeout=None):
+      timeout: Optional[int] = None):
     """Automatically provides a dead letter output for skipping bad records.
     This can allow a pipeline to continue successfully rather than fail or
     continuously throw errors on retry when bad elements are encountered.
@@ -1405,12 +1406,12 @@ class RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
       threshold: An upper bound on the ratio of records that can be bad before
           aborting the entire pipeline. Optional, defaults to 1.0 (meaning
           up to 100% of records can be bad and the pipeline will still succeed).
-      timeout: The maximum amount of time given to load a model, RunInference
-          on a batch of elements and perform and pre/postprocessing operations.
-          Since the timeout applies in multiple places, it should be equal to
-          the maximum possible timeout for any of these operations. Note in
-          particular that model load and inference on a single batch count to
-          the same timeout value. When an inference fails, all related
+      timeout: The maximum amount of time in seconds given to load a model, run
+          inference on a batch of elements and perform any pre/postprocessing
+          operations. Since the timeout applies in multiple places, it should
+          be equal to the maximum possible timeout for any of these operations.
+          Note in particular that model load and inference on a single batch
+          count to the same timeout value. When an inference fails, all related
           resources, including the model, will be deleted and reloaded. As a
           result, it is recommended to leave significant buffer and set the
           timeout to at least `2 * (time to load model + time to run
@@ -1544,7 +1545,8 @@ class _ModelStatus():
       Will always return a valid tag. If the passed in tag is valid, this
       function will simply return it, otherwise it will deterministically
       generate a new tag to use instead. The new tag will be the original tag
-      with an incrementing suffix (e.g. `my_tag_1`, `my_tag_2`) for each reload
+      with an incrementing suffix (e.g. `my_tag_reload_1`, `my_tag_reload_2`)
+      for each reload
     """
     if tag not in self._invalid_tags:
       if tag not in self._model_first_seen:
@@ -1646,7 +1648,7 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
         model_handler: An implementation of ModelHandler.
         clock: A clock implementing time_ns. *Used for unit testing.*
         metrics_namespace: Namespace of the transform to collect metrics.
-        enable_side_input_loading: Bool to indicate if model loading should be
+        load_model_at_runtime: Bool to indicate if model loading should be
             deferred to runtime - for example if we are depending on side
             inputs to get the model path or we want to enforce a timeout on
             model loading.

Reply via email to