This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new edd00b24580 Add support and unit test for PyOD models (#34709)
edd00b24580 is described below

commit edd00b24580108c09f43cd9b57d2fcca4086cc79
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Apr 23 15:20:59 2025 -0400

    Add support and unit test for PyOD models (#34709)
    
    * Add support and unit test for PyOD models
    
    Additionally, it includes:
    - A minor fix for error messages in the `specifiable` module.
    - Support for scoring offline detectors on a subset of features.
    
    * Fix lints
    
    * More lints.
    
    * Add pyod dependencyto ml_test extra
    
    * Revise based on reviews. Fix lints.
    
    * Fix failed tests due to the side effect of lazy init on model handlers.
---
 .../ml/anomaly/detectors/pyod_adapter.py           | 110 ++++++++++++++
 .../ml/anomaly/detectors/pyod_adapter_test.py      | 160 +++++++++++++++++++++
 sdks/python/apache_beam/ml/anomaly/specifiable.py  |   4 +-
 sdks/python/apache_beam/ml/anomaly/transforms.py   |  14 ++
 sdks/python/setup.py                               |   2 +
 5 files changed, 288 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py 
b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py
new file mode 100644
index 00000000000..f8df78261b9
--- /dev/null
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
+from apache_beam.ml.anomaly.specifiable import specifiable
+from apache_beam.ml.anomaly.thresholds import FixedThreshold
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import _PostProcessingModelHandler
+from apache_beam.ml.inference.utils import _convert_to_result
+from pyod.models.base import BaseDetector as PyODBaseDetector
+
+# Turn the used ModelHandler into specifiable, but without lazy init.
+KeyedModelHandler = specifiable(  # type: ignore[misc]
+    KeyedModelHandler,
+    on_demand_init=False,
+    just_in_time_init=False)
+_PostProcessingModelHandler = specifiable(  # type: ignore[misc]
+    _PostProcessingModelHandler,
+    on_demand_init=False,
+    just_in_time_init=False)
+
+
+@specifiable
+class PyODModelHandler(ModelHandler[beam.Row,
+                                    PredictionResult,
+                                    PyODBaseDetector]):
+  """Implementation of the ModelHandler interface for PyOD [#]_ Models.
+
+  The ModelHandler processes input data as `beam.Row` objects.
+
+  **NOTE:** This API and its implementation are currently under active
+  development and may not be backward compatible.
+
+  Args:
+    model_uri: The URI specifying the location of the pickled PyOD model.
+
+  .. [#] https://github.com/yzhao062/pyod
+  """
+  def __init__(self, model_uri: str):
+    self._model_uri = model_uri
+
+  def load_model(self) -> PyODBaseDetector:
+    file = FileSystems.open(self._model_uri, 'rb')
+    return pickle.load(file)
+
+  def run_inference(
+      self,
+      batch: Sequence[beam.Row],
+      model: PyODBaseDetector,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    np_batch = []
+    for row in batch:
+      np_batch.append(np.fromiter(row, dtype=np.float64))
+
+    # stack a batch of samples into a 2-D array for better performance
+    vectorized_batch = np.stack(np_batch, axis=0)
+    predictions = model.decision_function(vectorized_batch)
+
+    return _convert_to_result(batch, predictions, model_id=self._model_uri)
+
+
+class PyODFactory():
+  @staticmethod
+  def create_detector(model_uri: str, **kwargs) -> OfflineDetector:
+    """A utility function to create OfflineDetector for a PyOD model.
+
+    **NOTE:** This API and its implementation are currently under active
+    development and may not be backward compatible.
+
+    Args:
+      model_uri: The URI specifying the location of the pickled PyOD model.
+      **kwargs: Additional keyword arguments.
+    """
+    model_handler = KeyedModelHandler(
+        PyODModelHandler(model_uri=model_uri)).with_postprocess_fn(
+            OfflineDetector.score_prediction_adapter)
+    m = model_handler.load_model()
+    assert (isinstance(m, PyODBaseDetector))
+    threshold = float(m.threshold_)
+    detector = OfflineDetector(
+        model_handler, threshold_criterion=FixedThreshold(threshold), 
**kwargs)  # type: ignore[arg-type]
+    return detector
diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py 
b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py
new file mode 100644
index 00000000000..bb83e1aeca1
--- /dev/null
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os.path
+import pickle
+import shutil
+import tempfile
+import unittest
+
+import numpy as np
+from parameterized import parameterized
+
+import apache_beam as beam
+from apache_beam.ml.anomaly.base import AnomalyPrediction
+from apache_beam.ml.anomaly.base import AnomalyResult
+from apache_beam.ml.anomaly.transforms import AnomalyDetection
+from apache_beam.ml.anomaly.transforms_test import _keyed_result_is_equal_to
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where onnx and pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory
+  from pyod.models.iforest import IForest
+except ImportError:
+  raise unittest.SkipTest('PyOD dependencies are not installed')
+
+
+class PyODIForestTest(unittest.TestCase):
+  def setUp(self) -> None:
+    self.tmp_dir = tempfile.mkdtemp()
+
+    seed = 1234
+    model = IForest(random_state=seed)
+    model.fit(self.get_train_data())
+    self.pickled_model_uri = os.path.join(self.tmp_dir, 'iforest_pickled')
+
+    with open(self.pickled_model_uri, 'wb') as fp:
+      pickle.dump(model, fp)
+
+  def tearDown(self) -> None:
+    shutil.rmtree(self.tmp_dir)
+
+  def get_train_data(self):
+    return [
+        np.array([1, 5], dtype="float32"),
+        np.array([2, 6], dtype="float32"),
+        np.array([3, 4], dtype="float32"),
+        np.array([2, 6], dtype="float32"),
+        np.array([10, 10], dtype="float32"),  # need an outlier in training 
data
+        np.array([3, 4], dtype="float32"),
+        np.array([2, 6], dtype="float32"),
+        np.array([2, 6], dtype="float32"),
+        np.array([2, 5], dtype="float32"),
+    ]
+
+  def get_test_data(self):
+    return [
+        np.array([2, 6], dtype="float32"),
+        np.array([100, 100], dtype="float32"),
+    ]
+
+  def get_test_data_with_target(self):
+    return [
+        np.array([2, 6, 0], dtype="float32"),
+        np.array([100, 100, 1], dtype="float32"),
+    ]
+
+  @parameterized.expand([True, False])
+  def test_scoring_with_matched_features(self, with_target):
+    if with_target:
+      rows = [beam.Row(a=2, b=6, target=0), beam.Row(a=100, b=100, target=1)]
+      field_names = ["a", "b", "target"]
+      # The selected features should match the features used for training
+      detector = PyODFactory.create_detector(
+          self.pickled_model_uri, features=["a", "b"])
+      input_data = self.get_test_data_with_target()
+    else:
+      rows = [beam.Row(a=2, b=6), beam.Row(a=100, b=100)]
+      field_names = ["a", "b"]
+      detector = PyODFactory.create_detector(self.pickled_model_uri)
+      input_data = self.get_test_data()
+
+    expected_out = [(
+        0,
+        AnomalyResult(
+            example=rows[0],
+            predictions=[
+                AnomalyPrediction(
+                    model_id='OfflineDetector',
+                    score=-0.20316164744828075,
+                    label=0,
+                    threshold=8.326672684688674e-17,
+                    info='',
+                    source_predictions=None)
+            ])),
+                    (
+                        0,
+                        AnomalyResult(
+                            example=rows[1],
+                            predictions=[
+                                AnomalyPrediction(
+                                    model_id='OfflineDetector',
+                                    score=0.179516865091218,
+                                    label=1,
+                                    threshold=8.326672684688674e-17,
+                                    info='',
+                                    source_predictions=None)
+                            ]))]
+
+    options = PipelineOptions([])
+    with beam.Pipeline(options=options) as p:
+      out = (
+          p | beam.Create(input_data)
+          | beam.Map(lambda x: beam.Row(**dict(zip(field_names, map(int, x)))))
+          | beam.WithKeys(0)
+          | AnomalyDetection(detector=detector))
+      assert_that(out, equal_to(expected_out, _keyed_result_is_equal_to))
+
+  def test_scoring_with_unmatched_features(self):
+    # The model is trained with two features: a, b, but the input features of
+    # scoring has one more feature (target).
+    # In this case, we should either get rid of the extra feature(s) from
+    # the scoring input or set `features` when creating the offline detector
+    # (see the `test_scoring_with_matched_features`)
+    detector = PyODFactory.create_detector(self.pickled_model_uri)
+    options = PipelineOptions([])
+    p = beam.Pipeline(options=options)
+    _ = (
+        p | beam.Create(self.get_test_data_with_target())
+        | beam.Map(
+            lambda x: beam.Row(**dict(zip(["a", "b", "target"], map(int, x)))))
+        | beam.WithKeys(0)
+        | AnomalyDetection(detector=detector))
+
+    # This should raise a ValueError with message
+    # "X has 3 features, but IsolationForest is expecting 2 features as input."
+    self.assertRaises(ValueError, p.run)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.WARNING)
+  unittest.main()
diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py 
b/sdks/python/apache_beam/ml/anomaly/specifiable.py
index 3a2baf434f9..f13c9496672 100644
--- a/sdks/python/apache_beam/ml/anomaly/specifiable.py
+++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py
@@ -116,7 +116,7 @@ def _specifiable_from_spec_helper(v, _run_init):
   # TODO: support spec treatment for more types
   if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
     logging.warning(
-        "Type %s is not a recognized supported type for the"
+        "Type %s is not a recognized supported type for the "
         "specification. It will be included without conversion.",
         str(type(v)))
   return v
@@ -142,7 +142,7 @@ def _specifiable_to_spec_helper(v):
   # TODO: support spec treatment for more types
   if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
     logging.warning(
-        "Type %s is not a recognized supported type for the"
+        "Type %s is not a recognized supported type for the "
         "specification. It will be included without conversion.",
         str(type(v)))
   return v
diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py 
b/sdks/python/apache_beam/ml/anomaly/transforms.py
index ae8c78dfc0f..c497421be02 100644
--- a/sdks/python/apache_beam/ml/anomaly/transforms.py
+++ b/sdks/python/apache_beam/ml/anomaly/transforms.py
@@ -454,6 +454,16 @@ class 
RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
         ])
     return orig_key, (temp_key, result)
 
