shunping commented on code in PR #34845: URL: https://github.com/apache/beam/pull/34845#discussion_r2074126092
########## examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb: ########## @@ -0,0 +1,477 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "authorship_tag": "ABX9TyMFxdjrFa6MkZv9RHCO0MZM" + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "code", + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. 
See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ], + "metadata": { + "cellView": "form", + "id": "IA9uYREbI3m3" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Use Apache Beam for Anomaly Detection on Streaming Data (Isolation Forest)\n", + "\n", + "This notebook demonstrates how to perform anomaly detection on streaming data using the AnomalyDetection PTransform, a new feature introduced in Apache Beam 2.64.0, with improved offline model support in 2.65.0.\n", + "\n", + "We first fetch the Statlog (Shuttle) data set from the UCI Machine Learning Repository (cached in gs://apache-beam-samples/anomaly_detection/shuttle/, original link: https://archive.ics.uci.edu/dataset/148/statlog+shuttle). This data is streamed into the pipeline following a periodic impulse. The Beam pipeline then applies the AnomalyDetection PTransform with a pre-trained isolation forest model and computes model metrics.\n", + "\n", + "We demonstrate running the pipeline with Prism, the new local runner." 
+ ], + "metadata": { + "id": "Eef1VqflIrXW" + } + }, + { + "cell_type": "markdown", + "source": [ + "## Setting Environment Variables" + ], + "metadata": { + "id": "EKkFQE8iwT7M" + } + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "id": "L_18eUf7QU2I" + }, + "outputs": [], + "source": [ + "# GCP project id\n", + "PROJECT_ID = 'apache-beam-testing' # @param {type:'string'}\n", + "\n", + "# TODO: Change this to an official release once 2.65.0 is available\n", + "BEAM_VERSION = '2.65.0rc1'" + ] + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "A_49Y2aTQeiH" + }, + "execution_count": 2, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Installing Beam and Other Dependencies" + ], + "metadata": { + "id": "0WXpo4aDwG3N" + } + }, + { + "cell_type": "code", + "source": [ + "# For running with local prism runner\n", + "!pip install 'apache_beam[interactive]=={BEAM_VERSION}' --quiet\n", + "\n", + "# # For running with dataflow runner\n", + "# !pip install 'apache_beam[gcp, interactive]=={BEAM_VERSION}' --quiet" + ], + "metadata": { + "id": "5hpDAMOyQfHP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Download the latest prism\n", + "# TODO: We don't need this step once 2.65.0 is available.\n", + "! 
wget https://dist.apache.org/repos/dist/dev/beam/2.65.0/prism/linux/amd64/apache_beam-v2.65.0-prism-linux-amd64.zip" + ], + "metadata": { + "id": "jAKBvrgq-J3f" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Install pyod for offline anomaly detectors\n", + "!pip install pyod==2.0.3" + ], + "metadata": { + "id": "KlkX-iwVm42J" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Training an Offline Isolation Forest Model" + ], + "metadata": { + "id": "UIzbsGtWto5X" + } + }, + { + "cell_type": "code", + "source": [ + "# Download the sample data from GCS\n", + "train_data_fn = \"./shuttle.trn\"\n", + "! gcloud storage cp \"gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn\" {train_data_fn}" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "vb6ubiSyipuG", + "outputId": "e6b0c667-b6f8-43a3-ffd5-344c33ba02e1" + }, + "execution_count": 6, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Copying gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn to file://./shuttle.trn\n", + "\n", + "Average throughput: 180.7MiB/s\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "import pandas as pd\n", + "import pyod.models.iforest as iforest\n", + "import pickle\n", + "\n", + "FIELD_NAMES = [\"time\", \"f1\", \"f2\", \"f3\", \"f4\", \"f5\", \"f6\", \"f7\", \"f8\", \"target\"]\n", + "SEP = \" \"\n", + "df = pd.read_csv(train_data_fn, sep=\" \", names=FIELD_NAMES)\n", + "\n", + "# We don't need the time and target field for training.\n", + "train_data = df.drop(['time', 'target'], axis=1)\n", + "train_data_np = train_data.to_numpy()\n", + "\n", + "# Training an isolation forest model\n", + "my_iforest = iforest.IForest(random_state=1234)\n", + "my_iforest.fit(train_data_np)\n", + "\n", + "# Save the model into a file\n", + "pickled_fn = './iforest_pickled'\n", + "with 
open(pickled_fn, 'wb') as fp:\n", + " pickle.dump(my_iforest, fp)" + ], + "metadata": { + "id": "P9MzeokSiPxK" + }, + "execution_count": 8, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Upload the pickled model file to GCS\n", + "pickled_fn_gcs = 'gs://shunping-test/anomaly-temp/iforest.pickled'\n", + "\n", + "## Uncomment the line below to write the model to GCS\n", + "# ! gcloud storage cp {pickled_fn} {pickled_fn_gcs}" + ], + "metadata": { + "id": "01JdmTuXuqY_" + }, + "execution_count": 9, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Defining a Streaming Source and a DoFn for Model Metrics" + ], + "metadata": { + "id": "q9HmkOY5vW-8" + } + }, + { + "cell_type": "code", + "source": [ + "import math\n", + "from typing import Any\n", + "from collections.abc import Sequence\n", + "\n", + "# Import the submodule explicitly; a bare `import sklearn` does not guarantee sklearn.metrics is loaded\n", + "import sklearn.metrics\n", + "\n", + "import apache_beam as beam\n", + "from apache_beam.coders import PickleCoder\n", + "from apache_beam.coders import VarIntCoder\n", + "from apache_beam.transforms.periodicsequence import PeriodicImpulse\n", + "from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n", + "from apache_beam.transforms.window import FixedWindows\n", + "from apache_beam.ml.anomaly.base import AnomalyResult\n", + "\n", + "class SequenceToPeriodicStream(beam.PTransform):\n", + " \"\"\" A streaming source that generates periodic events based on a given sequence. 
\"\"\"\n", + " def __init__(self, data:Sequence[Any], delay: float = 0.1, repeat: bool = True):\n", + " self._data = data\n", + " self._delay = delay\n", + " self._repeat = repeat\n", + "\n", + " class EmitOne(beam.DoFn):\n", + " INDEX_SPEC = ReadModifyWriteStateSpec('index', VarIntCoder())\n", + "\n", + " def __init__(self, data, repeat):\n", + " self._data = data\n", + " self._repeat = repeat\n", + " self._max_index = len(self._data)\n", + "\n", + " def process(self, element, model_state=beam.DoFn.StateParam(INDEX_SPEC)):\n", + " index = model_state.read() or 0\n", + " if index >= self._max_index:\n", + " return\n", + "\n", + " yield self._data[index]\n", + "\n", + " index += 1\n", + " if self._repeat:\n", + " index %= self._max_index\n", + " model_state.write(index)\n", + "\n", + " def expand(self, input):\n", + " return (input | PeriodicImpulse(fire_interval=self._delay)\n", + " | beam.Map(lambda x: (0, x))\n", + " | beam.ParDo(SequenceToPeriodicStream.EmitOne(self._data, self._repeat))\n", + " | beam.WindowInto(FixedWindows(self._delay)))\n", + "\n", + "\n", + "class ComputeMetrics(beam.DoFn):\n", + " \"\"\" A DoFn to compute Area Under Curve (AUC) \"\"\"\n", + " METRIC_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', PickleCoder())\n", + "\n", + " def __init__(self, get_target):\n", + " self._underlying: tuple[list[float], list[int]]\n", + " self._get_target = get_target\n", + "\n", + " def process(self,\n", + " element: tuple[Any, AnomalyResult],\n", + " metric_state=beam.DoFn.StateParam(METRIC_STATE_INDEX),\n", + " **kwargs):\n", + " self._underlying: tuple[list[float], list[int]] = metric_state.read()\n", + " if self._underlying is None:\n", + " scores = []\n", + " labels = []\n", + " targets = []\n", + " self._underlying = (scores, labels, targets)\n", + " else:\n", + " scores, labels, targets = self._underlying\n", + "\n", + " prediction = next(iter(element[1].predictions))\n", + " if math.isnan(prediction.score):\n", + " yield element[0], 
beam.Row()\n", + " else:\n", + " # buffer the scores and targets for auc computation\n", + " scores.append(prediction.score)\n", + " labels.append(prediction.label)\n", + " targets.append(self._get_target(element[1].example))\n", + "\n", + " accuracy = sklearn.metrics.accuracy_score(targets, labels)\n", + " recall = sklearn.metrics.recall_score(targets, labels)\n", + " precision = sklearn.metrics.precision_score(targets, labels)\n", + " f1 = sklearn.metrics.f1_score(targets, labels)\n", + " fpr, tpr, _ = sklearn.metrics.roc_curve(targets, scores)\n", + " auc = sklearn.metrics.auc(fpr, tpr)\n", + "\n", + " yield element[0], beam.Row(id=element[1].example.id,\n", + " target=element[1].example.target,\n", + " predicted_label=next(iter(element[1].predictions)).label,\n", + " predicted_score=next(iter(element[1].predictions)).score,\n", + " accuracy=float(accuracy),\n", + " recall=float(recall),\n", + " precision=float(precision),\n", + " f1=float(f1),\n", + " auc=float(auc))\n", + "\n", + " metric_state.write(self._underlying)" + ], + "metadata": { + "id": "4WgxpmAPQrbv" + }, + "execution_count": 10, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Preparing Test Data for Streaming" + ], + "metadata": { + "id": "OGRa4fbKxpr0" + } + }, + { + "cell_type": "code", + "source": [ + "# Download the data from GCS\n", + "test_data_fn = \"./test.trn\"\n", + "! 
gcloud storage cp \"gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.tst\" {test_data_fn}" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "fnidxXybxuFD", + "outputId": "720a58b6-d53f-42ee-ff53-f734b0d88fe0" + }, + "execution_count": 11, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Copying gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.tst to file://./test.trn\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "from apache_beam.io.filesystems import FileSystems\n", + "import pandas as pd\n", + "\n", + "FIELD_NAMES = [\"time\", \"f1\", \"f2\", \"f3\", \"f4\", \"f5\", \"f6\", \"f7\", \"f8\", \"target\"]\n", + "SEP = \" \"\n", + "with FileSystems().open(test_data_fn) as f:\n", + " df = pd.read_csv(f, sep=\" \", names=FIELD_NAMES)\n", + " # Use only the first 500 instances for the demo\n", + " df = df[:500]\n", + " rows = [row.to_dict() for _, row in df.iterrows()]\n", + " for i, row in enumerate(rows):\n", + " row[\"id\"] = i\n", + "\n", + "# Drop the time and target columns; only the features are passed to the detector\n", + "test_data_cols = FIELD_NAMES.copy()\n", + "test_data_cols.remove(\"time\")\n", + "test_data_cols.remove(\"target\")" + ], + "metadata": { + "id": "iwkrY8oXSAlQ" + }, + "execution_count": 12, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Constructing and Running the Pipeline" + ], + "metadata": { + "id": "GKFwC-ky0XZ5" + } + }, + { + "cell_type": "code", + "source": [ + "from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory\n", + "\n", + "# Create a detector from the pickled PyOD model file\n", + "detector = PyODFactory.create_detector(pickled_fn_gcs, features=test_data_cols)" + ], + "metadata": { + "id": "jP_M9pY2mp1C" + }, + "execution_count": 13, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "import apache_beam as beam\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "from apache_beam.ml.anomaly.transforms 
import AnomalyDetection\n", + "from apache_beam.transforms.window import GlobalWindows\n", + "from apache_beam.io import fileio\n", + "\n", + "import logging\n", + "logging.getLogger().setLevel(logging.INFO)\n", + "\n", + "\n", + "# # Running the pipeline on dataflow\n", Review Comment: Done. ########## examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_iforest.ipynb: ########## @@ -0,0 +1,477 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "authorship_tag": "ABX9TyMFxdjrFa6MkZv9RHCO0MZM" + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "code", + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. 
See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ], + "metadata": { + "cellView": "form", + "id": "IA9uYREbI3m3" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Use Apache Beam for Anomaly Detection on Streaming Data (Isolation Forest)\n", + "\n", + "This notebook demonstrates how to perform anomaly detection on streaming data using the AnomalyDetection PTransform, a new feature introduced in Apache Beam 2.64.0, with improved offline model support in 2.65.0.\n", + "\n", + "We first fetch the Statlog (Shuttle) data set from the UCI Machine Learning Repository (cached in gs://apache-beam-samples/anomaly_detection/shuttle/, original link: https://archive.ics.uci.edu/dataset/148/statlog+shuttle). This data is streamed into the pipeline following a periodic impulse. The Beam pipeline then applies the AnomalyDetection PTransform with a pre-trained isolation forest model and computes model metrics.\n", + "\n", + "We demonstrate running the pipeline with Prism, the new local runner." 
+ ], + "metadata": { + "id": "Eef1VqflIrXW" + } + }, + { + "cell_type": "markdown", + "source": [ + "## Setting Environment Variables" + ], + "metadata": { + "id": "EKkFQE8iwT7M" + } + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "id": "L_18eUf7QU2I" + }, + "outputs": [], + "source": [ + "# GCP project id\n", + "PROJECT_ID = 'apache-beam-testing' # @param {type:'string'}\n", + "\n", + "# TODO: Change this to an official release once 2.65.0 is available\n", + "BEAM_VERSION = '2.65.0rc1'" + ] + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "A_49Y2aTQeiH" + }, + "execution_count": 2, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Installing Beam and Other Dependencies" + ], + "metadata": { + "id": "0WXpo4aDwG3N" + } + }, + { + "cell_type": "code", + "source": [ + "# For running with local prism runner\n", + "!pip install 'apache_beam[interactive]=={BEAM_VERSION}' --quiet\n", + "\n", + "# # For running with dataflow runner\n", + "# !pip install 'apache_beam[gcp, interactive]=={BEAM_VERSION}' --quiet" + ], + "metadata": { + "id": "5hpDAMOyQfHP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Download the latest prism\n", + "# TODO: We don't need this step once 2.65.0 is available.\n", + "! 
wget https://dist.apache.org/repos/dist/dev/beam/2.65.0/prism/linux/amd64/apache_beam-v2.65.0-prism-linux-amd64.zip" + ], + "metadata": { + "id": "jAKBvrgq-J3f" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Install pyod for offline anomaly detectors\n", + "!pip install pyod==2.0.3" + ], + "metadata": { + "id": "KlkX-iwVm42J" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Training an Offline Isolation Forest Model" + ], + "metadata": { + "id": "UIzbsGtWto5X" + } + }, + { + "cell_type": "code", + "source": [ + "# Download the sample data from GCS\n", + "train_data_fn = \"./shuttle.trn\"\n", + "! gcloud storage cp \"gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn\" {train_data_fn}" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "vb6ubiSyipuG", + "outputId": "e6b0c667-b6f8-43a3-ffd5-344c33ba02e1" + }, + "execution_count": 6, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Copying gs://apache-beam-samples/anomaly_detection/shuttle/shuttle.trn to file://./shuttle.trn\n", + "\n", + "Average throughput: 180.7MiB/s\n" Review Comment: Done.