shunping commented on code in PR #34662:
URL: https://github.com/apache/beam/pull/34662#discussion_r2053263282


##########
sdks/python/apache_beam/ml/anomaly/transforms.py:
##########
@@ -426,23 +426,28 @@ class RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
   def __init__(self, offline_detector: OfflineDetector):
     self._offline_detector = offline_detector
 
-  def restore_and_convert(
-      self, elem: Tuple[Tuple[Any, Any, beam.Row], float]) -> KeyedOutputT:
-    """Unnests and converts the model output to AnomalyResult.
+  def _restore_and_convert(
+      self, elem: Tuple[Tuple[Any, Any, beam.Row], Any]) -> KeyedOutputT:
+    """Converts the model output to AnomalyResult.
 
     Args:
-      nested: A tuple containing the combined key (origin key, temp key) and
-        a dictionary of input and output from RunInference.
+      elem: A tuple containing the combined key (original key, temp key, row)
+        and the output from RunInference.
 
     Returns:
-      A tuple containing the original key and AnomalyResult.
+      A tuple containing the keyed AnomalyResult.
     """
-    (orig_key, temp_key, row), score = elem
+    (orig_key, temp_key, row), prediction = elem
+    assert isinstance(prediction, AnomalyPrediction), (
+      "Wrong model handler output type." +
+      f"Expected: 'AnomalyPrediction', but got '{type(prediction).__name__}'. " +  # pylint: disable=line-too-long
+      "Consider adding a post-processing function via `with_postprocess_fn`.")

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to