This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce2e0f2122d Polish anomaly detection notebook and get ready to be
imported in public devsite. (#35278)
ce2e0f2122d is described below
commit ce2e0f2122d492e1e2fc1184088ee58d4e8b7d3c
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Jun 13 13:17:10 2025 -0400
Polish anomaly detection notebook and get ready to be imported in public
devsite. (#35278)
* Polish anomaly detection zscore notebook for public doc.
* Adjust formatting.
* Adjust formatting.
---
.../anomaly_detection_zscore.ipynb | 449 +++++++++++++++++----
1 file changed, 372 insertions(+), 77 deletions(-)
diff --git
a/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
index 1cf91b544b2..cc3951a882b 100644
---
a/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
+++
b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb
@@ -8,7 +8,7 @@
"cellView": "form",
"id": "2d79fe3a-952b-478f-ba78-44cafddc91d1"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
"\n",
@@ -52,17 +52,18 @@
{
"cell_type": "markdown",
"source": [
- "This notebook demonstrates how to perform anomaly detection on both
batch and streaming data using the `AnomalyDetection` PTransform. This feature
was introduced in Apache Beam version 2.64.0.\n",
+ "This notebook demonstrates how to perform anomaly detection on both
batch and streaming data using the `AnomalyDetection` PTransform:\n",
"\n",
- "This notebook is divided into two main sections:\n",
- "1. Batch Anomaly Detection: We will first generate a synthetic
univariate dataset containing outliers. We will then apply the
`AnomalyDetection` PTransform, configured to use the Z-Score algorithm, to this
batch data. The outlier results (scores and labels) will be logged directly.\n",
+ "1. **Batch Anomaly Detection**: This section focuses on processing a
static dataset. A synthetic univariate dataset containing outliers is
generated. Subsequently, the AnomalyDetection PTransform, utilizing the Z-Score
algorithm, is applied to identify and log the outliers.\n",
"\n",
- "1. Streaming Anomaly Detection with Concept Drift: We will generate
another synthetic univariate dataset that not only includes outliers but also
incorporates various types of concept drift (i.e., changes in the underlying
data distribution over time). This data will be published to a Pub/Sub topic to
simulate a real-time streaming input. An Apache Beam pipeline will then:\n",
- " - Read the data from this input Pub/Sub topic.\n",
- " - Apply the AnomalyDetection PTransform using the Z-Score algorithm
within a sliding window to calculate anomaly scores and assign labels.\n",
- " - Publish these results (the original data along with their anomaly
scores and labels) to a second Pub/Sub topic.\n",
+ "2. **Streaming Anomaly Detection with Concept Drift**: This section
simulates a real-time environment where the data distribution changes over
time. A synthetic dataset incorporating both outliers and concept drift is
published to a Pub/Sub topic. An Apache Beam pipeline is configured to:\n",
"\n",
- " Finally, we will visualize the labeled data points in an animated
plot to observe the detection performance in a streaming context with concept
drift."
+ " * Read the streaming data from the input Pub/Sub topic.\n",
+ " * Apply the AnomalyDetection PTransform within a sliding
window.\n",
+ " * Publish the enriched results (original data, anomaly scores,
and labels) to an output Pub/Sub topic.\n",
+ " \n",
+ "      Finally, the labeled data points are visualized in a series of
plots to observe the detection performance in a streaming context with concept
drift.\n",
+ "\n"
],
"metadata": {
"id": "pIlokenR1vs7"
@@ -83,15 +84,14 @@
{
"cell_type": "code",
"source": [
- "! pip install apache_beam[interactive,gcp]>=2.64.0 --quiet"
+ "! pip install 'apache_beam[interactive,gcp]>=2.64.0' --quiet"
],
"metadata": {
- "collapsed": true,
"id": "SafqA6dALKvo"
},
"id": "SafqA6dALKvo",
"execution_count": null,
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+ "outputs": []
},
{
"cell_type": "markdown",
@@ -105,14 +105,14 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 2,
"id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d",
"metadata": {
"id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
- "# Imports Required for the notebook\n",
+ "# Import required dependencies for the notebook\n",
"import json\n",
"import os\n",
"import random\n",
@@ -163,12 +163,12 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 3,
"id": "29cda7f0-a24e-4e74-ba6e-166413ab532c",
"metadata": {
"id": "29cda7f0-a24e-4e74-ba6e-166413ab532c"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"# GCP-related constant are listed below\n",
"\n",
@@ -206,8 +206,8 @@
"id": "51jml7JvMpbD"
},
"id": "51jml7JvMpbD",
- "execution_count": null,
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+ "execution_count": 4,
+ "outputs": []
},
{
"cell_type": "markdown",
@@ -222,7 +222,13 @@
{
"cell_type": "markdown",
"source": [
- "### Generating Synthetic Data with Outliers"
+ "### Generating Synthetic Data with Outliers\n",
+ "This process synthesizes a dataset (N=200) for anomaly detection. The
generation consists of two key steps:\n",
+ "\n",
+ "- A base dataset is generated from a standard normal distribution
(μ=0,σ=1).\n",
+ "- Global outliers are introduced by replacing 1% of these points with
values drawn from a normal distribution with a significant mean shift
(μ=9,σ=1).\n",
+ "\n",
+ "A fixed random seed is used to ensure reproducibility."
],
"metadata": {
"id": "tj9wPNIZxcf2"
@@ -258,8 +264,18 @@
"id": "S4hqN5tPxm-n"
},
"id": "S4hqN5tPxm-n",
- "execution_count": null,
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+ "execution_count": 5,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Run the following code to visualize the dataset on a scatter plot."
+ ],
+ "metadata": {
+ "id": "KeCNv5m4mx4G"
+ },
+ "id": "KeCNv5m4mx4G"
},
{
"cell_type": "code",
@@ -270,16 +286,52 @@
"plt.scatter(x=range(len(df)), y=df, s=10)"
],
"metadata": {
- "id": "IUD3giMzyxer"
+ "id": "IUD3giMzyxer",
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 391
+ },
+ "outputId": "bbf8d53c-068c-447c-ec6b-b899c0585b80"
},
"id": "IUD3giMzyxer",
- "execution_count": null,
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+ "execution_count": 6,
+ "outputs": [
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/plain": [
+ "<matplotlib.collections.PathCollection at 0x7ef3027b49d0>"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 6
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/plain": [
+ "<Figure size 1200x400 with 1 Axes>"
+ ],
+ "image/png":
"iVBORw0KGgoAAAANSUhEUgAAA+oAAAFlCAYAAAB85xL2AAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQAANlZJREFUeJzt3X2QHVWdP/7PBMjwlJkQ8jDJkmCCCMhDDCjZ+IRIipBiFYR1lWVXcFmUGFQIKsRSFHY1CFXoV4uFrfqJsOWKrvUTLHFli8ewLgHlIV9XhBRgJLBkAgQyA0EmIenfH/vLmJncmbn3Tvftc++8XlWpytzHc/v06T7v06e727IsywIAAABIwriyCwAAAAD8iaAOAAAACRHUAQAAICGCOgAAACREUAcAAICECOoAAACQEEEdAAAAEiKoAwAAQEIEdQAAAEiIoA4AAAAJKTSor1ixIt7xjnfEhA
[...]
+ },
+ "metadata": {}
+ }
+ ]
},
{
"cell_type": "markdown",
"source": [
- "### Run the Beam Pipeline on the Batch Data"
+ "### Run the Beam Pipeline on the Batch Data\n",
+ "\n",
+ "The following Beam pipeline implements an anomaly detection workflow
on batch data. It executes the following steps in sequence:\n",
+ "\n",
+ "- **Ingest and Format**: The pipeline begins by ingesting a
collection of numerical data and converting each number into a `beam.Row`.\n",
+ "\n",
+ "- **Key for Stateful Processing**: A single global key is assigned to
every element. This ensures all data is processed by a single instance of the
downstream stateful transform.\n",
+ "\n",
+ "- **Anomaly Detection**: The `AnomalyDetection` PTransform is applied
to the keyed data.\n",
+ "\n",
+ "- **Log Outliers**: A `Filter` transform inspects the prediction
output from the detector, retaining only the elements flagged as anomalies
(label == 1). These outlier records are then logged for inspection or
downstream action."
],
"metadata": {
"id": "_JV5fG_px7BM"
@@ -289,7 +341,7 @@
{
"cell_type": "code",
"source": [
- "options = PipelineOptions([])\n",
+ "options = PipelineOptions()\n",
"with beam.Pipeline(options=options) as p:\n",
" _ = (p | beam.Create(data)\n",
" | \"Convert to Rows\" >> beam.Map(lambda x:
beam.Row(f1=float(x))).with_output_types(beam.Row)\n",
@@ -302,11 +354,64 @@
" )"
],
"metadata": {
- "id": "ZaXkJeHqx58p"
+ "id": "ZaXkJeHqx58p",
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 86
+ },
+ "outputId": "3192203c-f92f-40b7-b3e3-6a951d029f87"
},
"id": "ZaXkJeHqx58p",
- "execution_count": null,
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
+ "execution_count": 7,
+ "outputs": [
+ {
+ "output_type": "display_data",
+ "data": {
+ "application/javascript": [
+ "\n",
+ " if (typeof window.interactive_beam_jquery ==
'undefined') {\n",
+ " var jqueryScript =
document.createElement('script');\n",
+ " jqueryScript.src =
'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+ " jqueryScript.type = 'text/javascript';\n",
+ " jqueryScript.onload = function() {\n",
+ " var datatableScript =
document.createElement('script');\n",
+ " datatableScript.src =
'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+ " datatableScript.type = 'text/javascript';\n",
+ " datatableScript.onload = function() {\n",
+ " window.interactive_beam_jquery =
jQuery.noConflict(true);\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }\n",
+ " document.head.appendChild(datatableScript);\n",
+ " };\n",
+ " document.head.appendChild(jqueryScript);\n",
+ " } else {\n",
+ "
window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }"
+ ]
+ },
+ "metadata": {}
+ },
+ {
+ "output_type": "stream",
+ "name": "stderr",
+ "text": [
+ "WARNING:apache_beam.options.pipeline_options:Discarding
unparseable args: ['-f',
'/root/.local/share/jupyter/runtime/kernel-ad4fe005-8e82-4549-bac6-63e8e4b4d9c1.json']\n",
+ "WARNING:apache_beam.options.pipeline_options:Discarding
unparseable args: ['-f',
'/root/.local/share/jupyter/runtime/kernel-ad4fe005-8e82-4549-bac6-63e8e4b4d9c1.json']\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "(0, AnomalyResult(example=Row(f1=9.544331108822645),
predictions=[AnomalyPrediction(model_id='ZScore', score=8.672319197619325,
label=1, threshold=3, info='', source_predictions=None)]))\n",
+ "(0, AnomalyResult(example=Row(f1=9.388712735779308),
predictions=[AnomalyPrediction(model_id='ZScore', score=7.32926235264911,
label=1, threshold=3, info='', source_predictions=None)]))\n"
+ ]
+ }
+ ]
},
{
"cell_type": "markdown",
@@ -325,17 +430,18 @@
"id": "0064575d-5e60-4f8b-a970-9dc39db8d331"
},
"source": [
- "### Generating Synthetic Data with Concept Drift"
+ "### Generating Synthetic Data with Concept Drift\n",
+ "This data generation process synthesizes a single dataset (N=1000)
composed of five distinct segments, each designed to simulate a specific
distributional behavior or type of concept drift. After concatenating these
segments, global outliers with a larger mean are injected to complete the
dataset."
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 8,
"id": "37c1613e-e2ef-4f2c-8999-cce01563e180",
"metadata": {
"id": "37c1613e-e2ef-4f2c-8999-cce01563e180"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"# The size of a segment in the synthetic data set. Each segment
represents\n",
"# a collection of data points generated from either a fixed
distribution\n",
@@ -351,13 +457,13 @@
"\n",
"np.random.seed(seed)\n",
"\n",
- "# starting from a fixed distribution\n",
+ "# Starting from a fixed distribution\n",
"data_seg1 = np.random.normal(loc=0, scale=1, size=seg_size)\n",
"\n",
- "# a sudden change between data_seg1 and data_seg2\n",
+ "# A sudden change between data_seg1 and data_seg2\n",
"data_seg2 = np.random.normal(loc=3, scale=3, size=seg_size)\n",
"\n",
- "# a gradual change in data_seg3\n",
+ "# A gradual change in data_seg3\n",
"data_seg3 = []\n",
"for i in range(seg_size):\n",
" prob = 1 - 1.0 * i / seg_size\n",
@@ -368,7 +474,7 @@
" data_seg3.append(np.random.normal(loc=0, scale=1, size=1))\n",
"data_seg3 = np.array(data_seg3).ravel()\n",
"\n",
- "# an incremental change in data_seg4\n",
+ "# An incremental change in data_seg4\n",
"data_seg4 = []\n",
"for i in range(seg_size):\n",
" loc = 0 + 3.0 * i / seg_size\n",
@@ -376,12 +482,13 @@
" data_seg4.append(np.random.normal(loc=loc, scale=scale,
size=1))\n",
"data_seg4 = np.array(data_seg4).ravel()\n",
"\n",
- "# back to a fixed distribution\n",
+ "# Back to a fixed distribution\n",
"data_seg5 = np.random.normal(loc=3, scale=3, size=seg_size)\n",
"\n",
+ "# Combining all segments\n",
"data = np.concatenate((data_seg1, data_seg2, data_seg3, data_seg4,
data_seg5))\n",
"\n",
- "# adding outliers\n",
+ "# Adding global outliers\n",
"outlier_idx = np.random.choice(len(data), size=int(outlier_ratio *
len(data)), replace = False)\n",
"\n",
"for idx in outlier_idx:\n",
@@ -390,14 +497,50 @@
"df = pd.Series(data, name='f1')"
]
},
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Run the following code to visualize the dataset on a scatter plot."
+ ],
+ "metadata": {
+ "id": "DWui3p_ouMPH"
+ },
+ "id": "DWui3p_ouMPH"
+ },
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 9,
"id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699",
"metadata": {
- "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699"
+ "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699",
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 391
+ },
+ "outputId": "15203973-9b73-4697-843a-70a66097ce61"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/plain": [
+ "<matplotlib.collections.PathCollection at 0x7ef2fe753e90>"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 9
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/plain": [
+ "<Figure size 1200x400 with 1 Axes>"
+ ],
+ "image/png":
"iVBORw0KGgoAAAANSUhEUgAAA+4AAAFlCAYAAAB1DLKMAAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQAAakhJREFUeJzt3X+YHVWd4P9P/yCdn31Dk6RDhzQ2IwSVJESQECKyo0jMw7IojiiLbmQccSAMQliVzI4oM+MEYUFHDcLMrsA+K2KcBVlx9ftkQIOEEIExJKAEkMx0JiG/COmmE9JJ31vfP2Ld1D1dv+tU1am679fz8Gi6b997btWpc87n/GyxLMsSAAAAAABgpNa8EwAAAAAAALwRuAMAAAAAYDACdwAAAAAADEbgDgAAAACAwQjcAQAAAAAwGIE7AAAAAAAGI3AHAAAAAMBgBO4AAAAAABiMwB0AAAAAAI
[...]
+ },
+ "metadata": {}
+ }
+ ],
"source": [
"plt.figure(figsize=(12, 4))\n",
"plt.xlim(0, 1000)\n",
@@ -412,17 +555,18 @@
"id": "32e7cdf4-a795-47d1-b5f1-9ae5e924a427"
},
"source": [
- "### Setting Up Input/Output Pubsubs"
+ "### Setting Up Input/Output Pubsubs\n",
+ "Use the following code to create Pub/Sub topics for input and output."
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 10,
"id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3",
"metadata": {
"id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"def create_topic_if_not_exists(project_id:str, topic_name:str,
enable_message_ordering=False):\n",
" if enable_message_ordering:\n",
@@ -460,12 +604,27 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 11,
"id": "66784c36-9f9e-410e-850b-3b8da29ff5ce",
"metadata": {
- "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce"
+ "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "e06dfbde-c92e-4b1f-a6c0-b7897cfe9343"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Created topic:
projects/apache-beam-testing/topics/anomaly-input-9625\n",
+ "Created subscription:
projects/apache-beam-testing/subscriptions/anomaly-input-9625-sub\n",
+ "Created topic:
projects/apache-beam-testing/topics/anomaly-output-9625\n",
+ "Created subscription:
projects/apache-beam-testing/subscriptions/anomaly-output-9625-sub\n"
+ ]
+ }
+ ],
"source": [
"# for input data\n",
"input_publisher = create_topic_if_not_exists(PROJECT_ID, INPUT_TOPIC,
True)\n",
@@ -483,17 +642,30 @@
"id": "dc4afa04-fb39-40cd-a8d7-f9d1c461648a"
},
"source": [
- "### Publishing Input to Pub/Sub"
+ "### Publishing Input to Pub/Sub\n",
+ "To simulate a live data stream without blocking the execution, the
following code starts a separate thread to publish the generated data to the
input Pub/Sub topic."
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 12,
"id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9",
"metadata": {
- "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9"
+ "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "c8b97419-e612-4f21-e579-af991d68289f"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Started to publish data to anomaly-input-9625\n"
+ ]
+ }
+ ],
"source": [
"def publish_data(publisher, project_id: str, topic: str, data:
Iterable[Any], delay=0.01, enable_message_ordering=False) -> None:\n",
" topic_path = publisher.topic_path(project_id, topic)\n",
@@ -519,7 +691,10 @@
{
"cell_type": "markdown",
"source": [
- "### Launching the Beam Pipeline"
+ "### Launching the Beam Pipeline\n",
+ "This pipeline adapts the core anomaly detection logic from the
previous batch example for a real-time, streaming application. The key
modification is in the I/O: instead of operating on a static collection, this
pipeline reads its input stream from a Pub/Sub topic and writes the results to
a separate output topic.\n",
+ "\n",
+ "Notice that the pipeline is run on a separate thread so later steps
are not blocked."
],
"metadata": {
"id": "9RjcaxzDN5Tv"
@@ -528,13 +703,25 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 13,
"id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0",
"metadata": {
"scrolled": true,
- "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0"
+ "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "3fc8cace-ee6a-41d5-b262-64d09be82b01"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Started to run beam pipeline for anomaly detection\n"
+ ]
+ }
+ ],
"source": [
"def message_to_beam_row(msg: bytes) -> beam.Row:\n",
" try:\n",
@@ -581,17 +768,18 @@
"id": "1b785e34-a035-4148-9b58-f364ce0aed08"
},
"source": [
- "### Collecting Results and Plotting"
+ "### Collecting Results and Plotting\n",
+ "To prepare for visualization, start another thread that retrieves
output from the output Pub/Sub topic."
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 14,
"id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f",
"metadata": {
"id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"x = []\n",
"y = []\n",
@@ -600,12 +788,12 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 15,
"id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca",
"metadata": {
"id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
"def collect_result(subscriber):\n",
" subscription_path =
pubsub_v1.SubscriberClient.subscription_path(PROJECT_ID, OUTPUT_SUB)\n",
@@ -639,19 +827,50 @@
"result_thread.start()"
]
},
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Run the following line to check how many results have arrived from
the output Pub/Sub subscription."
+ ],
+ "metadata": {
+ "id": "1oB5UnvR0onC"
+ },
+ "id": "1oB5UnvR0onC"
+ },
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 39,
"id": "a3433ea1-70ae-408d-84b2-27118a3fd898",
"metadata": {
- "id": "a3433ea1-70ae-408d-84b2-27118a3fd898"
+ "id": "a3433ea1-70ae-408d-84b2-27118a3fd898",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "194511f7-7939-4d09-ed16-ac7940c9959f"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "168\n"
+ ]
+ }
+ ],
"source": [
- "# Refresh this cell until we see results from the output pubsub\n",
"print(len(x))"
]
},
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following code visualizes the streaming output by repeatedly
generating an animation. It refreshes the visualization every 20 seconds to
incorporate newly arrived results. Within each refresh, a new animated scatter
plot is rendered, progressively drawing each data point to show the evolution
of the stream. In these plots, outliers are highlighted in red.\n"
+ ],
+ "metadata": {
+ "id": "GMLXN3a11Imf"
+ },
+ "id": "GMLXN3a11Imf"
+ },
{
"cell_type": "code",
"execution_count": null,
@@ -659,11 +878,10 @@
"metadata": {
"id": "31f24bfc-b91d-4e67-b804-732dc65e7525"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [],
"source": [
- "# When we see the output, run this cell. It will generate a plot
every 5 seconds\n",
- "# to show how the data stream is processed.\n",
- "for i in range (10):\n",
+ "# This will generate a plot every 20 seconds to show how the data
stream is processed.\n",
+ "for i in range (5):\n",
" matplotlib.rcParams['animation.embed_limit'] = 300\n",
"\n",
" data = np.array(list(zip(x,y)))\n",
@@ -684,7 +902,58 @@
"\n",
" ani = matplotlib.animation.FuncAnimation(fig, animate,
frames=int(len(x)/10), interval=50, repeat=False)\n",
" display(HTML(ani.to_jshtml()))\n",
- " time.sleep(5)"
+ " time.sleep(20)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "After all the data is processed, run the code below to draw the final
scatter plot."
+ ],
+ "metadata": {
+ "id": "TklGkLa2I8AN"
+ },
+ "id": "TklGkLa2I8AN"
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "plt.figure(figsize=(12, 4))\n",
+ "plt.xlim(0, 1000)\n",
+ "plt.ylim(-10, 20)\n",
+ "plt.scatter(x=x, y=y, c=c, s=3)"
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 391
+ },
+ "id": "pSHS7AWDIiw-",
+ "outputId": "e8caae20-6a63-4e18-b0a4-abf9229d7830"
+ },
+ "id": "pSHS7AWDIiw-",
+ "execution_count": 42,
+ "outputs": [
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/plain": [
+ "<matplotlib.collections.PathCollection at 0x7ef2eca5f1d0>"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 42
+ },
+ {
+ "output_type": "display_data",
+ "data": {
+ "text/plain": [
+ "<Figure size 1200x400 with 1 Axes>"
+ ],
+ "image/png":
"iVBORw0KGgoAAAANSUhEUgAAA+4AAAFlCAYAAAB1DLKMAAAAOnRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjEwLjAsIGh0dHBzOi8vbWF0cGxvdGxpYi5vcmcvlHJYcgAAAAlwSFlzAAAPYQAAD2EBqD+naQABAABJREFUeJzs3Xdc1fX3wPHX5zIVEBBBRVFx77333ntr7tQyrZyVWZlZ+dXSzDRHqam5c48cufdC3ANRFJWh7D3u/fz+uD+vkci+XNDz7MGjC/czzgXkfs7n/X6fo6iqqiKEEEIIIYQQQogcSWPqAIQQQgghhBBCCPF6krgLIYQQQgghhBA5mCTuQgghhBBCCCFEDiaJuxBCCCGEEEIIkYNJ4i6EEEIIIYQQQuRgkrgLIYQQQgghhBA5mCTuQgghhBBCCCFEDiaJuxBCCCGEEEIIkYNJ4i6EEEIIIYQQQu
[...]
+ },
+ "metadata": {}
+ }
]
},
{
@@ -699,12 +968,25 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 43,
"id": "350b8b1a-3010-4ecd-924f-010308bb5eb2",
"metadata": {
- "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2"
+ "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "f57e4602-8817-4694-813f-91cce4cc673c"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Input subscription deleted:
projects/apache-beam-testing/subscriptions/anomaly-input-9625-sub.\n",
+ "Output subscription deleted:
projects/apache-beam-testing/subscriptions/anomaly-output-9625-sub.\n"
+ ]
+ }
+ ],
"source": [
"# deleting input and output subscriptions\n",
"subscriber = pubsub_v1.SubscriberClient()\n",
@@ -726,12 +1008,25 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 44,
"id": "10dc95cf-94ab-4a51-882b-88559340d4d2",
"metadata": {
- "id": "10dc95cf-94ab-4a51-882b-88559340d4d2"
+ "id": "10dc95cf-94ab-4a51-882b-88559340d4d2",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "53f46c20-dc28-4a14-8c69-95b44fda5933"
},
- "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}],
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Input topic deleted:
projects/apache-beam-testing/topics/anomaly-input-9625\n",
+ "Output topic deleted:
projects/apache-beam-testing/topics/anomaly-output-9625\n"
+ ]
+ }
+ ],
"source": [
"# deleting input and output topics\n",
"publisher = pubsub_v1.PublisherClient()\n",
@@ -776,4 +1071,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
-}
+}
\ No newline at end of file
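
For readers of this commit who want a feel for what the notebook's Z-Score detector is doing, the core scoring logic can be sketched in plain Python. This is an illustrative re-implementation, not the Beam `AnomalyDetection` API: it scores each point against a running mean and standard deviation of the points seen so far and flags it when the score exceeds a threshold, matching the `threshold=3` visible in the logged `AnomalyPrediction` output above. Function and variable names here are my own.

```python
import math

def zscore_labels(values, threshold=3.0):
    """Label each value as an outlier (1) or inlier (0) using a running
    z-score: each point is scored against the mean/std of the points that
    came before it. Illustrative sketch only, not the Beam transform."""
    labels = []
    n, mean, m2 = 0, 0.0, 0.0  # Welford's online mean/variance accumulators
    for x in values:
        if n >= 2:
            std = math.sqrt(m2 / (n - 1))
            score = abs(x - mean) / std if std > 0 else 0.0
        else:
            score = 0.0  # not enough history to score the first points
        labels.append(1 if score > threshold else 0)
        # Update the running statistics with the new point.
        n += 1
        delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)
    return labels

# A small series with one injected spike, mimicking the notebook's
# synthetic data (inliers near 0, an outlier near 9).
print(zscore_labels([0.1, -0.2, 0.3, 0.0, -0.1, 0.2, 9.5, 0.1]))
```

The Beam pipeline in the diff applies the same idea per key inside a stateful transform, which is why the notebook assigns a single global key before calling `AnomalyDetection`.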