This is an automated email from the ASF dual-hosted git repository.
git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push:
new d6df8ed668b Publishing website 2022/11/22 22:16:55 at commit 67e2008
d6df8ed668b is described below
commit d6df8ed668b92ea0e392699e7bb0b83ed1df2030
Author: jenkins <[email protected]>
AuthorDate: Tue Nov 22 22:16:56 2022 +0000
Publishing website 2022/11/22 22:16:55 at commit 67e2008
---
website/generated-content/documentation/index.xml | 117 +++++++++++----------
.../documentation/ml/anomaly-detection/index.html | 26 ++---
.../documentation/ml/data-processing/index.html | 6 +-
.../ml/multi-model-pipelines/index.html | 36 +++----
.../documentation/ml/online-clustering/index.html | 16 +--
.../documentation/ml/orchestration/index.html | 35 +++---
.../documentation/ml/overview/index.html | 3 +-
.../ml/runinference-metrics/index.html | 6 +-
website/generated-content/sitemap.xml | 2 +-
9 files changed, 125 insertions(+), 122 deletions(-)
diff --git a/website/generated-content/documentation/index.xml
b/website/generated-content/documentation/index.xml
index 9605c47dec9..50e9b3aaf4c 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -357,10 +357,10 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<h1 id="anomaly-detection-example">Anomaly Detection Example</h1>
-<p>The AnomalyDetection example demonstrates how to setup an anomaly
detection pipeline that reads text from PubSub in real-time, and then detects
anomaly using a trained HDBSCAN clustering model.</p>
-<h3 id="dataset-for-anomaly-detection">Dataset for Anomaly Detection</h3>
-<p>For the example, we use a dataset called <a
href="https://huggingface.co/datasets/emotion">emotion</a>. It comprises of
20,000 English Twitter messages with 6 basic emotions: anger, fear, joy, love,
sadness, and surprise. The dataset has three splits: train (for training),
validation and test (for performance evaluation). It is a supervised dataset as
it contains the text and the category (class) of the dataset. This dataset can
easily be accessed using <a href="https://hu [...]
-<p>To have a better understanding of the dataset, here are some examples
from the train split of the dataset:</p>
+<p>The anomaly detection example demonstrates how to set up an anomaly
detection pipeline that reads text from Pub/Sub in real time, and then detects
anomalies using a trained HDBSCAN clustering model.</p>
+<h2 id="dataset-for-anomaly-detection">Dataset for Anomaly Detection</h2>
+<p>This example uses a dataset called <a
href="https://huggingface.co/datasets/emotion">emotion</a> that contains
20,000 English Twitter messages with six basic emotions: anger, fear, joy, love,
sadness, and surprise. The dataset has three splits: train (for training),
validation, and test (for performance evaluation). Because it contains the text
and the category (class) of the dataset, it is a supervised dataset. You can
use the <a href="https://huggingface.co/docs/datasets/i [...]
+<p>The following table shows examples from the train split of the
dataset:</p>
<table>
<thead>
<tr>
@@ -395,12 +395,12 @@ limitations under the License.
</tr>
</tbody>
</table>
-<h3 id="anomaly-detection-algorithm">Anomaly Detection Algorithm</h3>
-<p><a
href="https://hdbscan.readthedocs.io/en/latest/how_hdbscan_works.html">HDBSCAN</a>
is a clustering algorithm which extends DBSCAN by converting it into a
hierarchical clustering algorithm, and then using a technique to extract a flat
clustering based in the stability of clusters. Once trained, the model will
predict -1 if a new data point is an outlier, otherwise it will predict one of
the existing clusters.</p>
-<h2 id="ingestion-to-pubsub">Ingestion to PubSub</h2>
-<p>We first ingest the data into <a
href="https://cloud.google.com/pubsub/docs/overview">PubSub</a> so that
while clustering we can read the tweets from PubSub. PubSub is a messaging
service for exchanging event data among applications and services. It is used
for streaming analytics and data integration pipelines to ingest and distribute
data.</p>
-<p>The full example code for ingesting data to PubSub can be found <a
href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/anomaly_detection/write_data_to_pubsub_pipeline/">here</a></p>
-<p>The file structure for ingestion pipeline is:</p>
+<h2 id="anomaly-detection-algorithm">Anomaly Detection Algorithm</h2>
+<p><a
href="https://hdbscan.readthedocs.io/en/latest/how_hdbscan_works.html">HDBSCAN</a>
is a clustering algorithm that extends DBSCAN by converting it into a
hierarchical clustering algorithm and then extracting a flat clustering based
on the stability of clusters. When trained, the model predicts
<code>-1</code> if a new data point is an outlier; otherwise, it predicts
one of the existing clusters.</p>
+<h2 id="ingestion-to-pubsub">Ingestion to Pub/Sub</h2>
+<p>Ingest the data into <a
href="https://cloud.google.com/pubsub/docs/overview">Pub/Sub</a> so that
while clustering, the model can read the tweets from Pub/Sub. Pub/Sub is a
messaging service for exchanging event data among applications and services.
Streaming analytics and data integration pipelines use Pub/Sub to ingest and
distribute data.</p>
+<p>You can see the full example code for ingesting data into Pub/Sub in
<a
href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/anomaly_detection/write_data_to_pubsub_pipeline/">GitHub</a>.</p>
+<p>The ingestion pipeline uses the following file structure:</p>
<pre><code>write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
@@ -411,28 +411,28 @@ limitations under the License.
├── main.py
└── setup.py
</code></pre>
-<p><code>pipeline/utils.py</code> contains the code for loading the
emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation</p>
-<p><code>pipeline/options.py</code> contains the pipeline options to
configure the Dataflow pipeline</p>
-<p><code>config.py</code> defines some variables like GCP PROJECT_ID,
NUM_WORKERS that are used multiple times</p>
-<p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p>
-<p><code>main.py</code> contains the pipeline code and some
additional functions used for running the pipeline</p>
-<h3 id="how-to-run-the-pipeline-">How to Run the Pipeline ?</h3>
-<p>First, make sure you have installed the required packages. One should
have access to a Google Cloud Project and then correctly configure the GCP
variables like <code>PROJECT_ID</code>, <code>REGION</code>,
<code>PubSub TOPIC_ID</code> and others in
<code>config.py</code>.</p>
+<p><code>pipeline/utils.py</code> contains the code for loading the
emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation.</p>
+<p><code>pipeline/options.py</code> contains the pipeline options to
configure the Dataflow pipeline.</p>
+<p><code>config.py</code> defines variables that are used multiple
times, like the Google Cloud <code>PROJECT_ID</code> and <code>NUM_WORKERS</code>.</p>
+<p><code>setup.py</code> defines the packages and requirements for
the pipeline to run.</p>
+<p><code>main.py</code> contains the pipeline code and additional
functions used for running the pipeline.</p>
+<h3 id="run-the-pipeline">Run the Pipeline</h3>
+<p>To run the pipeline, install the required packages. For this example, you
need access to a Google Cloud project, and you need to configure the Google
Cloud variables, like <code>PROJECT_ID</code>, <code>REGION</code>,
the Pub/Sub <code>TOPIC_ID</code>, and others in the
<code>config.py</code> file.</p>
<ol>
<li>Locally on your machine: <code>python main.py</code></li>
<li>On GCP for Dataflow: <code>python main.py --mode
cloud</code></li>
</ol>
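For illustration, a minimal `config.py` might look like the following. Every value here is a placeholder, not taken from the example repository:

```python
# config.py: pipeline-wide settings (placeholder values for illustration only).
PROJECT_ID = "my-gcp-project"   # Google Cloud project to run in
REGION = "us-central1"          # Dataflow region
TOPIC_ID = f"projects/{PROJECT_ID}/topics/write-text"  # Pub/Sub topic path
NUM_WORKERS = 1                 # Dataflow worker count
```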
<p>The <code>write_data_to_pubsub_pipeline</code> pipeline contains four
transforms:</p>
<ol>
-<li>Load emotion dataset using HuggingFace Datasets (we take samples from 3
classes instead of 6 for simplicity)</li>
-<li>Associate each text with a unique identifier (UID)</li>
-<li>Convert the text into a format PubSub is expecting</li>
-<li>Write the formatted message to PubSub</li>
+<li>Load the emotion dataset using Hugging Face Datasets (for simplicity,
we take samples from three classes instead of six).</li>
+<li>Associate each piece of text with a unique identifier (UID).</li>
+<li>Convert the text into the format that Pub/Sub expects.</li>
+<li>Write the formatted message to Pub/Sub.</li>
</ol>
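The per-element steps above (UID assignment and Pub/Sub formatting) can be sketched in plain Python. The function names are hypothetical, and the sketch assumes messages are UTF-8 JSON bytes, which is the payload type Pub/Sub expects:

```python
import json
import uuid

def assign_uid(text: str) -> dict:
    """Associate a piece of text with a unique identifier (UID)."""
    return {"id": uuid.uuid4().hex, "text": text}

def to_pubsub_message(element: dict) -> bytes:
    """Convert the element into the bytes payload Pub/Sub expects."""
    return json.dumps(element).encode("utf-8")

record = assign_uid("i feel great today")
payload = to_pubsub_message(record)
```

In the real pipeline, functions like these run inside `beam.Map` or `beam.DoFn` steps before the write to Pub/Sub.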
<h2 id="anomaly-detection-on-streaming-data">Anomaly Detection on Streaming
Data</h2>
-<p>After having the data ingested to PubSub, we can run the anomaly
detection pipeline. This pipeline reads the streaming message from PubSub,
converts the text to an embedding using a language model, and feeds the
embedding to an already trained clustering model to predict if the message is
anomaly or not. One prerequisite for this pipeline is to have a HDBSCAN
clustering model trained on the training split of the dataset.</p>
-<p>The full example code for anomaly detection can be found <a
href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/">here</a></p>
-<p>The file structure for anomaly_detection pipeline is:</p>
+<p>After ingesting the data to Pub/Sub, run the anomaly detection pipeline.
This pipeline reads the streaming message from Pub/Sub, converts the text to an
embedding using a language model, and feeds the embedding to an already trained
clustering model to predict whether the message is an anomaly. One prerequisite
for this pipeline is to have an HDBSCAN clustering model trained on the
training split of the dataset.</p>
+<p>You can find the full example code for anomaly detection in <a
href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/">GitHub</a>.</p>
+<p>The <code>anomaly_detection</code> pipeline uses the following file
structure:</p>
<pre><code>anomaly_detection_pipeline/
├── pipeline/
│ ├── __init__.py
@@ -443,29 +443,29 @@ limitations under the License.
├── main.py
└── setup.py
</code></pre>
-<p><code>pipeline/transformations.py</code> contains the code for
different <code>beam.DoFn</code> and additional functions that are used
in pipeline</p>
-<p><code>pipeline/options.py</code> contains the pipeline options to
configure the Dataflow pipeline</p>
-<p><code>config.py</code> defines some variables like GCP PROJECT_ID,
NUM_WORKERS that are used multiple times</p>
-<p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p>
-<p><code>main.py</code> contains the pipeline code and some
additional functions used for running the pipeline</p>
-<h3 id="how-to-run-the-pipeline--1">How to Run the Pipeline ?</h3>
-<p>First, make sure you have installed the required packages and you have
pushed data to PubSub. One should have access to a Google Cloud Project and
then correctly configure the GCP variables like <code>PROJECT_ID</code>,
<code>REGION</code>, <code>PubSub SUBSCRIPTION_ID</code> and others
in <code>config.py</code>.</p>
+<p><code>pipeline/transformations.py</code> contains the code for
different <code>beam.DoFn</code> and additional functions that are used
in the pipeline.</p>
+<p><code>pipeline/options.py</code> contains the pipeline options to
configure the Dataflow pipeline.</p>
+<p><code>config.py</code> defines variables that are used multiple
times, like the Google Cloud <code>PROJECT_ID</code> and <code>NUM_WORKERS</code>.</p>
+<p><code>setup.py</code> defines the packages and requirements for
the pipeline to run.</p>
+<p><code>main.py</code> contains the pipeline code and additional
functions used to run the pipeline.</p>
+<h3 id="run-the-pipeline-1">Run the Pipeline</h3>
+<p>Install the required packages and push the data to Pub/Sub. For this
example, you need access to a Google Cloud project, and you need to configure
the Google Cloud variables, like <code>PROJECT_ID</code>,
<code>REGION</code>, the Pub/Sub <code>SUBSCRIPTION_ID</code>, and
others in the <code>config.py</code> file.</p>
<ol>
<li>Locally on your machine: <code>python main.py</code></li>
<li>On GCP for Dataflow: <code>python main.py --mode
cloud</code></li>
</ol>
-<p>The pipeline can be broken down into few simple steps:</p>
+<p>The pipeline includes the following steps:</p>
<ol>
-<li>Reading the message from PubSub</li>
-<li>Converting the PubSub message into a PCollection of dictionaries where
the key is the UID and the value is the twitter text</li>
-<li>Encoding the text into transformer-readable token ID integers using a
tokenizer</li>
-<li>Using RunInference to get the vector embedding from a Transformer based
Language Model</li>
-<li>Normalizing the embedding</li>
-<li>Using RunInference to get anomaly prediction from a trained HDBSCAN
clustering model</li>
-<li>Writing the prediction to BQ, so that clustering model can be retrained
when needed</li>
-<li>Sending an email alert if anomaly is detected</li>
+<li>Read the message from Pub/Sub.</li>
+<li>Convert the Pub/Sub message into a <code>PCollection</code> of
dictionaries where the key is the UID and the value is the Twitter text.</li>
+<li>Encode the text into transformer-readable token ID integers using a
tokenizer.</li>
+<li>Use RunInference to get the vector embedding from a transformer-based
language model.</li>
+<li>Normalize the embedding.</li>
+<li>Use RunInference to get the anomaly prediction from a trained HDBSCAN
clustering model.</li>
+<li>Write the prediction to BigQuery so that the clustering model can be
retrained when needed.</li>
+<li>Send an email alert if an anomaly is detected.</li>
</ol>
-<p>The code snippet for the first two steps of the pipeline:</p>
+<p>The following code snippet shows the first two steps of the
pipeline:</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -479,10 +479,15 @@ pipeline
)</code></pre>
</div>
</div>
-<p>We will now focus on important steps of pipeline: tokenizing the text,
getting embedding using RunInference and finally getting prediction from
HDBSCAN model.</p>
-<h3 id="getting-embedding-from-a-language-model">Getting Embedding from a
Language Model</h3>
-<p>In order to do clustering with text data, we first need to map the text
into vectors of numerical values suitable for statistical analysis. We use a
transformer based language model called <a
href="https://huggingface.co/sentence-transformers/stsb-distilbert-base">sentence-transformers/stsb-distilbert-base/stsb-distilbert-base</a>.
It maps sentences &amp; paragraphs to a 768 dimensional dense vector space
and can be used for tasks like clustering or semantic search. But, [...]
-<p>Tokenization is a preprocessing task that transforms text so that it can
be fed into the model for getting predictions.</p>
+<p>The next section describes the following pipeline steps:</p>
+<ul>
+<li>Tokenizing the text</li>
+<li>Getting embeddings using RunInference</li>
+<li>Getting predictions from the HDBSCAN model</li>
+</ul>
+<h3 id="get-embedding-from-a-language-model">Get Embedding from a Language
Model</h3>
+<p>To do clustering with text data, first map the text into vectors of
numerical values suitable for statistical analysis. This example uses a
transformer-based language model called <a
href="https://huggingface.co/sentence-transformers/stsb-distilbert-base">sentence-transformers/stsb-distilbert-base</a>.
This model maps sentences and paragraphs to a 768-dimensional dense vector
space, and you can use it for tasks like clustering or semantic search.</p>
+<p>Because the language model expects tokenized input instead of raw
text, start by tokenizing the text. Tokenization is a preprocessing task that
transforms text so that it can be fed into the model to get
predictions.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -493,7 +498,7 @@ docs
| &#34;Tokenize Text&#34; &gt;&gt;
beam.Map(tokenize_sentence)</code></pre>
</div>
</div>
-<p>Here, <code>tokenize_sentence</code> is a function that takes a
dictionary with a text and an id, tokenizes the text, and returns a tuple of
the text and id and the tokenized output.</p>
+<p>Here, <code>tokenize_sentence</code> is a function that takes a
dictionary with a text and an ID, tokenizes the text, and returns a tuple of
the text and ID as well as the tokenized output.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -515,7 +520,7 @@ tokens = {key: torch.squeeze(val) for key, val in
tokens.items()}
return (text, uid), tokens</code></pre>
</div>
</div>
-<p>Tokenized output is then passed to the language model for getting the
embeddings. For getting embeddings from language model, we use
<code>RunInference()</code> from beam.</p>
+<p>The tokenized output is then passed to the language model to get the
embeddings, using <code>RunInference()</code> from Apache Beam.</p>
<p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
@@ -525,7 +530,7 @@ return (text, uid), tokens</code></pre>
<pre><code> | &#34;Get Embedding&#34; &gt;&gt;
RunInference(KeyedModelHandler(embedding_model_handler))</code></pre>
</div>
</div>
-where <code>embedding_model_handler</code> is,</p>
+where <code>embedding_model_handler</code> is:</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -539,7 +544,7 @@ device=&#34;cpu&#34;,
)</code></pre>
</div>
</div>
-<p>We defined <code>PytorchNoBatchModelHandler</code> as a wrapper to
<code>PytorchModelHandler</code> to limit batch size to 1.</p>
+<p>We define <code>PytorchNoBatchModelHandler</code> as a wrapper to
<code>PytorchModelHandler</code> to limit batch size to one.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -558,7 +563,7 @@ def batch_elements_kwargs(self):
return {&#34;max_batch_size&#34;: 1}</code></pre>
</div>
</div>
-<p>We custom defined the model_class <code>ModelWrapper</code> to get
the vector embedding as the <code>forward()</code> for
<code>DistilBertModel</code> doesn&rsquo;t return the
embeddings.</p>
+<p>Because the <code>forward()</code> method of
<code>DistilBertModel</code> doesn&rsquo;t return the embeddings, we
define a custom <code>model_class</code>, <code>ModelWrapper</code>, to get
the vector embedding.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -592,7 +597,7 @@ return torch.sum(token_embeddings * input_mask_expanded, 1)
/ torch.clamp(
input_mask_expanded.sum(1), min=1e-9)</code></pre>
</div>
</div>
-<p>After getting the embedding for each twitter text, the embeddings are
normalized as the trained model is expecting normalized embeddings.</p>
+<p>After getting the embedding for each piece of Twitter text, the
embeddings are normalized, because the trained model is expecting normalized
embeddings.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -601,8 +606,8 @@ input_mask_expanded.sum(1), min=1e-9)</code></pre>
<pre><code> | &#34;Normalize Embedding&#34; &gt;&gt;
beam.ParDo(NormalizeEmbedding())</code></pre>
</div>
</div>
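The normalization itself is a plain L2 normalization. A minimal sketch of the DoFn body follows; the element shape `((text, uid), embedding)` matches the keyed tuples used earlier in the pipeline, but the function name is hypothetical:

```python
import numpy as np

def normalize_embedding(element):
    """L2-normalize an embedding so the clustering model sees unit vectors."""
    (text, uid), embedding = element
    vec = np.asarray(embedding, dtype=np.float64)
    norm = np.linalg.norm(vec)
    # Guard against a zero vector to avoid division by zero.
    return (text, uid), (vec / norm if norm > 0 else vec)

(_, normalized) = normalize_embedding((("hello", "uid-1"), [3.0, 4.0]))
```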
-<h3 id="getting-prediction">Getting Prediction</h3>
-<p>The normalized embeddings are then forwarded to the trained HDBSCAN
model for getting the predictions.</p>
+<h3 id="get-predictions">Get Predictions</h3>
+<p>The normalized embeddings are then forwarded to the trained HDBSCAN
model to get the predictions.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -615,7 +620,7 @@ normalized_embedding
)</code></pre>
</div>
</div>
-<p>where <code>clustering_model_handler</code> is</p>
+<p>where <code>clustering_model_handler</code> is:</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -628,7 +633,7 @@ model_uri=cfg.CLUSTERING_MODEL_PATH,
model_file_type=ModelFileType.JOBLIB
)</code></pre>
</div>
</div>
-<p>We defined <code>CustomSklearnModelHandlerNumpy</code> as a
wrapper to <code>SklearnModelHandlerNumpy</code> to limit batch size to 1
and to override the <code>run_inference</code> so that
<code>hdbscan.approximate_predict()</code> is used for getting anomaly
predictions.</p>
+<p>We define <code>CustomSklearnModelHandlerNumpy</code> as a wrapper
to <code>SklearnModelHandlerNumpy</code> to limit batch size to one and
to override <code>run_inference</code> so that
<code>hdbscan.approximate_predict()</code> is used to get anomaly
predictions.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
@@ -657,7 +662,7 @@ predictions = hdbscan.approximate_predict(model,
vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch,
predictions)]</code></pre>
</div>
</div>
-<p>After getting the model predictions, we first the decode the output from
<code>RunInference</code> into a dictionary. Afterwards, we take two
different actions: i) store the prediction in a BigQuery table for analysis and
updating HDBSCAN model and ii) send email alert if prediction is an
anomaly.</p>
+<p>After getting the model predictions, decode the output from
<code>RunInference</code> into a dictionary. Next, take two actions: store
the prediction in a BigQuery table, for analysis and for retraining the
HDBSCAN model when needed, and send an email alert if the prediction is an
anomaly.</p>
<div class="snippet">
<div class="notebook-skip code-snippet without_switcher">
<a class="copy" type="button" data-bs-toggle="tooltip"
data-bs-placement="bottom" title="Copy to clipboard">
diff --git
a/website/generated-content/documentation/ml/anomaly-detection/index.html
b/website/generated-content/documentation/ml/anomaly-detection/index.html
index 80a612e1882..f6c10942974 100644
--- a/website/generated-content/documentation/ml/anomaly-detection/index.html
+++ b/website/generated-content/documentation/ml/anomaly-detection/index.html
@@ -19,7 +19,7 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
├── pipeline/
│ ├── __init__.py
│ ├── options.py
@@ -28,7 +28,7 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
├── config.py
├── main.py
└── setup.py
-</code></pre><p><code>pipeline/utils.py</code> contains the code for loading
the emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation</p><p><code>pipeline/options.py</code> contains the pipeline
options to configure the Dataflow pipeline</p><p><code>config.py</code> defines
some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple
times</p><p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p><p><code>main.py</c [...]
+</code></pre><p><code>pipeline/utils.py</code> contains the code for loading
the emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation.</p><p><code>pipeline/options.py</code> contains the pipeline
options to configure the Dataflow pipeline.</p><p><code>config.py</code>
defines variables that are used multiple times, like Google Cloud PROJECT_ID
and NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and
requirements for the pipeline to run.</p><p> [...]
├── pipeline/
│ ├── __init__.py
│ ├── options.py
@@ -37,14 +37,14 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
├── config.py
├── main.py
└── setup.py
-</code></pre><p><code>pipeline/transformations.py</code> contains the code for
different <code>beam.DoFn</code> and additional functions that are used in
pipeline</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline</p><p><code>config.py</code> defines some
variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple
times</p><p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p><p><code>main.py</code [...]
+</code></pre><p><code>pipeline/transformations.py</code> contains the code for
different <code>beam.DoFn</code> and additional functions that are used in
pipeline.</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline.</p><p><code>config.py</code> defines
variables that are used multiple times, like the Google Cloud PROJECT_ID and
NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and requirements
for the pipeline to run.</p><p [...]
pipeline
| "Read from PubSub"
>> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID,
with_attributes=True)
| "Decode PubSubMessage" >> beam.ParDo(Decode())
- )</code></pre></div></div><p>We will now focus on important steps of
pipeline: tokenizing the text, getting embedding using RunInference and finally
getting prediction from HDBSCAN model.</p><h3
id=getting-embedding-from-a-language-model>Getting Embedding from a Language
Model</h3><p>In order to do clustering with text data, we first need to map the
text into vectors of numerical values suitable for statistical analysis. We use
a transformer based language model called <a href=https: [...]
+ )</code></pre></div></div><p>The next section describes the following
pipeline steps:</p><ul><li>Tokenizing the text</li><li>Getting embedding using
RunInference</li><li>Getting predictions from the HDBSCAN model</li></ul><h3
id=get-embedding-from-a-language-model>Get Embedding from a Language
Model</h3><p>In order to do clustering with text data, first map the text into
vectors of numerical values suitable for statistical analysis. This example
uses a transformer-based language mode [...]
docs
- | "Tokenize Text" >>
beam.Map(tokenize_sentence)</code></pre></div></div><p>Here,
<code>tokenize_sentence</code> is a function that takes a dictionary with a
text and an id, tokenizes the text, and returns a tuple of the text and id and
the tokenized output.</p><div class=snippet><div class="notebook-skip
code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg>< [...]
+ | "Tokenize Text" >>
beam.Map(tokenize_sentence)</code></pre></div></div><p>Here,
<code>tokenize_sentence</code> is a function that takes a dictionary with a
text and an ID, tokenizes the text, and returns a tuple of the text and ID as
well as the tokenized output.</p><div class=snippet><div class="notebook-skip
code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-ico
[...]
def tokenize_sentence(input_dict):
@@ -61,12 +61,12 @@ def tokenize_sentence(input_dict):
text, uid = input_dict["text"], input_dict["id"]
tokens = Tokenizer([text], padding=True, truncation=True,
return_tensors="pt")
tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
- return (text, uid), tokens</code></pre></div></div><p>Tokenized output is
then passed to the language model for getting the embeddings. For getting
embeddings from language model, we use <code>RunInference()</code> from
beam.</p><p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code> | "Get Embedding" >&g
[...]
+ return (text, uid), tokens</code></pre></div></div><p>Tokenized output is
then passed to the language model to get the embeddings. To get embeddings from
the language model, we use <code>RunInference()</code> from Apache
Beam.</p><p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code> | "Get Embedding" >&
[...]
state_dict_path=cfg.MODEL_STATE_DICT_PATH,
model_class=ModelWrapper,
model_params={"config":
AutoConfig.from_pretrained(cfg.MODEL_CONFIG_PATH)},
device="cpu",
- )</code></pre></div></div><p>We defined
<code>PytorchNoBatchModelHandler</code> as a wrapper to
<code>PytorchModelHandler</code> to limit batch size to 1.</p><div
class=snippet><div class="notebook-skip code-snippet without_switcher"><a
class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code># Can
be removed once: https://github.com/apache/beam/issues/21863 is fixed
+ )</code></pre></div></div><p>We define
<code>PytorchNoBatchModelHandler</code> as a wrapper to
<code>PytorchModelHandler</code> to limit batch size to one.</p><div
class=snippet><div class="notebook-skip code-snippet without_switcher"><a
class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code># Can
be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
"""Wrapper to PytorchModelHandler to limit batch size to 1.
The tokenized strings generated from BertTokenizer may have different
@@ -76,7 +76,7 @@ class
PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
in the run_inference() call.
"""
def batch_elements_kwargs(self):
- return {"max_batch_size": 1}</code></pre></div></div><p>We custom
defined the model_class <code>ModelWrapper</code> to get the vector embedding
as the <code>forward()</code> for <code>DistilBertModel</code> doesn’t
return the embeddings.</p><div class=snippet><div class="notebook-skip
code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>class Model [...]
+ return {"max_batch_size": 1}</code></pre></div></div><p>Because
the <code>forward()</code> for <code>DistilBertModel</code> doesn’t
return the embeddings, we define a custom model_class
<code>ModelWrapper</code> to get the vector embedding.</p><div
class=snippet><div class="notebook-skip code-snippet without_switcher"><a
class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>class
[...]
"""Wrapper to DistilBertModel to get embeddings when calling
forward function."""
def forward(self, **kwargs):
@@ -104,15 +104,15 @@ class
PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
- input_mask_expanded.sum(1), min=1e-9)</code></pre></div></div><p>After
getting the embedding for each twitter text, the embeddings are normalized as
the trained model is expecting normalized embeddings.</p><div
class=snippet><div class="notebook-skip code-snippet without_switcher"><a
class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code> |
"Normalize Embedding" >> beam.ParDo( [...]
+ input_mask_expanded.sum(1), min=1e-9)</code></pre></div></div><p>After
getting the embedding for each piece of Twitter text, normalize the embeddings,
because the trained model expects normalized
embeddings.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code> | "Normalize Embedding"
>& [...]
normalized_embedding
| "Get Prediction from Clustering Model"
>> RunInference(model_handler=clustering_model_handler)
- )</code></pre></div></div><p>where <code>clustering_model_handler</code>
is</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code> clustering_model_handler =
KeyedModelHandler(
+ )</code></pre></div></div><p>where <code>clustering_model_handler</code>
is:</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code> clustering_model_handler =
KeyedModelHandler(
CustomSklearnModelHandlerNumpy(
model_uri=cfg.CLUSTERING_MODEL_PATH,
model_file_type=ModelFileType.JOBLIB
)
- )</code></pre></div></div><p>We defined
<code>CustomSklearnModelHandlerNumpy</code> as a wrapper to
<code>SklearnModelHandlerNumpy</code> to limit batch size to 1 and to override
the <code>run_inference</code> so that
<code>hdbscan.approximate_predict()</code> is used for getting anomaly
predictions.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img sr [...]
+ )</code></pre></div></div><p>We define
<code>CustomSklearnModelHandlerNumpy</code> as a wrapper to
<code>SklearnModelHandlerNumpy</code> to limit batch size to one and to
override <code>run_inference</code> so that
<code>hdbscan.approximate_predict()</code> is used to get anomaly
predictions.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img src=/image [...]
# limit batch size to 1 can be removed once:
https://github.com/apache/beam/issues/21863 is fixed
def batch_elements_kwargs(self):
"""Limit batch size to 1 for inference"""
@@ -135,7 +135,7 @@ class
PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
_validate_inference_args(inference_args)
vectorized_batch = np.vstack(batch)
predictions = hdbscan.approximate_predict(model, vectorized_batch)
- return [PredictionResult(x, y) for x, y in zip(batch,
predictions)]</code></pre></div></div><p>After getting the model predictions,
we first the decode the output from <code>RunInference</code> into a
dictionary. Afterwards, we take two different actions: i) store the prediction
in a BigQuery table for analysis and updating HDBSCAN model and ii) send email
alert if prediction is an anomaly.</p><div class=snippet><div
class="notebook-skip code-snippet without_switcher"><a class=copy t [...]
+ return [PredictionResult(x, y) for x, y in zip(batch,
predictions)]</code></pre></div></div><p>After getting the model predictions,
decode the output from <code>RunInference</code> into a dictionary. Next, store
the prediction in a BigQuery table for analysis, update the HDBSCAN model, and
send an email alert if the prediction is an anomaly.</p><div class=snippet><div
class="notebook-skip code-snippet without_switcher"><a class=copy type=button
data-bs-toggle=tooltip data-bs-placemen [...]
predictions
| "Decode Prediction" >> beam.ParDo(DecodePrediction())
| "Write to BQ" >> beam.io.WriteToBigQuery(
@@ -145,7 +145,7 @@ class
PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
))
- _ = predictions | "Alert by Email" >>
beam.ParDo(TriggerEmailAlert())</code></pre></div></div><div class=feedback><p
class=update>Last updated on 2022/10/20</p><h3>Have you found everything you
were looking for?</h3><p class=description>Was it all useful and clear? Is
there anything that you would like to change? Let us know!</p><button
class=load-button><a href="mailto:[email protected]?subject=Beam Website
Feedback">SEND FEEDBACK</a></button></div></div></div><foote [...]
+ _ = predictions | "Alert by Email" >>
beam.ParDo(TriggerEmailAlert())</code></pre></div></div><div class=feedback><p
class=update>Last updated on 2022/11/22</p><h3>Have you found everything you
were looking for?</h3><p class=description>Was it all useful and clear? Is
there anything that you would like to change? Let us know!</p><button
class=load-button><a href="mailto:[email protected]?subject=Beam Website
Feedback">SEND FEEDBACK</a></button></div></div></div><foote [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git
a/website/generated-content/documentation/ml/data-processing/index.html
b/website/generated-content/documentation/ml/data-processing/index.html
index fe71e8cc9ac..1822f4e85cb 100644
--- a/website/generated-content/documentation/ml/data-processing/index.html
+++ b/website/generated-content/documentation/ml/data-processing/index.html
@@ -19,7 +19,7 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
from apache_beam.runners.interactive.interactive_runner import
InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
@@ -34,7 +34,7 @@ ib.collect(beam_df.describe())
# Investigate missing values
ib.collect(beam_df.isnull())
-</code></pre><p>For a full end-to-end example on how to implement data
exploration and data preprocessing with Beam and the DataFrame API for your
AI/ML project, you can follow the <a
href=https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml/dataframe_api_preprocessing.ipynb>Beam
Dataframe API tutorial for AI/ML</a>.</p><h2 id=data-pipeline-for-ml>Data
pipeline for ML</h2><p>A typical data preprocessing pipeline consists of the
following steps:</p><ol><li>Reading and wri [...]
+</code></pre><p>For a full end-to-end example that implements data exploration
and data preprocessing with Apache Beam and the DataFrame API for your AI/ML
project, see the <a
href=https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml/dataframe_api_preprocessing.ipynb>Beam
Dataframe API tutorial for AI/ML</a>.</p><h2 id=data-pipeline-for-ml>Data
pipeline for ML</h2><p>A typical data preprocessing pipeline consists of the
following steps:</p><ol><li>Read and write data: Re [...]
from apache_beam.metrics import Metrics
with beam.Pipeline() as pipeline:
@@ -83,7 +83,7 @@ with beam.Pipeline() as pipeline:
# Write data
output_data | beam.io.WriteToText('output.csv')
-</code></pre><div class=feedback><p class=update>Last updated on
2022/11/03</p><h3>Have you found everything you were looking for?</h3><p
class=description>Was it all useful and clear? Is there anything that you would
like to change? Let us know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=footer><div
class=footer__contained><div class=footer__cols><div class="footer__cols__col f
[...]
+</code></pre><div class=feedback><p class=update>Last updated on
2022/11/22</p><h3>Have you found everything you were looking for?</h3><p
class=description>Was it all useful and clear? Is there anything that you would
like to change? Let us know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=footer><div
class=footer__contained><div class=footer__cols><div class="footer__cols__col f
[...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git
a/website/generated-content/documentation/ml/multi-model-pipelines/index.html
b/website/generated-content/documentation/ml/multi-model-pipelines/index.html
index 6e3e4891b7e..8bb79812be9 100644
---
a/website/generated-content/documentation/ml/multi-model-pipelines/index.html
+++
b/website/generated-content/documentation/ml/multi-model-pipelines/index.html
@@ -19,27 +19,25 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
-and transform some input data, run it through a model, and then pass the
outcome of your first model
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+and transform input data, run it through a model, and then pass the outcome of
your first model
into a second model. This page explains how multi-model pipelines work and
gives an overview of what
you need to know to build one.</p><p>Before reading this section, it is
recommended that you become familiar with the information in
-the <a
href=https://beam.apache.org/documentation/pipelines/design-your-pipeline/>Pipeline
development lifecycle</a>
-.</p><h2 id=how-to-build-a-multi-model-pipeline-with-beam>How to build a
Multi-model pipeline with Beam</h2><p>A typical machine learning workflow
involves a series of data transformation steps such as data
-ingestion, data processing tasks, inference, and post-processing. Beam enables
you to orchestrate
-all of those steps together by encapsulating them in a single Beam DAG. This
allows you to build
-resilient and scalable end-to-end machine learning systems.</p><p>To deploy
your machine learning model in a Beam pipeline, you can use
-the <a
href=https://beam.apache.org/documentation/sdks/python-machine-learning/><code>RunInferenceAPI</code></a>
which
+the <a
href=https://beam.apache.org/documentation/pipelines/design-your-pipeline/>Pipeline
development lifecycle</a>.</p><h2
id=how-to-build-a-multi-model-pipeline-with-beam>How to build a Multi-model
pipeline with Beam</h2><p>A typical machine learning workflow involves a series
of data transformation steps, such as data
+ingestion, data processing tasks, inference, and post-processing. Apache Beam
enables you to orchestrate
+all of those steps together by encapsulating them in a single Apache Beam
Directed Acyclic Graph (DAG), which allows you to build
+resilient and scalable end-to-end machine learning systems.</p><p>To deploy
your machine learning model in an Apache Beam pipeline, use
+the <a
href=https://beam.apache.org/documentation/sdks/python-machine-learning/><code>RunInferenceAPI</code></a>,
which
facilitates the integration of your model as a <code>PTransform</code> step in
your DAG. Composing
-multiple <code>RunInference</code> transforms within a single DAG allows us to
build a pipeline that consists
-of multiple ML models. This way Beam supports the development of complex ML
systems.</p><p>There are different patterns that can be used to build
multi-model pipelines in Beam. Let’s have a
-look at a few of them.</p><h3 id=ab-pattern>A/B Pattern</h3><p>The A/B pattern
describes a framework multiple where ML models are running in parallel. One
+multiple <code>RunInference</code> transforms within a single DAG makes it
possible to build a pipeline that consists
+of multiple ML models. In this way, Apache Beam supports the development of
complex ML systems.</p><p>You can use different patterns to build multi-model
pipelines in Apache Beam. This page explores A/B patterns and cascade
patterns.</p><h3 id=ab-pattern>A/B Pattern</h3><p>The A/B pattern describes a
framework where multiple ML models run in parallel. One
application for this pattern is to test the performance of different machine
learning models and
decide whether a new model is an improvement over an existing one. This is
also known as the
-“Champion/Challenger” method. Here, we typically define a business metric to
compare the performance
+“Champion/Challenger” method. Typically, you define a business metric to
compare the performance
of a control model with the current model.</p><p>An example could be
recommendation engine models where you have an existing model that recommends
ads based on the user’s preferences and activity history. When deciding to
deploy a new model, you
-could split the incoming user traffic into two branches where half of the
users are exposed to the
-new model and the other half to the current one.</p><p>Afterwards, you could
then measure the average click-through rate (CTR) of ads for both sets of
+could split the incoming user traffic into two branches, where half of the
users are exposed to the
+new model and the other half to the current one.</p><p>Afterward, you can measure
the average click-through rate (CTR) of ads for both sets of
users over a defined period of time to determine if the new model is
performing better than the
existing one.</p><pre><code>import apache_beam as beam
@@ -52,19 +50,17 @@ with beam.Pipeline() as pipeline:
model_a_predictions = userset_a_traffic | RunInference(<model_handler_A>)
model_b_predictions = userset_b_traffic | RunInference(<model_handler_B>)
</code></pre><p>Where <code>beam.partition</code> is used to split the data
source into 50/50 split partitions. For more
-information on data partitioning,
-see <a
href=https://beam.apache.org/documentation/transforms/python/elementwise/partition/>Partition</a>.</p><h3
id=cascade-pattern>Cascade Pattern</h3><p>The Cascade pattern is used to solve
use-cases where the solution involves a series of ML models. In
+information about data partitioning,
+see <a
href=https://beam.apache.org/documentation/transforms/python/elementwise/partition/>Partition</a>.</p><h3
id=cascade-pattern>Cascade Pattern</h3><p>The Cascade pattern is used when the
solution to a problem involves a series of ML models. In
this scenario, the output of a model is typically transformed to a suitable
format using
a <code>PTransform</code> before passing it to another
model.</p><pre><code>with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>)
model_b_predictions = model_a_predictions | beam.ParDo(post_processing()) |
RunInference(<model_handler_B>)
-</code></pre><p>In
-this <a
href=https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml/run_inference_multi_model.ipynb>notebook</a>
-, we show an end-to-end example of a cascade pipeline used for generating and
ranking image
+</code></pre><p>The <a
href=https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml/run_inference_multi_model.ipynb>Ensemble
model using an image captioning and ranking example</a> notebook shows an
end-to-end example of a cascade pipeline used to generate and rank image
captions. The solution consists of two open-source
models:</p><ol><li><strong>A caption generation model (<a
href=https://github.com/salesforce/BLIP>BLIP</a>)</strong> that generates
candidate image captions from an input image.</li><li><strong>A caption
ranking model (<a href=https://github.com/openai/CLIP>CLIP</a>)</strong> that
uses the image and
-candidate captions to rank the captions in the order in which they best
describe the image.</li></ol><div class=feedback><p class=update>Last updated
on 2022/10/21</p><h3>Have you found everything you were looking for?</h3><p
class=description>Was it all useful and clear? Is there anything that you would
like to change? Let us know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=foo [...]
+candidate captions to rank the captions in the order in which they best
describe the image.</li></ol><div class=feedback><p class=update>Last updated
on 2022/11/22</p><h3>Have you found everything you were looking for?</h3><p
class=description>Was it all useful and clear? Is there anything that you would
like to change? Let us know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=foo [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git
a/website/generated-content/documentation/ml/online-clustering/index.html
b/website/generated-content/documentation/ml/online-clustering/index.html
index 1bd29ec3ae8..d42ce2d9fec 100644
--- a/website/generated-content/documentation/ml/online-clustering/index.html
+++ b/website/generated-content/documentation/ml/online-clustering/index.html
@@ -19,7 +19,7 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
├── pipeline/
│ ├── __init__.py
│ ├── options.py
@@ -28,7 +28,7 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
├── config.py
├── main.py
└── setup.py
-</code></pre><p><code>pipeline/utils.py</code> contains the code for loading
the emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation</p><p><code>pipeline/options.py</code> contains the pipeline
options to configure the Dataflow pipeline</p><p><code>config.py</code> defines
some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple
times</p><p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p><p><code>main.py</c [...]
+</code></pre><p><code>pipeline/utils.py</code> contains the code for loading
the emotion dataset and two <code>beam.DoFn</code> that are used for data
transformation.</p><p><code>pipeline/options.py</code> contains the pipeline
options to configure the Dataflow pipeline.</p><p><code>config.py</code>
defines some variables that are used multiple times, like GCP PROJECT_ID and
NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and requirements
for the pipeline to run.</p><p><cod [...]
├── pipeline/
│ ├── __init__.py
│ ├── options.py
@@ -37,23 +37,23 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
├── config.py
├── main.py
└── setup.py
-</code></pre><p><code>pipeline/transformations.py</code> contains the code for
different <code>beam.DoFn</code> that are used in the
pipeline</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline</p><p><code>config.py</code> defines some
variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple
times</p><p><code>setup.py</code> defines the packages/requirements for the
pipeline to run</p><p><code>main.py</code> contains the pipeli [...]
+</code></pre><p><code>pipeline/transformations.py</code> contains the code for
the different <code>beam.DoFn</code> that are used in the
pipeline.</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline.</p><p><code>config.py</code> defines
variables that are used multiple times, like Google Cloud PROJECT_ID and
NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and requirements
for the pipeline to run.</p><p><code>main.py</code> [...]
pipeline
| "Read from PubSub"
>> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID,
with_attributes=True)
| "Decode PubSubMessage" >> beam.ParDo(Decode())
- )</code></pre></div></div><p>We now closely look at three important steps
of pipeline where we tokenize the text, fed the tokenized text to get embedding
from a Transformer based Language Model and performing clustering using <a
href=https://beam.apache.org/blog/stateful-processing/>Stateful
Processing</a>.</p><h3 id=getting-embedding-from-a-language-model>Getting
Embedding from a Language Model</h3><p>In order to do clustering with text
data, we first need to map the text into vecto [...]
+ )</code></pre></div></div><p>The next sections examine three important
pipeline steps:</p><ol><li>Tokenize the text.</li><li>Feed the tokenized text
to get an embedding from a transformer-based language model.</li><li>Perform
clustering using <a
href=https://beam.apache.org/blog/stateful-processing/>stateful
processing</a>.</li></ol><h3 id=get-embedding-from-a-language-model>Get
Embedding from a Language Model</h3><p>In order to cluster text data, you need to
map the text into vectors of n [...]
docs
- | "Tokenize Text" >>
beam.Map(tokenize_sentence)</code></pre></div></div><p>Here,
<code>tokenize_sentence</code> is a function that takes a dictionary with a
text and an id, tokenizes the text, and returns a tuple (text, id) and the
tokenized output.</p><p>Tokenized output is then passed to the language model
for getting the embeddings. For getting embeddings from language model, we use
<code>RunInference()</code> from beam.</p><div class=snippet><div
class="noteboo [...]
+ | "Tokenize Text" >>
beam.Map(tokenize_sentence)</code></pre></div></div><p>Here,
<code>tokenize_sentence</code> is a function that takes a dictionary with a
text and an ID, tokenizes the text, and returns a tuple (text, id) and the
tokenized output.</p><p>The tokenized output is then passed to the language model
to get the embeddings. To get the embeddings, we
use <code>RunInference()</code> from Apache Beam.</p><div class=snippet><div
cla [...]
normalized_embedding
| "Map doc to key" >> beam.Map(lambda x: (1, x))
| "StatefulClustering using Birch" >>
beam.ParDo(StatefulOnlineClustering())
- )</code></pre></div></div><p>As BIRCH doesn’t support
parallelization, so we need to make sure that all the StatefulProcessing is
taking place only by one worker. In order to do that, we use the
<code>Beam.Map</code> to associate each text to the same key
<code>1</code>.</p><p><code>StatefulOnlineClustering</code> is a
<code>DoFn</code> that an embedding of a text and updates the clustering model.
For storing the state it uses <code>ReadModifyWriteStateSpec</code> state
object [...]
+ )</code></pre></div></div><p>Because BIRCH doesn’t support
parallelization, you need to make sure that only one worker is doing all of the
stateful processing. To do that, use <code>beam.Map</code> to associate each
text to the same key
<code>1</code>.</p><p><code>StatefulOnlineClustering</code> is a
<code>DoFn</code> that takes an embedding of a text and updates the clustering
model. To store the state, it uses the <code>ReadModifyWriteStateSpec</code>
state object, which acts [...]
BIRCH_MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model",
PickleCoder())
DATA_ITEMS_SPEC = ReadModifyWriteStateSpec("data_items",
PickleCoder())
EMBEDDINGS_SPEC = ReadModifyWriteStateSpec("embeddings",
PickleCoder())
- UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec("update_counter",
PickleCoder())</code></pre></div></div><p>We declare four different
<code>ReadModifyWriteStateSpec
objects</code>:</p><ul><li><code>BIRCH_MODEL_SPEC</code>: holds the state of
clustering model</li><li><code>DATA_ITEMS_SPEC</code>: holds the twitter texts
seen so far</li><li><code>EMBEDDINGS_SPEC</code>: holds the normalized
embeddings</li><li><code>UPDATE_COUNTER_SPEC</code>: holds the number of texts
processed< [...]
+ UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec("update_counter",
PickleCoder())</code></pre></div></div><p>This example declares four different
<code>ReadModifyWriteStateSpec</code>
objects:</p><ul><li><code>BIRCH_MODEL_SPEC</code> holds the state of
the clustering model.</li><li><code>DATA_ITEMS_SPEC</code> holds the Twitter texts
seen so far.</li><li><code>EMBEDDINGS_SPEC</code> holds the normalized
embeddings.</li><li><code>UPDATE_COUNTER_SPEC</code> holds the number of texts
[...]
self,
element,
model_state=beam.DoFn.StateParam(BIRCH_MODEL_SPEC),
@@ -106,7 +106,7 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
"docs": collected_documents,
"id": list(collected_embeddings.keys()),
"counter": update_counter,
- }</code></pre></div></div><p><code>GetUpdates</code> is a <code>DoFn</code>
that prints the cluster assigned to each twitter message, every time a new
message arrives.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>updated_clusters = clustering |
"Format Update" >> beam.ParDo(GetUpdates())</code>< [...]
+ }</code></pre></div></div><p><code>GetUpdates</code> is a <code>DoFn</code>
that prints the cluster assigned to each Twitter message every time a new
message arrives.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>updated_clusters = clustering |
"Format Update" >> beam.ParDo(GetUpdates())</code></ [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git
a/website/generated-content/documentation/ml/orchestration/index.html
b/website/generated-content/documentation/ml/orchestration/index.html
index c0595b68654..191005ea104 100644
--- a/website/generated-content/documentation/ml/orchestration/index.html
+++ b/website/generated-content/documentation/ml/orchestration/index.html
@@ -19,8 +19,9 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
-Without this knowledge at your disposal, it will become increasingly difficult
to troubleshoot, monitor and improve your ML solutions as they grow in
size.</li></ul><p>The solution: MLOps. MLOps is an umbrella term used to
describe best practices and guiding principles that aim to make the development
and maintenance of machine learning systems seamless and efficient. Simply put,
MLOps is most often about automating machine learning workflows throughout the
model and data lifecycle. Popu [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+Without this knowledge, troubleshooting, monitoring, and improving your ML
solutions becomes increasingly difficult as they grow in
size.</li></ul><p>The solution: MLOps. MLOps is an umbrella term used to
describe best practices and guiding principles that aim to make the development
and maintenance of machine learning systems seamless and efficient. MLOps most
often entails automating machine learning workflows throughout the model and
data lifecycle. Popular frameworks to [...]
+When choosing between the two frameworks, you need to consider the trade-off
between flexibility and programming overhead.</li></ul><p>For simplicity, the
workflows only contain three components: data ingestion, data preprocessing,
and model training. Depending on the scenario, you can add a range of extra
components, such as model evaluation and model deployment. This example focuses
on the preprocessing component, because it demonstrates how to use Apache Beam
in an ML workflow for eff [...]
├── pipeline.py
├── components
│ ├── ingestion
@@ -42,7 +43,7 @@ Without this knowledge at your disposal, it will become
increasingly difficult t
│ └── src
│ └── train.py
└── requirements.txt
-</code></pre><p>Let’s start with the component specifications. The full
preprocessing component specification is illustrated below. The inputs are the
path where the ingested dataset was saved by the ingest component and a path to
a directory where the component can store artifacts. Additionally, there are
some inputs that specify how and where the Beam pipeline should run. The
specifications for the ingestion and train component are similar and can be
found <a href=https://github.com/ap [...]
+</code></pre><p>The full preprocessing component specification is shown in the
following illustration. The inputs are the path where the ingested dataset was
saved by the ingest component and a path to a directory where the component can
store artifacts. Additionally, some inputs specify how and where the Apache
Beam pipeline runs. The specifications for the ingestion and train components
are similar and can be found in the <a
href=https://github.com/apache/beam/tree/master/sdks/python/ [...]
description: Component that mimics scraping data from the web and outputs it
to a jsonlines format file
inputs:
- name: ingested_dataset_path
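As a trimmed sketch of what a KFP component specification of this shape might contain (every field value below is illustrative, not the actual file):

```yaml
name: preprocessing
description: Component that runs an Apache Beam preprocessing pipeline
inputs:
  - name: ingested_dataset_path
    type: String
  - name: base_artifact_path
    type: String
  - name: beam_runner
    type: String
outputs:
  - name: preprocessed_dataset_path
implementation:
  container:
    # Hypothetical image name; build it from the component's Dockerfile.
    image: gcr.io/example-project/preprocessing:latest
    command: [
      python3, preprocess.py,
      --ingested-dataset-path, {inputValue: ingested_dataset_path},
      --base-artifact-path, {inputValue: base_artifact_path},
      --beam-runner, {inputValue: beam_runner},
      --preprocessed-dataset-path, {outputPath: preprocessed_dataset_path},
    ]
```

`inputValue` placeholders are substituted with the pipeline arguments at run time, and `outputPath` gives the component a file path to write its output value to.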
@@ -87,7 +88,7 @@ implementation:
{inputValue: dataflow_staging_root},
--beam-runner,
{inputValue: beam_runner},
- ]</code></pre></div></div><p>In this case, each component shares an
identical Dockerfile but extra component-specific dependencies could be added
where necessary.</p><div class="language-Dockerfile snippet"><div
class="notebook-skip code-snippet"><a target=_blank type=button
data-bs-toggle=tooltip data-bs-placement=bottom title="View source code"
href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile><
[...]
+ ]</code></pre></div></div><p>In this case, each component shares an
identical Dockerfile, but you can add extra component-specific dependencies
where needed.</p><div class="language-Dockerfile snippet"><div
class="notebook-skip code-snippet"><a target=_blank type=button
data-bs-toggle=tooltip data-bs-placement=bottom title="View source code"
href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile><img
s [...]
<a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div
class=highlight><pre class=chroma><code class=language-Dockerfile
data-lang=Dockerfile><span class=k>FROM</span><span class=s>
python:3.9-slim</span><span class=err>
</span><span class=err>
</span><span class=err></span><span class=c># (Optional) install extra
dependencies</span><span class=err>
@@ -98,7 +99,7 @@ implementation:
</span><span class=err>
</span><span class=err></span><span class=c># copy src files and set working
directory</span><span class=err>
</span><span class=err></span><span class=k>COPY</span> src /src<span
class=err>
-</span><span class=err></span><span class=k>WORKDIR</span><span class=s>
/src</span></code></pre></div></div></div><p>With the component specification
and containerization out of the way we can look at the actual implementation of
the preprocessing component.</p><p>Since KFP provides the input and output
arguments as command-line arguments, an <code>argumentparser</code> is
needed.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button da [...]
+</span><span class=err></span><span class=k>WORKDIR</span><span class=s>
/src</span></code></pre></div></div></div><p>With the component specification
and containerization done, implement the preprocessing component.</p><p>Because
KFP provides the input and output arguments as command-line arguments, an
<code>argparse.ArgumentParser</code> is needed.</p><div class=snippet><div
class="notebook-skip code-snippet without_switcher"><a class=copy type=button
data-bs-toggle=tooltip data-bs-placement=bo [...]
"""Parse preprocessing arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -137,7 +138,7 @@ implementation:
help="Beam runner: DataflowRunner or DirectRunner.",
default="DirectRunner")
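A self-contained sketch of such a parser (the argument names follow the component inputs described above but are illustrative):

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line arguments that KFP passes to the component."""
    parser = argparse.ArgumentParser(description="Preprocessing component.")
    parser.add_argument(
        "--ingested-dataset-path",
        required=True,
        help="Path to the dataset written by the ingestion component.")
    parser.add_argument(
        "--base-artifact-path",
        required=True,
        help="Directory where the component stores artifacts.")
    parser.add_argument(
        "--beam-runner",
        help="Beam runner: DataflowRunner or DirectRunner.",
        default="DirectRunner")
    return parser.parse_args(argv)


args = parse_args([
    "--ingested-dataset-path", "/tmp/ingested.jsonl",
    "--base-artifact-path", "/tmp/artifacts",
])
print(args.beam_runner)  # DirectRunner (the default)
```

Note that `argparse` converts the dashes in flag names to underscores, so `--ingested-dataset-path` becomes `args.ingested_dataset_path`.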
- return parser.parse_args()</code></pre></div></div><p>The implementation of
the <code>preprocess_dataset</code> function contains the Beam pipeline code
and the Beam pipeline options to select the desired runner. The executed
preprocessing involves downloading the image bytes from their url, converting
them to a Torch Tensor and resizing to the desired size. The caption undergoes
a series of string manipulations to ensure that our model receives clean
uniform image descriptions (Tokeni [...]
+ return parser.parse_args()</code></pre></div></div><p>The implementation of
the <code>preprocess_dataset</code> function contains the Apache Beam pipeline
code and the Beam pipeline options that select the runner. The executed
preprocessing involves downloading the image bytes from their URL, converting
them to a Torch Tensor, and resizing to the desired size. The caption undergoes
a series of string manipulations to ensure that our model receives uniform
image descriptions. Tokenizati [...]
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(
runner=beam_runner,
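The caption cleanup described above (lowercasing plus simple string manipulations) can be sketched with stdlib Python; the exact steps in the real component may differ:

```python
import re


def clean_caption(caption: str) -> list:
    """Lowercase a caption, strip punctuation, and split it into tokens."""
    lowered = caption.lower()
    # Keep only letters, digits, and spaces; drop everything else.
    stripped = re.sub(r"[^a-z0-9 ]+", "", lowered)
    return stripped.split()


tokens = clean_caption("An angled view of a beautifully decorated bathroom.")
print(tokens)
```

In the Beam pipeline, a function like this would run inside a `Map` or `DoFn` over each element.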
@@ -174,18 +175,18 @@ with beam.Pipeline(options=pipeline_options) as pipeline:
"name": "image", "type":
"bytes"
}]
},
- file_name_suffix=".avro"))</code></pre></div></div><p>It
also contains the necessary code to perform the component IO. First, a target
path is constructed to store the preprocessed dataset based on the component
input parameter <code>base_artifact_path</code> and a timestamp. Output values
from components can only be returned as files so we write the value of the
constructed target path to an output file that was provided by KFP to our
component.</p><div class=snippet>< [...]
+ file_name_suffix=".avro"))</code></pre></div></div><p>It
also contains the necessary code to perform the component I/O. First, a target
path is constructed to store the preprocessed dataset based on the component
input parameter <code>base_artifact_path</code> and a timestamp. Output values
from components are only returned as files, so we write the value of the
constructed target path to an output file that was provided to our component by
KFP.</p><div class=snippet><d [...]
target_path =
f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}"
# the directory where the output file is created may or may not exist
# so we have to create it.
Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True)
with open(preprocessed_dataset_path, 'w') as f:
- f.write(target_path)</code></pre></div></div><p>Since we are mainly
interested in the preprocessing component to show how a Beam pipeline can be
integrated into a larger ML workflow, we will not cover the implementation of
the ingestion and train component in depth. Implementations of dummy components
that mock their behavior are provided in the full example code.</p><h4
id=create-the-pipeline-definition>Create the pipeline
definition</h4><p><code>pipeline.py</code> first loads the cre [...]
+ f.write(target_path)</code></pre></div></div><p>Because we are mainly
interested in the preprocessing component to show how a Beam pipeline can be
integrated into a larger ML workflow, this section doesn’t cover the
implementation of the ingestion and train components in depth. Implementations
of dummy components that mock their behavior are provided in the full example
code.</p><h4 id=create-the-pipeline-definition>Create the pipeline
definition</h4><p><code>pipeline.py</code> f [...]
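The component I/O described earlier (a timestamped target path written to a KFP-provided output file) can be sketched with the stdlib; the paths and names here are illustrative:

```python
import tempfile
from datetime import datetime
from pathlib import Path


def write_target_path(base_artifact_path: str, output_file: str) -> str:
    """Build a timestamped dataset path and write it to the KFP output file."""
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    target_path = (
        f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}")
    # The directory holding the output file may not exist yet, so create it.
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    Path(output_file).write_text(target_path)
    return target_path


out_dir = tempfile.mkdtemp()
path = write_target_path(
    "/tmp/artifacts", f"{out_dir}/preprocessed_dataset_path")
print(path)
```

KFP then reads the written file and passes its contents to downstream components as the value of `preprocessed_dataset_path`.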
DataIngestOp =
comp.load_component('components/ingestion/component.yaml')
DataPreprocessingOp = comp.load_component(
'components/preprocessing/component.yaml')
-TrainModelOp =
comp.load_component('components/train/component.yaml')</code></pre></div></div><p>After
that, the pipeline is created and the required components inputs and outputs
are specified manually.</p><div class=snippet><div class="notebook-skip
code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>@dsl.pipeline(
+TrainModelOp =
comp.load_component('components/train/component.yaml')</code></pre></div></div><p>After
that, the pipeline is created, and the required component inputs and outputs
are specified manually.</p><div class=snippet><div class="notebook-skip
code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>@dsl.pipeline(
pipeline_root=PIPELINE_ROOT,
name="beam-preprocessing-kfp-example",
description="Pipeline to show an apache beam preprocessing example in
KFP")
@@ -219,13 +220,13 @@ def pipeline(
train_model_task = TrainModelOp(
preprocessed_dataset_path=data_preprocessing_task.
outputs["preprocessed_dataset_path"],
-
base_artifact_path=component_artifact_root)</code></pre></div></div><p>Finally,
the defined pipeline is compiled and a <code>pipeline.json</code> specification
file is generated.</p><div class=snippet><div class="notebook-skip code-snippet
without_switcher"><a class=copy type=button data-bs-toggle=tooltip
data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>Compiler().compile(pipeline_func=pipeline,
package_path="pipeline.json")</ [...]
+
base_artifact_path=component_artifact_root)</code></pre></div></div><p>Finally,
the defined pipeline is compiled, and a <code>pipeline.json</code>
specification file is generated.</p><div class=snippet><div
class="notebook-skip code-snippet without_switcher"><a class=copy type=button
data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img
src=/images/copy-icon.svg></a><pre><code>Compiler().compile(pipeline_func=pipeline,
package_path="pipeline.json")< [...]
experiment = client.create_experiment("KFP orchestration example")
run_result = client.run_pipeline(
experiment_id=experiment.id,
job_name="KFP orchestration job",
pipeline_package_path="pipeline.json",
- params=run_arguments)</code></pre></div></div><h3
id=tensorflow-extended-tfx>Tensorflow Extended (TFX)</h3><p>The way of working
for TFX is similar to the approach for KFP as illustrated above: Define the
individual workflow components, connect them in a pipeline object and run the
pipeline in the target environment. However, what makes TFX different is that
it has already built a set of Python packages that are libraries to create
workflow components. So unlike the KFP example, we d [...]
+ params=run_arguments)</code></pre></div></div><h3
id=tensorflow-extended-tfx>TensorFlow Extended (TFX)</h3><p>Working with TFX is
similar to the approach for KFP illustrated previously: Define the individual
workflow components, connect them in a pipeline object, and run the pipeline in
the target environment. What makes TFX different is that it already provides a
set of Python packages that serve as libraries for creating workflow components. Unlike
with the KFP example, you don’t ne [...]
gcp_project_id,
region,
pipeline_name,
@@ -285,9 +286,9 @@ run_result = client.run_pipeline(
enable_cache=True,
metadata_connection_config=tfx.orchestration.metadata.
sqlite_metadata_connection_config(metadata_file),
-
beam_pipeline_args=beam_pipeline_args_by_runner[beam_runner])</code></pre></div></div><p>We
will use the same data input as last time, i.e. a couple of image-captions
pairs extracted from the <a href=https://cocodataset.org/#home>MSCOCO 2014
dataset</a>. This time, however, in CSV format because the ExampleGen component
does not by default have support for jsonlines. (The formats that are supported
out of the box are listed <a
href=https://www.tensorflow.org/tfx/guide/examplegen#da [...]
+
beam_pipeline_args=beam_pipeline_args_by_runner[beam_runner])</code></pre></div></div><p>We
use the same data input, that is, a couple of image-caption pairs extracted
from the <a href=https://cocodataset.org/#home>MSCOCO 2014 dataset</a>. This
time, however, we use the data in CSV format, because the ExampleGen component
does not by default support jsonlines. The formats that are supported out of
the box are listed in the <a
href=https://www.tensorflow.org/tfx/guide/examplegen#da [...]
318556,255,"An angled view of a beautifully decorated
bathroom.","http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg","COCO_train2014_000000318556.jpg","Attribution-NonCommercial-ShareAlike
License"
-476220,14,"An empty kitchen with white and black
appliances.","http://farm7.staticflickr.com/6173/6207941582_b69380c020_z.jpg","COCO_train2014_000000476220.jpg","Attribution-NonCommercial
License"</code></pre></div></div><p>So far, we have only imported standard
TFX components and chained them together into a pipeline. Both the Transform
and Trainer components have a <code>module_file</code> argument defined. That’s
where we define the behavior we want fro [...]
+476220,14,"An empty kitchen with white and black
appliances.","http://farm7.staticflickr.com/6173/6207941582_b69380c020_z.jpg","COCO_train2014_000000476220.jpg","Attribution-NonCommercial
License"</code></pre></div></div><p>So far, we have only imported standard
TFX components and chained them together into a pipeline. Both the Transform
and the Trainer components have a <code>module_file</code> argument defined.
That’s where we define the behavior we want [...]
"""Transform raw data."""
# convert the captions to lowercase
# split the captions into separate words
@@ -300,7 +301,7 @@ run_result = client.run_pipeline(
return {
'caption_lower': lower,
- }</code></pre></div></div><p>However this function only defines the logical
steps that have to be performed during preprocessing and needs a concrete
implementation before it can be executed. One such implementation is provided
by <code>tf.Transform</code> using Apache Beam and provides a PTransform
<code>tft_beam.AnalyzeAndTransformDataset</code> to process the data. We can
test this preproccesing_fn outside of the TFX Transform component using this
PTransform explicitly. Calling the [...]
+ }</code></pre></div></div><p>This function only defines the logical steps
that must be performed during preprocessing. The function needs a concrete
implementation before it can run. One such implementation comes from
<code>tf.Transform</code> on Apache Beam, which provides the PTransform
<code>tft_beam.AnalyzeAndTransformDataset</code> to process the data. We can
test this <code>preprocessing_fn</code> outside of the TFX Transform component using this
PTransform explicitly. Calling the <code> [...]
# Test processing_fn directly without the tfx pipeline
raw_data = [
{
@@ -322,7 +323,7 @@ run_result = client.run_pipeline(
transformed_dataset, transform_fn = (
(raw_data, raw_data_metadata)
| tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
- transformed_data, transformed_metadata =
transformed_dataset</code></pre></div></div><h4 id=train>Train</h4><p>Finally
the Trainer component behaves in a similar way as the Transform component, but
instead of looking for a <code>preprocessing_fn</code> it requires a
<code>run_fn</code> function to be present in the specified
<code>module_file</code>. Our simple implementation, creates a stub model using
<code>tf.Keras</code> and saves the resulting model to a directory.</p><div
class=s [...]
+ transformed_data, transformed_metadata =
transformed_dataset</code></pre></div></div><h4 id=train>Train</h4><p>The
Trainer component behaves like the Transform component, but instead of looking
for a <code>preprocessing_fn</code>, it requires a <code>run_fn</code> function
in the specified <code>module_file</code>. Our simple implementation creates a
stub model using <code>tf.Keras</code> and saves the resulting model to a
directory.</p><div class=snippet><div class="notebook-skip code [...]
"""Build the TF model, train it and export it."""
# create a model
model = tf.keras.Sequential()
@@ -333,8 +334,8 @@ run_result = client.run_pipeline(
# model.fit(...)
# Save model to fn_args.serving_model_dir.
- model.save(fn_args.serving_model_dir)</code></pre></div></div><h4
id=executing-the-pipeline>Executing the pipeline</h4><p>To launch the pipeline
two configurations must be provided: The orchestrator for the TFX pipeline and
the pipeline options to run Beam pipelines. In this case we use the
<code>LocalDagRunner</code> for orchestration to run the pipeline locally
without extra setup dependencies. Where the created pipeline can specify Beam’s
pipeline options as usual through the <code> [...]
-tfx.orchestration.LocalDagRunner().run(create_pipeline(**vars(args)))</code></pre></div></div><div
class=feedback><p class=update>Last updated on 2022/11/03</p><h3>Have you
found everything you were looking for?</h3><p class=description>Was it all
useful and clear? Is there anything that you would like to change? Let us
know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=footer><di [...]
+ model.save(fn_args.serving_model_dir)</code></pre></div></div><h4
id=executing-the-pipeline>Executing the pipeline</h4><p>To launch the pipeline,
provide two configurations: the orchestrator for the TFX pipeline and the
pipeline options to run Apache Beam pipelines. To run the pipeline locally
without extra setup dependencies, this example uses the
<code>LocalDagRunner</code> for orchestration. The pipeline created can specify
Apache Beam’s pipeline options through the <code>beam_pipel [...]
+tfx.orchestration.LocalDagRunner().run(create_pipeline(**vars(args)))</code></pre></div></div><div
class=feedback><p class=update>Last updated on 2022/11/22</p><h3>Have you
found everything you were looking for?</h3><p class=description>Was it all
useful and clear? Is there anything that you would like to change? Let us
know!</p><button class=load-button><a
href="mailto:[email protected]?subject=Beam Website Feedback">SEND
FEEDBACK</a></button></div></div></div><footer class=footer><di [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git a/website/generated-content/documentation/ml/overview/index.html
b/website/generated-content/documentation/ml/overview/index.html
index 050f4114d17..e282511155c 100644
--- a/website/generated-content/documentation/ml/overview/index.html
+++ b/website/generated-content/documentation/ml/overview/index.html
@@ -19,7 +19,8 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+upscaling your data pipelines as part of your MLOps ecosystem in a production
environment.</li><li>It enables you to run your model in production on a
varying data load, both in batch and streaming.</li></ul><h2
id=aiml-workloads>AI/ML workloads</h2><p>Let’s take a look at the different
building blocks that we need to create an end-to-end AI/ML use case and where
Apache Beam can help.</p><p><img src=/images/ml-workflows.svg alt="Overview of
AI/ML building blocks and where Apache Beam can [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git
a/website/generated-content/documentation/ml/runinference-metrics/index.html
b/website/generated-content/documentation/ml/runinference-metrics/index.html
index 21ca6dddf68..fdceeaab691 100644
--- a/website/generated-content/documentation/ml/runinference-metrics/index.html
+++ b/website/generated-content/documentation/ml/runinference-metrics/index.html
@@ -19,7 +19,7 @@
function addPlaceholder(){$('input:text').attr('placeholder',"What are you
looking for?");}
function endSearch(){var
search=document.querySelector(".searchBar");search.classList.add("disappear");var
icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");}
function blockScroll(){$("body").toggleClass("fixedPosition");}
-function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
+function openMenu(){addPlaceholder();blockScroll();}</script><div
class="clearfix container-main-content"><div class="section-nav closed"
data-offset-top=90 data-offset-bottom=500><span class="section-nav-back
glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list
data-section-nav><li><span
class=section-nav-list-main-title>Documentation</span></li><li><a
href=/documentation>Using the Documentation</a></li><li
class=section-nav-item--collapsible><span class=section-nav-lis [...]
├── pipeline/
│ ├── __init__.py
│ ├── options.py
@@ -28,14 +28,14 @@ function
openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfi
├── config.py
├── main.py
└── setup.py
-</code></pre><p><code>pipeline/transformations.py</code> contains the code for
<code>beam.DoFn</code> and additional functions that are used for
pipeline</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline</p><p><code>config.py</code> defines some
variables like GCP <code>PROJECT_ID</code>, <code>NUM_WORKERS</code> that are
used multiple times</p><p><code>setup.py</code> defines the
packages/requirements for the pipeline to run</p><p><c [...]
+</code></pre><p><code>pipeline/transformations.py</code> contains the code for
<code>beam.DoFn</code> and additional functions that are used for the
pipeline.</p><p><code>pipeline/options.py</code> contains the pipeline options
to configure the Dataflow pipeline.</p><p><code>config.py</code> defines
variables that are used multiple times, like the Google Cloud
<code>PROJECT_ID</code> and
<code>NUM_WORKERS</code>.</p><p><code>setup.py</code> defines the packages and
requirements for the p [...]
_ = (
pipeline
| "Create inputs" >> beam.Create(inputs)
| "Tokenize" >> beam.ParDo(Tokenize(cfg.TOKENIZER_NAME))
| "Inference" >>
RunInference(model_handler=KeyedModelHandler(model_handler))
- | "Decode Predictions" >>
beam.ParDo(PostProcessor()))</code></pre></div></div><h2
id=runinference-metrics>RunInference Metrics</h2><p>As mentioned above, we
benchmarked the performance of RunInference using Dataflow on both CPU and GPU.
These metrics can be seen in the GCP UI and can also be printed using</p><div
class=snippet><div class="notebook-skip code-snippet without_switcher"><a
class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom
title="Co [...]
+ | "Decode Predictions" >>
beam.ParDo(PostProcessor()))</code></pre></div></div><h2
id=runinference-metrics-1>RunInference Metrics</h2><p>As mentioned previously,
we benchmarked the performance of RunInference using Dataflow on both CPU and
GPU. You can see these metrics in the Google Cloud console, or you can use the
following line to print the metrics:</p><div class=snippet><div
class="notebook-skip code-snippet without_switcher"><a class=copy type=button
data-bs-t [...]
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam
logo, and the Apache feather logo are either registered trademarks or
trademarks of The Apache Software Foundation. All other products or name brands
are trademarks of their respective holders, including The Apache Software
Foundation.</div></div><div class="footer__cols__col
footer__cols__col__logos"><div class=footer__cols__col--group><div
class=footer__cols__col__logo><a href=https://github.com/apache/beam><im [...]
\ No newline at end of file
diff --git a/website/generated-content/sitemap.xml
b/website/generated-content/sitemap.xml
index e7ae092ae21..58854919628 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.43.0/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/blog/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/categories/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/catego
[...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.43.0/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/blog/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/categories/</loc><lastmod>2022-11-18T09:53:19-08:00</lastmod></url><url><loc>/catego
[...]
\ No newline at end of file