This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce2e0f2122d Polish anomaly detection notebook and get ready to be 
imported in public devsite. (#35278)
ce2e0f2122d is described below

commit ce2e0f2122d492e1e2fc1184088ee58d4e8b7d3c
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Jun 13 13:17:10 2025 -0400

    Polish anomaly detection notebook and get ready to be imported in public 
devsite. (#35278)
    
    * Polish anomaly detection zscore notebook for public doc.
    
    * Adjust formatting.
    
    * Adjust formatting.
---
 .../anomaly_detection_zscore.ipynb                 | 449 +++++++++++++++++----
 1 file changed, 372 insertions(+), 77 deletions(-)

diff --git 
a/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb 
b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
index 1cf91b544b2..cc3951a882b 100644
--- 
a/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
+++ 
b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
@@ -8,7 +8,7 @@
         "cellView": "form",
         "id": "2d79fe3a-952b-478f-ba78-44cafddc91d1"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
         "\n",
@@ -52,17 +52,18 @@
     {
       "cell_type": "markdown",
       "source": [
-        "This notebook demonstrates how to perform anomaly detection on both 
batch and streaming data using the `AnomalyDetection` PTransform. This feature 
was introduced in Apache Beam version 2.64.0.\n",
+        "This notebook demonstrates how to perform anomaly detection on both 
batch and streaming data using the `AnomalyDetection` PTransform:\n",
         "\n",
-        "This notebook is divided into two main sections:\n",
-        "1. Batch Anomaly Detection: We will first generate a synthetic 
univariate dataset containing outliers. We will then apply the 
`AnomalyDetection` PTransform, configured to use the Z-Score algorithm, to this 
batch data. The outlier results (scores and labels) will be logged directly.\n",
+        "1. **Batch Anomaly Detection**: This section focuses on processing a 
static dataset. A synthetic univariate dataset containing outliers is 
generated. Subsequently, the AnomalyDetection PTransform, utilizing the Z-Score 
algorithm, is applied to identify and log the outliers.\n",
         "\n",
-        "1. Streaming Anomaly Detection with Concept Drift: We will generate 
another synthetic univariate dataset that not only includes outliers but also 
incorporates various types of concept drift (i.e., changes in the underlying 
data distribution over time). This data will be published to a Pub/Sub topic to 
simulate a real-time streaming input. An Apache Beam pipeline will then:\n",
-        "  - Read the data from this input Pub/Sub topic.\n",
-        "  - Apply the AnomalyDetection PTransform using the Z-Score algorithm 
within a sliding window to calculate anomaly scores and assign labels.\n",
-        "  - Publish these results (the original data along with their anomaly 
scores and labels) to a second Pub/Sub topic.\n",
+        "2. **Streaming Anomaly Detection with Concept Drift**: This section 
simulates a real-time environment where the data distribution changes over 
time. A synthetic dataset incorporating both outliers and concept drift is 
published to a Pub/Sub topic. An Apache Beam pipeline is configured to:\n",
         "\n",
-        "  Finally, we will visualize the labeled data points in an animated 
plot to observe the detection performance in a streaming context with concept 
drift."
+        "    * Read the streaming data from the input Pub/Sub topic.\n",
+        "    * Apply the AnomalyDetection PTransform within a sliding 
window.\n",
+        "    * Publish the enriched results (original data, anomaly scores, 
and labels) to an output Pub/Sub topic.\n",
+        "  \n",
+        "    Finally, the labeled data points are visualized in a series of 
plots to observe the detection performance in a streaming context with concept 
drift.\n",
+        "\n"
       ],
       "metadata": {
         "id": "pIlokenR1vs7"
@@ -83,15 +84,14 @@
     {
       "cell_type": "code",
       "source": [
-        "! pip install apache_beam[interactive,gcp]>=2.64.0 --quiet"
+        "! pip install 'apache_beam[interactive,gcp]>=2.64.0' --quiet"
       ],
       "metadata": {
-        "collapsed": true,
         "id": "SafqA6dALKvo"
       },
       "id": "SafqA6dALKvo",
       "execution_count": null,
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+      "outputs": []
     },
     {
       "cell_type": "markdown",
@@ -105,14 +105,14 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 2,
       "id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d",
       "metadata": {
         "id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
-        "# Imports Required for the notebook\n",
+        "# Import required dependencies for the notebook\n",
         "import json\n",
         "import os\n",
         "import random\n",
@@ -163,12 +163,12 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 3,
       "id": "29cda7f0-a24e-4e74-ba6e-166413ab532c",
       "metadata": {
         "id": "29cda7f0-a24e-4e74-ba6e-166413ab532c"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "# GCP-related constant are listed below\n",
         "\n",
@@ -206,8 +206,8 @@
         "id": "51jml7JvMpbD"
       },
       "id": "51jml7JvMpbD",
-      "execution_count": null,
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+      "execution_count": 4,
+      "outputs": []
     },
     {
       "cell_type": "markdown",
@@ -222,7 +222,13 @@
     {
       "cell_type": "markdown",
       "source": [
-        "### Generating Synthetic Data with Outliers"
+        "### Generating Synthetic Data with Outliers\n",
+        "This process synthesizes a dataset (N=200) for anomaly detection. The 
generation consists of two key steps:\n",
+        "\n",
+        "- A base dataset is generated from a standard normal distribution 
(μ=0,σ=1).\n",
+        "- Global outliers are introduced by replacing 1% of these points with 
values drawn from a normal distribution with a significant mean shift 
(μ=9,σ=1).\n",
+        "\n",
+        "A fixed random seed is used to ensure reproducibility."
       ],
       "metadata": {
         "id": "tj9wPNIZxcf2"
@@ -258,8 +264,18 @@
         "id": "S4hqN5tPxm-n"
       },
       "id": "S4hqN5tPxm-n",
-      "execution_count": null,
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+      "execution_count": 5,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Run the following code to visualize the dataset on a scatter plot."
+      ],
+      "metadata": {
+        "id": "KeCNv5m4mx4G"
+      },
+      "id": "KeCNv5m4mx4G"
     },
     {
       "cell_type": "code",
@@ -270,16 +286,52 @@
         "plt.scatter(x=range(len(df)), y=df, s=10)"
       ],
       "metadata": {
-        "id": "IUD3giMzyxer"
+        "id": "IUD3giMzyxer",
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 391
+        },
+        "outputId": "bbf8d53c-068c-447c-ec6b-b899c0585b80"
       },
       "id": "IUD3giMzyxer",
-      "execution_count": null,
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+      "execution_count": 6,
+      "outputs": [
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/plain": [
+              "<matplotlib.collections.PathCollection at 0x7ef3027b49d0>"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 6
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/plain": [
+              "<Figure size 1200x400 with 1 Axes>"
+            ],
+            "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAA+oAAAFlCAYAAAB85xL2AAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQAANlZJREFUeJzt3X2QHVWdP/7PBMjwlJkQ8jDJkmCCCMhDDCjZ+IRIipBiFYR1lWVXcFmUGFQIKsRSFHY1CFXoV4uFrfqJsOWKrvUTLHFli8ewLgHlIV9XhBRgJLBkAgQyA0EmIenfH/vLmJncmbn3Tvftc++8XlWpytzHc/v06T7v06e727IsywIAAABIwriyCwAAAAD8iaAOAAAACRHUAQAAICGCOgAAACREUAcAAICECOoAAACQEEEdAAAAEiKoAwAAQEIEdQAAAEiIoA4AAAAJKTSor1ixIt7xjnfEhA
 [...]
+          },
+          "metadata": {}
+        }
+      ]
     },
     {
       "cell_type": "markdown",
       "source": [
-        "### Run the Beam Pipeline on the Batch Data"
+        "### Run the Beam Pipeline on the Batch Data\n",
+        "\n",
+        "The following Beam pipeline implements an anomaly detection workflow 
on batch data. It executes the following steps in sequence:\n",
+        "\n",
+        "- **Ingest and Format**: The pipeline begins by ingesting a 
collection of numerical data and converting each number into a `beam.Row`.\n",
+        "\n",
+        "- **Key for Stateful Processing**: A single global key is assigned to 
every element. This ensures all data is processed by a single instance of the 
downstream stateful transform.\n",
+        "\n",
+        "- **Anomaly Detection**: The `AnomalyDetection` PTransform is applied 
to the keyed data.\n",
+        "\n",
+        "- **Log Outliers**: A `Filter` transform inspects the prediction 
output from the detector, retaining only the elements flagged as anomalies 
(label == 1). These outlier records are then logged for inspection or 
downstream action."
       ],
       "metadata": {
         "id": "_JV5fG_px7BM"
@@ -289,7 +341,7 @@
     {
       "cell_type": "code",
       "source": [
-        "options = PipelineOptions([])\n",
+        "options = PipelineOptions()\n",
         "with beam.Pipeline(options=options) as p:\n",
         "    _ = (p | beam.Create(data)\n",
         "        | \"Convert to Rows\" >> beam.Map(lambda x: 
beam.Row(f1=float(x))).with_output_types(beam.Row)\n",
@@ -302,11 +354,64 @@
         "    )"
       ],
       "metadata": {
-        "id": "ZaXkJeHqx58p"
+        "id": "ZaXkJeHqx58p",
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 86
+        },
+        "outputId": "3192203c-f92f-40b7-b3e3-6a951d029f87"
       },
       "id": "ZaXkJeHqx58p",
-      "execution_count": null,
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+      "execution_count": 7,
+      "outputs": [
+        {
+          "output_type": "display_data",
+          "data": {
+            "application/javascript": [
+              "\n",
+              "        if (typeof window.interactive_beam_jquery == 
'undefined') {\n",
+              "          var jqueryScript = 
document.createElement('script');\n",
+              "          jqueryScript.src = 
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+              "          jqueryScript.type = 'text/javascript';\n",
+              "          jqueryScript.onload = function() {\n",
+              "            var datatableScript = 
document.createElement('script');\n",
+              "            datatableScript.src = 
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+              "            datatableScript.type = 'text/javascript';\n",
+              "            datatableScript.onload = function() {\n",
+              "              window.interactive_beam_jquery = 
jQuery.noConflict(true);\n",
+              "              
window.interactive_beam_jquery(document).ready(function($){\n",
+              "                \n",
+              "              });\n",
+              "            }\n",
+              "            document.head.appendChild(datatableScript);\n",
+              "          };\n",
+              "          document.head.appendChild(jqueryScript);\n",
+              "        } else {\n",
+              "          
window.interactive_beam_jquery(document).ready(function($){\n",
+              "            \n",
+              "          });\n",
+              "        }"
+            ]
+          },
+          "metadata": {}
+        },
+        {
+          "output_type": "stream",
+          "name": "stderr",
+          "text": [
+            "WARNING:apache_beam.options.pipeline_options:Discarding 
unparseable args: ['-f', 
'/root/.local/share/jupyter/runtime/kernel-ad4fe005-8e82-4549-bac6-63e8e4b4d9c1.json']\n",
+            "WARNING:apache_beam.options.pipeline_options:Discarding 
unparseable args: ['-f', 
'/root/.local/share/jupyter/runtime/kernel-ad4fe005-8e82-4549-bac6-63e8e4b4d9c1.json']\n"
+          ]
+        },
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "(0, AnomalyResult(example=Row(f1=9.544331108822645), 
predictions=[AnomalyPrediction(model_id='ZScore', score=8.672319197619325, 
label=1, threshold=3, info='', source_predictions=None)]))\n",
+            "(0, AnomalyResult(example=Row(f1=9.388712735779308), 
predictions=[AnomalyPrediction(model_id='ZScore', score=7.32926235264911, 
label=1, threshold=3, info='', source_predictions=None)]))\n"
+          ]
+        }
+      ]
     },
     {
       "cell_type": "markdown",
@@ -325,17 +430,18 @@
         "id": "0064575d-5e60-4f8b-a970-9dc39db8d331"
       },
       "source": [
-        "### Generating Synthetic Data with Concept Drift"
+        "### Generating Synthetic Data with Concept Drift\n",
+        "This data generation process synthesizes a single data set (N=1000) 
composed of five distinct segments, each designed to simulate a specific 
distributional behavior or type of concept drift. After concatenating these 
segments, global outliers with a larger mean are injected to complete the 
dataset."
       ]
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 8,
       "id": "37c1613e-e2ef-4f2c-8999-cce01563e180",
       "metadata": {
         "id": "37c1613e-e2ef-4f2c-8999-cce01563e180"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "# The size of a segment in the synthetic data set. Each segment 
represents\n",
         "# a collection of data points generated from either a fixed 
distribution\n",
@@ -351,13 +457,13 @@
         "\n",
         "np.random.seed(seed)\n",
         "\n",
-        "# starting from a fixed distribution\n",
+        "# Starting from a fixed distribution\n",
         "data_seg1 = np.random.normal(loc=0, scale=1, size=seg_size)\n",
         "\n",
-        "# a sudden change between data_seg1 and data_seg2\n",
+        "# A sudden change between data_seg1 and data_seg2\n",
         "data_seg2 = np.random.normal(loc=3, scale=3, size=seg_size)\n",
         "\n",
-        "# a gradual change in data_seg3\n",
+        "# A gradual change in data_seg3\n",
         "data_seg3 = []\n",
         "for i in range(seg_size):\n",
         "    prob = 1 - 1.0 * i / seg_size\n",
@@ -368,7 +474,7 @@
         "        data_seg3.append(np.random.normal(loc=0, scale=1, size=1))\n",
         "data_seg3 = np.array(data_seg3).ravel()\n",
         "\n",
-        "# an incremental change in data_seg4\n",
+        "# An incremental change in data_seg4\n",
         "data_seg4 = []\n",
         "for i in range(seg_size):\n",
         "    loc = 0 + 3.0 * i / seg_size\n",
@@ -376,12 +482,13 @@
         "    data_seg4.append(np.random.normal(loc=loc, scale=scale, 
size=1))\n",
         "data_seg4 = np.array(data_seg4).ravel()\n",
         "\n",
-        "# back to a fixed distribution\n",
+        "# Back to a fixed distribution\n",
         "data_seg5 = np.random.normal(loc=3, scale=3, size=seg_size)\n",
         "\n",
+        "# Combining all segments\n",
         "data = np.concatenate((data_seg1, data_seg2, data_seg3, data_seg4, 
data_seg5))\n",
         "\n",
-        "# adding outliers\n",
+        "# Adding global outliers\n",
         "outlier_idx = np.random.choice(len(data), size=int(outlier_ratio * 
len(data)), replace = False)\n",
         "\n",
         "for idx in outlier_idx:\n",
@@ -390,14 +497,50 @@
         "df = pd.Series(data, name='f1')"
       ]
     },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Run the following code to visualize the dataset on a scatter plot."
+      ],
+      "metadata": {
+        "id": "DWui3p_ouMPH"
+      },
+      "id": "DWui3p_ouMPH"
+    },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 9,
       "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699",
       "metadata": {
-        "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699"
+        "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699",
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 391
+        },
+        "outputId": "15203973-9b73-4697-843a-70a66097ce61"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/plain": [
+              "<matplotlib.collections.PathCollection at 0x7ef2fe753e90>"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 9
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/plain": [
+              "<Figure size 1200x400 with 1 Axes>"
+            ],
+            "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAA+4AAAFlCAYAAAB1DLKMAAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQAAakhJREFUeJzt3X+YHVWd4P9P/yCdn31Dk6RDhzQ2IwSVJESQECKyo0jMw7IojiiLbmQccSAMQliVzI4oM+MEYUFHDcLMrsA+K2KcBVlx9ftkQIOEEIExJKAEkMx0JiG/COmmE9JJ31vfP2Ld1D1dv+tU1am679fz8Gi6b997btWpc87n/GyxLMsSAAAAAABgpNa8EwAAAAAAALwRuAMAAAAAYDACdwAAAAAADEbgDgAAAACAwQjcAQAAAAAwGIE7AAAAAAAGI3AHAAAAAMBgBO4AAAAAABiMwB0AAAAAAI
 [...]
+          },
+          "metadata": {}
+        }
+      ],
       "source": [
         "plt.figure(figsize=(12, 4))\n",
         "plt.xlim(0, 1000)\n",
@@ -412,17 +555,18 @@
         "id": "32e7cdf4-a795-47d1-b5f1-9ae5e924a427"
       },
       "source": [
-        "### Setting Up Input/Output Pubsubs"
+        "### Setting Up Input/Output Pubsubs\n",
+        "Use the following code to create Pub/Sub topics for input and output."
       ]
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 10,
       "id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3",
       "metadata": {
         "id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "def create_topic_if_not_exists(project_id:str, topic_name:str, 
enable_message_ordering=False):\n",
         "    if enable_message_ordering:\n",
@@ -460,12 +604,27 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 11,
       "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce",
       "metadata": {
-        "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce"
+        "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "e06dfbde-c92e-4b1f-a6c0-b7897cfe9343"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Created topic: 
projects/apache-beam-testing/topics/anomaly-input-9625\n",
+            "Created subscription: 
projects/apache-beam-testing/subscriptions/anomaly-input-9625-sub\n",
+            "Created topic: 
projects/apache-beam-testing/topics/anomaly-output-9625\n",
+            "Created subscription: 
projects/apache-beam-testing/subscriptions/anomaly-output-9625-sub\n"
+          ]
+        }
+      ],
       "source": [
         "# for input data\n",
         "input_publisher = create_topic_if_not_exists(PROJECT_ID, INPUT_TOPIC, 
True)\n",
@@ -483,17 +642,30 @@
         "id": "dc4afa04-fb39-40cd-a8d7-f9d1c461648a"
       },
       "source": [
-        "### Publishing Input to Pub/Sub"
+        "### Publishing Input to Pub/Sub\n",
+        "To simulate a live data stream without blocking the execution, the 
following code starts a separate thread to publish the generated data to the 
input Pub/Sub topic."
       ]
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 12,
       "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9",
       "metadata": {
-        "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9"
+        "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "c8b97419-e612-4f21-e579-af991d68289f"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Started to publish data to anomaly-input-9625\n"
+          ]
+        }
+      ],
       "source": [
         "def publish_data(publisher, project_id: str, topic: str, data: 
Iterable[Any], delay=0.01, enable_message_ordering=False) -> None:\n",
         "    topic_path = publisher.topic_path(project_id, topic)\n",
@@ -519,7 +691,10 @@
     {
       "cell_type": "markdown",
       "source": [
-        "### Launching the Beam Pipeline"
+        "### Launching the Beam Pipeline\n",
+        "This pipeline adapts the core anomaly detection logic from the 
previous batch example for a real-time, streaming application. The key 
modification is in the I/O: instead of operating on a static collection, this 
pipeline reads its input stream from a Pub/Sub topic and writes the results to 
a separate output topic.\n",
+        "\n",
+        "Notice that the pipeline is run on a separate thread so later steps 
are not blocked."
       ],
       "metadata": {
         "id": "9RjcaxzDN5Tv"
@@ -528,13 +703,25 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 13,
       "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0",
       "metadata": {
         "scrolled": true,
-        "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0"
+        "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "3fc8cace-ee6a-41d5-b262-64d09be82b01"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Started to run beam pipeline for anomaly detection\n"
+          ]
+        }
+      ],
       "source": [
         "def message_to_beam_row(msg: bytes) -> beam.Row:\n",
         "    try:\n",
@@ -581,17 +768,18 @@
         "id": "1b785e34-a035-4148-9b58-f364ce0aed08"
       },
       "source": [
-        "### Collecting Results and Plotting"
+        "### Collecting Results and Plotting\n",
+        "To prepare for visualization, start another thread that retrieves 
output from the output Pub/Sub topic."
       ]
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 14,
       "id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f",
       "metadata": {
         "id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "x = []\n",
         "y = []\n",
@@ -600,12 +788,12 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 15,
       "id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca",
       "metadata": {
         "id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
         "def collect_result(subscriber):\n",
         "    subscription_path = 
pubsub_v1.SubscriberClient.subscription_path(PROJECT_ID, OUTPUT_SUB)\n",
@@ -639,19 +827,50 @@
         "result_thread.start()"
       ]
     },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Run the following line to check how many results have arrived from 
the output Pub/Sub topic."
+      ],
+      "metadata": {
+        "id": "1oB5UnvR0onC"
+      },
+      "id": "1oB5UnvR0onC"
+    },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 39,
       "id": "a3433ea1-70ae-408d-84b2-27118a3fd898",
       "metadata": {
-        "id": "a3433ea1-70ae-408d-84b2-27118a3fd898"
+        "id": "a3433ea1-70ae-408d-84b2-27118a3fd898",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "194511f7-7939-4d09-ed16-ac7940c9959f"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "168\n"
+          ]
+        }
+      ],
       "source": [
-        "# Refresh this cell until we see results from the output pubsub\n",
         "print(len(x))"
       ]
     },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The following code visualizes the streaming output by repeatedly 
generating an animation. It refreshes the visualization every 20 seconds to 
incorporate newly arrived results. Within each refresh, a new animated scatter 
plot is rendered, progressively drawing each data point to show the evolution 
of the stream. In these plots, outliers are highlighted in red.\n"
+      ],
+      "metadata": {
+        "id": "GMLXN3a11Imf"
+      },
+      "id": "GMLXN3a11Imf"
+    },
     {
       "cell_type": "code",
       "execution_count": null,
@@ -659,11 +878,10 @@
       "metadata": {
         "id": "31f24bfc-b91d-4e67-b804-732dc65e7525"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [],
       "source": [
-        "# When we see the output, run this cell. It will generate a plot 
every 5 seconds\n",
-        "# to show how the data stream is processed.\n",
-        "for i in range (10):\n",
+        "# This will generate a plot every 20 seconds to show how the data 
stream is processed.\n",
+        "for i in range (5):\n",
         "  matplotlib.rcParams['animation.embed_limit'] = 300\n",
         "\n",
         "  data = np.array(list(zip(x,y)))\n",
@@ -684,7 +902,58 @@
         "\n",
         "  ani = matplotlib.animation.FuncAnimation(fig, animate, 
frames=int(len(x)/10), interval=50, repeat=False)\n",
         "  display(HTML(ani.to_jshtml()))\n",
-        "  time.sleep(5)"
+        "  time.sleep(20)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "After all the data is processed, run the code below to draw the final 
scatterplot."
+      ],
+      "metadata": {
+        "id": "TklGkLa2I8AN"
+      },
+      "id": "TklGkLa2I8AN"
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "plt.figure(figsize=(12, 4))\n",
+        "plt.xlim(0, 1000)\n",
+        "plt.ylim(-10, 20)\n",
+        "plt.scatter(x=x, y=y, c=c, s=3)"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 391
+        },
+        "id": "pSHS7AWDIiw-",
+        "outputId": "e8caae20-6a63-4e18-b0a4-abf9229d7830"
+      },
+      "id": "pSHS7AWDIiw-",
+      "execution_count": 42,
+      "outputs": [
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/plain": [
+              "<matplotlib.collections.PathCollection at 0x7ef2eca5f1d0>"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 42
+        },
+        {
+          "output_type": "display_data",
+          "data": {
+            "text/plain": [
+              "<Figure size 1200x400 with 1 Axes>"
+            ],
+            "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAA+4AAAFlCAYAAAB1DLKMAAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQABAABJREFUeJzs3Xdc1fX3wPHX5zIVEBBBRVFx77333ntr7tQyrZyVWZlZ+dXSzDRHqam5c48cufdC3ANRFJWh7D3u/fz+uD+vkci+XNDz7MGjC/czzgXkfs7n/X6fo6iqqiKEEEIIIYQQQogcSWPqAIQQQgghhBBCCPF6krgLIYQQQgghhBA5mCTuQgghhBBCCCFEDiaJuxBCCCGEEEIIkYNJ4i6EEEIIIYQQQuRgkrgLIYQQQgghhBA5mCTuQgghhBBCCCFEDiaJuxBCCCGEEEIIkYNJ4i6EEEIIIYQQQu
 [...]
+          },
+          "metadata": {}
+        }
       ]
     },
     {
@@ -699,12 +968,25 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 43,
       "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2",
       "metadata": {
-        "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2"
+        "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "f57e4602-8817-4694-813f-91cce4cc673c"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Input subscription deleted: 
projects/apache-beam-testing/subscriptions/anomaly-input-9625-sub.\n",
+            "Output subscription deleted: 
projects/apache-beam-testing/subscriptions/anomaly-output-9625-sub.\n"
+          ]
+        }
+      ],
       "source": [
         "# deleting input and output subscriptions\n",
         "subscriber = pubsub_v1.SubscriberClient()\n",
@@ -726,12 +1008,25 @@
     },
     {
       "cell_type": "code",
-      "execution_count": null,
+      "execution_count": 44,
       "id": "10dc95cf-94ab-4a51-882b-88559340d4d2",
       "metadata": {
-        "id": "10dc95cf-94ab-4a51-882b-88559340d4d2"
+        "id": "10dc95cf-94ab-4a51-882b-88559340d4d2",
+          "base_uri": "https://localhost:8080/"
+          "base_uri": "https://localhost:8080/";
+        },
+        "outputId": "53f46c20-dc28-4a14-8c69-95b44fda5933"
       },
-      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Input topic deleted: 
projects/apache-beam-testing/topics/anomaly-input-9625\n",
+            "Output topic deleted: 
projects/apache-beam-testing/topics/anomaly-output-9625\n"
+          ]
+        }
+      ],
       "source": [
         "# deleting input and output topics\n",
         "publisher = pubsub_v1.PublisherClient()\n",
@@ -776,4 +1071,4 @@
   },
   "nbformat": 4,
   "nbformat_minor": 5
-}
+}
\ No newline at end of file
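
Note on the technique exercised by the notebook in this diff: the batch section generates N=200 standard-normal points, replaces 1% of them with draws from a normal distribution with mean 9, and logs the points whose Z-Score label is 1 at a threshold of 3. The sketch below illustrates that idea in plain Python. It is not the `AnomalyDetection` PTransform or Beam's Z-Score implementation; the function names `make_data` and `zscore_labels`, the `warmup` parameter, and the seed value are assumptions made for illustration, and each point is scored against all earlier points rather than a sliding window.

```python
import random
import statistics


def make_data(n=200, outlier_ratio=0.01, seed=1234):
    """Synthetic data as described in the notebook text: N(0, 1) base points,
    with a small fraction replaced by draws from N(9, 1).
    The seed value is an arbitrary assumption."""
    rng = random.Random(seed)
    data = [rng.gauss(0, 1) for _ in range(n)]
    outlier_idx = rng.sample(range(n), int(outlier_ratio * n))
    for idx in outlier_idx:
        data[idx] = rng.gauss(9, 1)
    return data, sorted(outlier_idx)


def zscore_labels(data, threshold=3.0, warmup=10):
    """Label each point 1 if its z-score against the points seen before it
    exceeds the threshold, else 0. The first `warmup` points are labeled 0
    because there is too little history to estimate mean and stdev."""
    labels = []
    for i, x in enumerate(data):
        history = data[:i]
        if len(history) < max(warmup, 2):
            labels.append(0)
            continue
        mean = statistics.fmean(history)
        std = statistics.stdev(history)
        score = abs(x - mean) / std if std > 0 else 0.0
        labels.append(1 if score > threshold else 0)
    return labels


if __name__ == "__main__":
    data, injected = make_data()
    labels = zscore_labels(data)
    flagged = [i for i, label in enumerate(labels) if label == 1]
    print("injected outlier indices:", injected)
    print("flagged indices:", flagged)
```

Scoring against the full history keeps the sketch short; the streaming section of the notebook uses a sliding window instead, which lets the statistics follow concept drift.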
