damccorm commented on code in PR #26309:
URL: https://github.com/apache/beam/pull/26309#discussion_r1175505250


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -382,14 +572,38 @@ def expand(
             **resource_hints)
 
     if self._with_exception_handling:
-      run_inference_pardo = run_inference_pardo.with_exception_handling(
+      results, bad_inference = (
+          batched_elements_pcoll
+          | 'BeamML_RunInference' >>
+          run_inference_pardo.with_exception_handling(
           exc_class=self._exc_class,
           use_subprocess=self._use_subprocess,
-          threshold=self._threshold)
+          threshold=self._threshold))
+    else:
+      results = (
+          batched_elements_pcoll
+          | 'BeamML_RunInference' >> run_inference_pardo)
+
+    for idx in range(len(postprocess_fns)):
+      fn = postprocess_fns[idx]
+      if self._with_exception_handling:
+        results, bad = (results
+        | f"BeamML_RunInference_Postprocess-{idx}" >> beam.Map(
+          fn).with_exception_handling(
+          exc_class=self._exc_class,
+          use_subprocess=self._use_subprocess,
+          threshold=self._threshold))

Review Comment:
   Good call
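For readers following the loop in the first hunk: each postprocess function gets its own `beam.Map(fn).with_exception_handling(...)`. Only the main output is reassigned to `results`, so records that fail one stage never reach the next. Below is a minimal pure-Python sketch of that chaining, assuming plain lists in place of PCollections. `apply_postprocess_fns` is a hypothetical helper, not a Beam API. It omits `threshold` (the tolerated failure ratio) and `use_subprocess`, and it stores the raw exception object, whereas Beam's dead-letter records may carry the error in a different form.

```python
from typing import Any, Callable, List, Sequence, Tuple

# Hypothetical stand-in: plain lists play the role of PCollections, and a
# failure is a (record, exception) pair as described in the docstring.
Failure = Tuple[Any, Exception]


def apply_postprocess_fns(
    results: Sequence[Any],
    postprocess_fns: Sequence[Callable[[Any], Any]],
) -> Tuple[List[Any], List[List[Failure]]]:
  """Applies each fn in order, diverting records that raise.

  Returns the surviving outputs plus one list of (record, exception) pairs
  per postprocess function, in the order the functions were applied.
  """
  failures_per_fn: List[List[Failure]] = []
  current = list(results)
  for fn in postprocess_fns:
    good: List[Any] = []
    bad: List[Failure] = []
    for record in current:
      try:
        good.append(fn(record))
      except Exception as e:  # pylint: disable=broad-except
        bad.append((record, e))
    failures_per_fn.append(bad)
    # Only records that survived this stage flow into the next one.
    current = good
  return current, failures_per_fn
```

For example, `apply_postprocess_fns([1, 0, 2], [lambda x: 10 // x, lambda x: x + 1])` yields `[11, 6]` as the good output. The first stage's failure list holds `0` paired with a `ZeroDivisionError`, and the second stage's list is empty.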



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -404,13 +618,31 @@ def with_exception_handling(
 
     For example, one would write::
 
-        good, bad = RunInference(
+        main, other = RunInference(
           maybe_error_raising_model_handler
         ).with_exception_handling()
 
-    and `good` will be a PCollection of PredictionResults and `bad` will
-    contain a tuple of all batches that raised exceptions, along with their
-    corresponding exception.
+    and `good` will be a PCollection of PredictionResults and `other` will
+    contain a `RunInferenceDLQ` object with PCollections containing failed
+    records for each failed inference, preprocess operation, or postprocess
+    operation. To access each collection of failed records, one would write:
+
+        failed_inferences = other.failed_inferences
+        failed_preprocessing = other.failed_preprocessing
+        failed_postprocessing = other.failed_postprocessing
+
+    failed_inferences is in the form
+    PCollection[Tuple[failed batch, exception]].
+
+    failed_preprocessing is in the form
+    PCollectionList[Tuple[failed record, exception]]], where each element of
+    the list corresponds to a preprocess function. These PCollections are
+    in the same order that the preprocess functions are applied.
+
+    failed_postprocessing is in the form
+    PCollectionList[Tuple[failed record, exception]]], where each element of

Review Comment:
   Good catch - just a typo (which I then copied and pasted 🙃)
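The docstring in the second hunk describes the shape of the `RunInferenceDLQ` object. `failed_inferences` holds one (failed batch, exception) pair per failed batch. `failed_preprocessing` and `failed_postprocessing` each hold one collection per function, in application order. The sketch below models that shape with plain Python lists instead of `PCollection`/`PCollectionList`. `RunInferenceDLQSketch` and `summarize` are hypothetical names used only for illustration; they are not part of Beam.

```python
from typing import Any, Dict, List, NamedTuple, Tuple

# A failed record (or batch) paired with the exception it raised.
Failure = Tuple[Any, Exception]


class RunInferenceDLQSketch(NamedTuple):
  """Hypothetical list-based model of the DLQ described in the docstring."""
  # One (failed batch, exception) pair per batch whose inference raised.
  failed_inferences: List[Failure]
  # One inner list per preprocess fn, in the order the fns are applied.
  failed_preprocessing: List[List[Failure]]
  # One inner list per postprocess fn, in the order the fns are applied.
  failed_postprocessing: List[List[Failure]]


def summarize(dlq: RunInferenceDLQSketch) -> Dict[str, int]:
  """Counts failed records per stage, keyed by a readable stage name."""
  summary = {'inference': len(dlq.failed_inferences)}
  for idx, failures in enumerate(dlq.failed_preprocessing):
    summary[f'preprocess-{idx}'] = len(failures)
  for idx, failures in enumerate(dlq.failed_postprocessing):
    summary[f'postprocess-{idx}'] = len(failures)
  return summary
```

Because each stage keeps its own entry in positional order, a caller can tell which preprocess or postprocess function rejected a record without inspecting the exception itself.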



-- 
This is an automated message from the Apache Git Service.