tvalentyn commented on code in PR #26261:
URL: https://github.com/apache/beam/pull/26261#discussion_r1167351279
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -368,20 +369,71 @@ def expand(
# batching DoFn APIs.
| beam.BatchElements(**self._model_handler.batch_elements_kwargs()))
+ run_inference_pardo = beam.ParDo(
+ _RunInferenceDoFn(
+ self._model_handler,
+ self._clock,
+ self._metrics_namespace,
+ self._enable_side_input_loading),
+ self._inference_args,
+ beam.pvalue.AsSingleton(
+ self._model_metadata_pcoll,
+ ) if self._enable_side_input_loading else None).with_resource_hints(
+ **resource_hints)
+
+ if self._with_exception_handling:
+ run_inference_pardo = run_inference_pardo.with_exception_handling(
+ exc_class=self._exc_class,
+ use_subprocess=self._use_subprocess,
+ threshold=self._threshold)
+
return (
batched_elements_pcoll
- | 'BeamML_RunInference' >> (
- beam.ParDo(
- _RunInferenceDoFn(
- self._model_handler,
- self._clock,
- self._metrics_namespace,
- self._enable_side_input_loading),
- self._inference_args,
- beam.pvalue.AsSingleton(
- self._model_metadata_pcoll,
- ) if self._enable_side_input_loading else
- None).with_resource_hints(**resource_hints)))
+ | 'BeamML_RunInference' >> run_inference_pardo)
+
+ def with_exception_handling(
+ self, *, exc_class=Exception, use_subprocess=False, threshold=1):
+ """Automatically provides a dead letter output for skipping bad records.
+ This can allow a pipeline to continue successfully rather than fail or
+ continuously throw errors on retry when bad elements are encountered.
+
+ This returns a tagged output with two PCollections, the first being the
+ results of successfully processing the input PCollection, and the second
+ being the set of bad batches of records (those which threw exceptions
+ during processing) along with information about the errors raised.
+
+ For example, one would write::
+
+ good, bad = RunInference(
+ maybe_error_raising_model_handler
+ ).with_exception_handling()
+
+ and `good` will be a PCollection of PredictionResults and `bad` will
+ contain a tuple of all batches that raised exceptions, along with their
+ corresponding exception.
+
+
+ Args:
+ exc_class: An exception class, or tuple of exception classes, to catch.
+ Optional, defaults to 'Exception'.
+ use_subprocess: Whether to execute the DoFn logic in a subprocess. This
Review Comment:
JFYI: `use_subprocess` currently has some known issues and limitations, which
may surface here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]