+  def _select_features(self, elem: Tuple[Any,
+                                         beam.Row]) -> Tuple[Any, beam.Row]:
+    assert self._offline_detector._features is not None
+    k, v = elem
+    row_dict = v._asdict()
+    return (
+        k,
+        beam.Row(**{k: row_dict[k]
+                    for k in self._offline_detector._features}))
+
   def expand(
       self,
       input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]:
@@ -468,6 +478,10 @@ class 
RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
     rekeyed_model_input = input | "Rekey" >> beam.Map(
         lambda x: ((x[0], x[1][0], x[1][1]), x[1][1]))
 
+    if self._offline_detector._features is not None:
+      rekeyed_model_input = rekeyed_model_input | "Select Features" >> 
beam.Map(
+          self._select_features)
+
     # ((orig_key, temp_key, beam.Row), AnomalyPrediction)
     rekeyed_model_output = (
         rekeyed_model_input
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 165b39918f1..2b21d0463c9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -490,6 +490,7 @@ if __name__ == '__main__':
               'sentence-transformers',
               'skl2onnx',
               'pillow',
+              'pyod',
               'tensorflow',
               'tensorflow-hub',
               'tensorflow-transform',
@@ -509,6 +510,7 @@ if __name__ == '__main__':
               'sentence-transformers',
               'skl2onnx',
               'pillow',
+              'pyod',
               'tensorflow',
               'tensorflow-hub',
               'tf2onnx',

Reply via email to