This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new edd00b24580 Add support and unit test for PyOD models (#34709)
edd00b24580 is described below
commit edd00b24580108c09f43cd9b57d2fcca4086cc79
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Apr 23 15:20:59 2025 -0400
Add support and unit test for PyOD models (#34709)
* Add support and unit test for PyOD models
Additionally, it includes:
- A minor fix for error messages in the `specifiable` module.
- Support for scoring offline detectors on a subset of features.
* Fix lints
* More lints.
* Add pyod dependency to ml_test extra
* Revise based on reviews. Fix lints.
* Fix failed tests due to the side effect of lazy init on model handlers.
---
.../ml/anomaly/detectors/pyod_adapter.py | 110 ++++++++++++++
.../ml/anomaly/detectors/pyod_adapter_test.py | 160 +++++++++++++++++++++
sdks/python/apache_beam/ml/anomaly/specifiable.py | 4 +-
sdks/python/apache_beam/ml/anomaly/transforms.py | 14 ++
sdks/python/setup.py | 2 +
5 files changed, 288 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py
b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py
new file mode 100644
index 00000000000..f8df78261b9
--- /dev/null
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
+from apache_beam.ml.anomaly.specifiable import specifiable
+from apache_beam.ml.anomaly.thresholds import FixedThreshold
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import _PostProcessingModelHandler
+from apache_beam.ml.inference.utils import _convert_to_result
+from pyod.models.base import BaseDetector as PyODBaseDetector
+
+# Re-bind these model-handler wrappers to specifiable versions of themselves,
+# with both lazy-init modes (on_demand_init, just_in_time_init) turned off.
+# KeyedModelHandler is used by PyODFactory below; _PostProcessingModelHandler
+# is presumably what its `with_postprocess_fn` returns.
+KeyedModelHandler = specifiable(  # type: ignore[misc]
+    KeyedModelHandler, on_demand_init=False, just_in_time_init=False)
+_PostProcessingModelHandler = specifiable(  # type: ignore[misc]
+    _PostProcessingModelHandler, on_demand_init=False,
+    just_in_time_init=False)
+
+
+@specifiable
+class PyODModelHandler(ModelHandler[beam.Row,
+                                    PredictionResult,
+                                    PyODBaseDetector]):
+  """Implementation of the ModelHandler interface for PyOD [#]_ Models.
+
+  The ModelHandler processes input data as `beam.Row` objects. Each row is
+  turned into one ``float64`` feature vector built from all of its field
+  values in field order, so the rows must carry exactly the features the
+  model was trained on (pass ``features`` to the offline detector to drop
+  any extra fields).
+
+  **NOTE:** This API and its implementation are currently under active
+  development and may not be backward compatible.
+
+  Args:
+    model_uri: The URI specifying the location of the pickled PyOD model.
+
+  .. [#] https://github.com/yzhao062/pyod
+  """
+  def __init__(self, model_uri: str):
+    self._model_uri = model_uri
+
+  def load_model(self) -> PyODBaseDetector:
+    """Unpickles the PyOD detector stored at ``model_uri``.
+
+    The file handle is closed as soon as unpickling finishes.
+
+    NOTE: unpickling can execute arbitrary code, so ``model_uri`` must point
+    to a trusted artifact.
+    """
+    with FileSystems.open(self._model_uri, 'rb') as model_file:
+      return pickle.load(model_file)
+
+  def run_inference(
+      self,
+      batch: Sequence[beam.Row],
+      model: PyODBaseDetector,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    """Scores a batch of rows with ``model.decision_function``.
+
+    Args:
+      batch: Rows to score. Every field value must be convertible to float.
+      model: The detector returned by :meth:`load_model`.
+      inference_args: Not used by this handler.
+
+    Returns:
+      One PredictionResult per input row, pairing the row with the score
+      returned by ``decision_function`` and using ``model_uri`` as model id.
+    """
+    if not batch:
+      # np.stack rejects an empty sequence, so short-circuit empty batches.
+      return []
+
+    # Stack the samples into a single 2-D (n_rows, n_fields) array so the
+    # detector scores the whole batch in one vectorized call.
+    vectorized_batch = np.stack(
+        [np.fromiter(row, dtype=np.float64) for row in batch], axis=0)
+    predictions = model.decision_function(vectorized_batch)
+    return _convert_to_result(batch, predictions, model_id=self._model_uri)
+
+
+class PyODFactory:
+  @staticmethod
+  def create_detector(model_uri: str, **kwargs) -> OfflineDetector:
+    """A utility function to create OfflineDetector for a PyOD model.
+
+    The model is loaded once here, at pipeline-construction time, to read
+    its fitted ``threshold_``. That value becomes the detector's
+    FixedThreshold.
+
+    **NOTE:** This API and its implementation are currently under active
+    development and may not be backward compatible.
+
+    Args:
+      model_uri: The URI specifying the location of the pickled PyOD model.
+      **kwargs: Additional keyword arguments forwarded to OfflineDetector
+        (e.g. ``features`` to score on a subset of the row fields).
+
+    Returns:
+      An OfflineDetector wrapping a keyed PyODModelHandler.
+
+    Raises:
+      TypeError: If the object stored at ``model_uri`` is not a PyOD
+        BaseDetector.
+    """
+    model_handler = KeyedModelHandler(
+        PyODModelHandler(model_uri=model_uri)).with_postprocess_fn(
+            OfflineDetector.score_prediction_adapter)
+    m = model_handler.load_model()
+    # Validate explicitly rather than with `assert`, which is stripped under
+    # `python -O` and would surface later as an opaque AttributeError.
+    if not isinstance(m, PyODBaseDetector):
+      raise TypeError(
+          f"Expected a PyOD BaseDetector at {model_uri!r}, "
+          f"got {type(m).__name__}.")
+    threshold = float(m.threshold_)
+    detector = OfflineDetector(
+        model_handler,
+        threshold_criterion=FixedThreshold(threshold),
+        **kwargs)  # type: ignore[arg-type]
+    return detector
diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py
b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py
new file mode 100644
index 00000000000..bb83e1aeca1
--- /dev/null
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os.path
+import pickle
+import shutil
+import tempfile
+import unittest
+
+import numpy as np
+from parameterized import parameterized
+
+import apache_beam as beam
+from apache_beam.ml.anomaly.base import AnomalyPrediction
+from apache_beam.ml.anomaly.base import AnomalyResult
+from apache_beam.ml.anomaly.transforms import AnomalyDetection
+from apache_beam.ml.anomaly.transforms_test import _keyed_result_is_equal_to
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Skip this whole module in environments where PyOD (required by the PyOD
+# adapter under test) is not installed.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory
+  from pyod.models.iforest import IForest
+except ImportError:
+  raise unittest.SkipTest('PyOD dependencies are not installed')
+
+
+class PyODIForestTest(unittest.TestCase):
+  """Runs AnomalyDetection with a pickled PyOD IForest model."""
+  def setUp(self) -> None:
+    self.tmp_dir = tempfile.mkdtemp()
+    self.pickled_model_uri = os.path.join(self.tmp_dir, 'iforest_pickled')
+
+    # Fit with a fixed seed so the expected scores below are reproducible.
+    iforest = IForest(random_state=1234)
+    iforest.fit(self.get_train_data())
+    with open(self.pickled_model_uri, 'wb') as model_file:
+      pickle.dump(iforest, model_file)
+
+  def tearDown(self) -> None:
+    shutil.rmtree(self.tmp_dir)
+
+  def get_train_data(self):
+    samples = [
+        [1, 5],
+        [2, 6],
+        [3, 4],
+        [2, 6],
+        [10, 10],  # need an outlier in training data
+        [3, 4],
+        [2, 6],
+        [2, 6],
+        [2, 5],
+    ]
+    return [np.array(sample, dtype="float32") for sample in samples]
+
+  def get_test_data(self):
+    return [
+        np.array(sample, dtype="float32") for sample in ([2, 6], [100, 100])
+    ]
+
+  def get_test_data_with_target(self):
+    return [
+        np.array(sample, dtype="float32")
+        for sample in ([2, 6, 0], [100, 100, 1])
+    ]
+
+  @staticmethod
+  def _expected_result(row, score, label):
+    """Builds one keyed AnomalyResult as emitted by the offline detector."""
+    prediction = AnomalyPrediction(
+        model_id='OfflineDetector',
+        score=score,
+        label=label,
+        threshold=8.326672684688674e-17,
+        info='',
+        source_predictions=None)
+    return (0, AnomalyResult(example=row, predictions=[prediction]))
+
+  @parameterized.expand([True, False])
+  def test_scoring_with_matched_features(self, with_target):
+    if with_target:
+      field_names = ["a", "b", "target"]
+      rows = [beam.Row(a=2, b=6, target=0), beam.Row(a=100, b=100, target=1)]
+      input_data = self.get_test_data_with_target()
+      # The selected features should match the features used for training.
+      detector = PyODFactory.create_detector(
+          self.pickled_model_uri, features=["a", "b"])
+    else:
+      field_names = ["a", "b"]
+      rows = [beam.Row(a=2, b=6), beam.Row(a=100, b=100)]
+      input_data = self.get_test_data()
+      detector = PyODFactory.create_detector(self.pickled_model_uri)
+
+    expected_out = [
+        self._expected_result(rows[0], score=-0.20316164744828075, label=0),
+        self._expected_result(rows[1], score=0.179516865091218, label=1),
+    ]
+
+    def to_row(values):
+      return beam.Row(**dict(zip(field_names, map(int, values))))
+
+    with beam.Pipeline(options=PipelineOptions([])) as p:
+      out = (
+          p
+          | beam.Create(input_data)
+          | beam.Map(to_row)
+          | beam.WithKeys(0)
+          | AnomalyDetection(detector=detector))
+      assert_that(out, equal_to(expected_out, _keyed_result_is_equal_to))
+
+  def test_scoring_with_unmatched_features(self):
+    # The model is trained on two features (a, b), but the scoring input has
+    # one more (target). Either drop the extra feature(s) from the scoring
+    # input or set `features` when creating the offline detector (see
+    # `test_scoring_with_matched_features`).
+    detector = PyODFactory.create_detector(self.pickled_model_uri)
+    p = beam.Pipeline(options=PipelineOptions([]))
+    _ = (
+        p
+        | beam.Create(self.get_test_data_with_target())
+        | beam.Map(
+            lambda x: beam.Row(**dict(zip(["a", "b", "target"], map(int, x)))))
+        | beam.WithKeys(0)
+        | AnomalyDetection(detector=detector))
+
+    # Running is expected to fail with a ValueError like
+    # "X has 3 features, but IsolationForest is expecting 2 features as input."
+    self.assertRaises(ValueError, p.run)
+
+
+if __name__ == '__main__':
+  # Keep the root logger at WARNING when this module is run directly.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.WARNING)
+  unittest.main()
diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py
b/sdks/python/apache_beam/ml/anomaly/specifiable.py
index 3a2baf434f9..f13c9496672 100644
--- a/sdks/python/apache_beam/ml/anomaly/specifiable.py
+++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py
@@ -116,7 +116,7 @@ def _specifiable_from_spec_helper(v, _run_init):
# TODO: support spec treatment for more types
if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
logging.warning(
- "Type %s is not a recognized supported type for the"
+ "Type %s is not a recognized supported type for the "
"specification. It will be included without conversion.",
str(type(v)))
return v
@@ -142,7 +142,7 @@ def _specifiable_to_spec_helper(v):
# TODO: support spec treatment for more types
if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
logging.warning(
- "Type %s is not a recognized supported type for the"
+ "Type %s is not a recognized supported type for the "
"specification. It will be included without conversion.",
str(type(v)))
return v
diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py
b/sdks/python/apache_beam/ml/anomaly/transforms.py
index ae8c78dfc0f..c497421be02 100644
--- a/sdks/python/apache_beam/ml/anomaly/transforms.py
+++ b/sdks/python/apache_beam/ml/anomaly/transforms.py
@@ -454,6 +454,16 @@ class
RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
])
return orig_key, (temp_key, result)
+  def _select_features(self, elem: Tuple[Any,
+                                         beam.Row]) -> Tuple[Any, beam.Row]:
+    """Projects the row of a keyed element onto the detector's features.
+
+    Args:
+      elem: A ``(key, row)`` pair.
+
+    Returns:
+      ``(key, projected_row)``, where ``projected_row`` keeps only the fields
+      named in the offline detector's ``_features``, in that order.
+
+    Raises:
+      KeyError: If a configured feature is not a field of the row.
+    """
+    features = self._offline_detector._features
+    assert features is not None
+    key, row = elem
+    row_dict = row._asdict()
+    # Distinct names for the element key and the feature names keep the
+    # comprehension from visually shadowing `key`.
+    return key, beam.Row(**{name: row_dict[name] for name in features})
+
def expand(
self,
input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]:
@@ -468,6 +478,10 @@ class
RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
rekeyed_model_input = input | "Rekey" >> beam.Map(
lambda x: ((x[0], x[1][0], x[1][1]), x[1][1]))
+ if self._offline_detector._features is not None:
+ rekeyed_model_input = rekeyed_model_input | "Select Features" >>
beam.Map(
+ self._select_features)
+
# ((orig_key, temp_key, beam.Row), AnomalyPrediction)
rekeyed_model_output = (
rekeyed_model_input
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 165b39918f1..2b21d0463c9 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -490,6 +490,7 @@ if __name__ == '__main__':
'sentence-transformers',
'skl2onnx',
'pillow',
+ 'pyod',
'tensorflow',
'tensorflow-hub',
'tensorflow-transform',
@@ -509,6 +510,7 @@ if __name__ == '__main__':
'sentence-transformers',
'skl2onnx',
'pillow',
+ 'pyod',
'tensorflow',
'tensorflow-hub',
'tf2onnx',