damccorm commented on code in PR #34662: URL: https://github.com/apache/beam/pull/34662#discussion_r2051055123
##########
sdks/python/apache_beam/ml/anomaly/transforms.py:
##########
@@ -426,23 +426,28 @@ class RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
   def __init__(self, offline_detector: OfflineDetector):
     self._offline_detector = offline_detector
 
-  def restore_and_convert(
-      self, elem: Tuple[Tuple[Any, Any, beam.Row], float]) -> KeyedOutputT:
-    """Unnests and converts the model output to AnomalyResult.
+  def _restore_and_convert(
+      self, elem: Tuple[Tuple[Any, Any, beam.Row], Any]) -> KeyedOutputT:
+    """Converts the model output to AnomalyResult.
 
     Args:
-      nested: A tuple containing the combined key (origin key, temp key) and
-        a dictionary of input and output from RunInference.
+      elem: A tuple containing the combined key (original key, temp key, row)
+        and the output from RunInference.
 
     Returns:
-      A tuple containing the original key and AnomalyResult.
+      A tuple containing the keyed AnomalyResult.
     """
-    (orig_key, temp_key, row), score = elem
+    (orig_key, temp_key, row), prediction = elem
+    assert isinstance(prediction, AnomalyPrediction), (
+      "Wrong model handler output type." +
+      f"Expected: 'AnomalyPrediction', but got '{type(prediction).__name__}'. " +  # pylint: disable=line-too-long
+      "Consider adding a post-processing function via `with_postprocess_fn`.")

Review Comment:
```suggestion
      "Consider adding a post-processing function via `with_postprocess_fn`" +
      "to convert from '{type(prediction).__name__}' to 'AnomalyPrediction', or " +
      "use `score_prediction_adapter` or `label_prediction_adapter` to " +
      "perform the conversion.")
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org