This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch users/damccorm/timeout-cb
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/users/damccorm/timeout-cb by
this push:
new eb3da370760 Feedback
new 12ecc04f0f7 Merge branch 'users/damccorm/timeout-cb' of
https://github.com/apache/beam into users/damccorm/timeout-cb
eb3da370760 is described below
commit eb3da370760bc96f893dba7df0a111a405217764
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Aug 21 16:07:41 2024 +0100
Feedback
---
sdks/python/apache_beam/ml/inference/base.py | 22 ++++++++++++----------
1 file changed, 12 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/ml/inference/base.py
b/sdks/python/apache_beam/ml/inference/base.py
index 150d334077c..e055bb1e129 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -1316,7 +1316,8 @@ class
RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
return
model_metadata = load_model_status(model_tag, share_across_processes)
model_metadata.try_mark_current_model_invalid(timeout)
- logging.warning("Operation timed out, etc…….")
+ logging.warning("Inference failed with a timeout, marking the current "
+ + "model for garbage collection")
callback = None
if (self._timeout is not None and
@@ -1351,7 +1352,7 @@ class
RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
exc_class=Exception,
use_subprocess=False,
threshold=1,
- timeout=None):
+ timeout: Optional[int] = None):
"""Automatically provides a dead letter output for skipping bad records.
This can allow a pipeline to continue successfully rather than fail or
continuously throw errors on retry when bad elements are encountered.
@@ -1405,12 +1406,12 @@ class
RunInference(beam.PTransform[beam.PCollection[Union[ExampleT,
threshold: An upper bound on the ratio of records that can be bad before
aborting the entire pipeline. Optional, defaults to 1.0 (meaning
up to 100% of records can be bad and the pipeline will still
succeed).
- timeout: The maximum amount of time given to load a model, RunInference
- on a batch of elements and perform and pre/postprocessing operations.
- Since the timeout applies in multiple places, it should be equal to
- the maximum possible timeout for any of these operations. Note in
- particular that model load and inference on a single batch count to
- the same timeout value. When an inference fails, all related
+ timeout: The maximum amount of time in seconds given to load a model, run
+ inference on a batch of elements and perform any pre/postprocessing
+ operations. Since the timeout applies in multiple places, it should
+ be equal to the maximum possible timeout for any of these operations.
+ Note in particular that model load and inference on a single batch
+ count to the same timeout value. When an inference fails, all related
resources, including the model, will be deleted and reloaded. As a
result, it is recommended to leave significant buffer and set the
timeout to at least `2 * (time to load model + time to run
@@ -1544,7 +1545,8 @@ class _ModelStatus():
Will always return a valid tag. If the passed in tag is valid, this
function will simply return it, otherwise it will deterministically
generate a new tag to use instead. The new tag will be the original tag
- with an incrementing suffix (e.g. `my_tag_1`, `my_tag_2`) for each reload
+ with an incrementing suffix (e.g. `my_tag_reload_1`, `my_tag_reload_2`)
+ for each reload.
"""
if tag not in self._invalid_tags:
if tag not in self._model_first_seen:
@@ -1646,7 +1648,7 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT,
PredictionT]):
model_handler: An implementation of ModelHandler.
clock: A clock implementing time_ns. *Used for unit testing.*
metrics_namespace: Namespace of the transform to collect metrics.
- enable_side_input_loading: Bool to indicate if model loading should be
+ load_model_at_runtime: Bool to indicate if model loading should be
deferred to runtime - for example if we are depending on side
inputs to get the model path or we want to enforce a timeout on
model loading.