rszper commented on code in PR #27075:
URL: https://github.com/apache/beam/pull/27075#discussion_r1223627255
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
Review Comment:
```suggestion
"This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub/Sub](https://cloud.google.com/pubsub/docs).\n",
```
We need to add information to this intro about the use case of this notebook.
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
Review Comment:
Can we make this title more descriptive? Most of the notebooks we have fit
this description. What's the specific use case here? Why should a user pick
this notebook instead of a different one?
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
Review Comment:
```suggestion
"- Use Pub/Sub as a streaming source with the Apache Beam Python
SDK.\n",
```
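It might also help to show up front what that looks like in code. A minimal
sketch of reading Pub/Sub as a streaming source (the project and topic names
are placeholders):
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder; replace with a topic in your project.
topic = "projects/PROJECT_ID/topics/MESSAGE_TOPIC"

# Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromPubSub(topic=topic)  # payloads are bytes
        | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
        | "Print" >> beam.Map(print)
    )
```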
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
Review Comment:
Are we setting up a bucket here? Maybe the heading should be:
`## Configure your Pub/Sub topic`
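If we go with that heading, we could also show readers how to create the two
topics. A sketch using the google-cloud-pubsub client (the project and topic
IDs are placeholders):
```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

# Placeholders; replace with your project ID and topic IDs.
project_id = "PROJECT_ID"
for topic_id in ("MESSAGE_TOPIC", "RESPONSE_TOPIC"):
    topic_path = publisher.topic_path(project_id, topic_id)
    publisher.create_topic(request={"name": topic_path})
```
The same thing works from the command line with
`gcloud pubsub topics create MESSAGE_TOPIC RESPONSE_TOPIC`.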
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
Review Comment:
```suggestion
"## Install the utility functions"
```
Or if we're not installing the utility functions, what are we doing? Writing
them?
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
Review Comment:
```suggestion
"This notebook uses the [auto
classes](https://huggingface.co/docs/transformers/model_doc/auto) from Hugging
Face to instantly load the model in memory. Later, the model is saved to the
path defined previously."
```
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
+ "metadata": {
+ "id": "7TSqb3l1s7F7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+ " \"\"\"Encodes input text into token tensors.\n",
+ " Args:\n",
+ " input_text: Input text for the LLM model.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: Tokenized input tokens.\n",
+ " \"\"\"\n",
+ " return tokenizer(input_text,
return_tensors=\"pt\").input_ids[0]\n",
+ "\n",
+ "\n",
+ "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+ " \"\"\"Decodes output token tensors into text.\n",
+ " Args:\n",
+ " result: Prediction results from the RunInference
transform.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: The model's response as text.\n",
+ " \"\"\"\n",
+ " output_tokens = result.inference\n",
+ " return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+ ],
+ "metadata": {
+ "id": "OeTMbaLidnBe"
+ },
+ "execution_count": 78,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Run Inference Pipeline\n",
+ "Run the cell below and publish messages from the `message_topic`
using google cloud console.\n",
Review Comment:
```suggestion
"Run the following code, and publish messages from the
`MESSAGE_TOPIC` by using the Google Cloud console. For more information about
using the console to publish messages, see [Publish messages to
topics](https://cloud.google.com/pubsub/docs/publisher#console).\n",
```
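We could also show publishing from code so that readers can test the pipeline
without leaving the notebook. A sketch, again with placeholder names, using a
prompt that matches the flan-t5 model used in this notebook:
```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("PROJECT_ID", "MESSAGE_TOPIC")

# Pub/Sub message payloads must be bytes.
data = "translate English to German: How are you?".encode("utf-8")
future = publisher.publish(topic_path, data=data)
print(future.result())  # the server-assigned message ID
```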
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
+ "metadata": {
+ "id": "7TSqb3l1s7F7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+ " \"\"\"Encodes input text into token tensors.\n",
+ " Args:\n",
+ " input_text: Input text for the LLM model.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: Tokenized input tokens.\n",
+ " \"\"\"\n",
+ " return tokenizer(input_text,
return_tensors=\"pt\").input_ids[0]\n",
+ "\n",
+ "\n",
+ "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+ " \"\"\"Decodes output token tensors into text.\n",
+ " Args:\n",
+ " result: Prediction results from the RunInference
transform.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: The model's response as text.\n",
+ " \"\"\"\n",
+ " output_tokens = result.inference\n",
+ " return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+ ],
+ "metadata": {
+ "id": "OeTMbaLidnBe"
+ },
+ "execution_count": 78,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Run Inference Pipeline\n",
+ "Run the cell below and publish messages from the `message_topic`
using google cloud console.\n",
+ "\n",
+ "(Since it is a streming pipeline, the cell will keep on running until
it is manually stopped.)"
+ ],
+ "metadata": {
+ "id": "y7IU3cYKtA3e"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "tokenizer = AutoTokenizer.from_pretrained(model_name)\n",
+ "\n",
+ "# create an instance of pytorch model handler\n",
+ "model_handler = PytorchModelHandlerTensor(\n",
+ " state_dict_path=state_dict_path,\n",
+ " model_class=AutoModelForSeq2SeqLM.from_config,\n",
+ " model_params={\"config\":
AutoConfig.from_pretrained(model_name)},\n",
+ " inference_fn=make_tensor_model_fn(\"generate\"),\n",
+ " )\n",
+ "\n",
+ "# set up pipeline options to enable streaming\n",
Review Comment:
```suggestion
"# Set pipeline options to enable streaming.\n",
```
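For readers who haven't written a streaming pipeline before, it might be worth
spelling out what this comment refers to. The usual idiom (a sketch, not tied
to this notebook's variable names) is:
```python
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

pipeline_options = PipelineOptions()
# Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
pipeline_options.view_as(StandardOptions).streaming = True
```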
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
+ "metadata": {
+ "id": "7TSqb3l1s7F7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+ " \"\"\"Encodes input text into token tensors.\n",
+ " Args:\n",
+ " input_text: Input text for the LLM model.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: Tokenized input tokens.\n",
+ " \"\"\"\n",
+ " return tokenizer(input_text,
return_tensors=\"pt\").input_ids[0]\n",
+ "\n",
+ "\n",
+ "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+ " \"\"\"Decodes output token tensors into text.\n",
+ " Args:\n",
+ " result: Prediction results from the RunInference
transform.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: The model's response as text.\n",
+ " \"\"\"\n",
+ " output_tokens = result.inference\n",
+ " return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+ ],
+ "metadata": {
+ "id": "OeTMbaLidnBe"
+ },
+ "execution_count": 78,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Run Inference Pipeline\n",
+ "Run the cell below and publish messages from the `message_topic`
using google cloud console.\n",
+ "\n",
+ "(Since it is a streming pipeline, the cell will keep on running until
it is manually stopped.)"
+ ],
+ "metadata": {
+ "id": "y7IU3cYKtA3e"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "tokenizer = AutoTokenizer.from_pretrained(model_name)\n",
+ "\n",
+ "# create an instance of pytorch model handler\n",
Review Comment:
```suggestion
"# Create an instance of the PyTorch model handler.\n",
```
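Since the quoted cell cuts off before the pipeline itself, here's roughly the
shape I'd expect the handler to be used in. This is only a sketch: it reuses
the names defined earlier in the notebook (`message_topic`, `response_topic`,
`tokenizer`, `to_tensors`, `get_response`), assumes streaming pipeline options
like the ones above, and I'm guessing that `MAX_RESPONSE_TOKENS` is fed to
`generate` through `inference_args`:
```python
with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromPubSub(topic=message_topic)
        | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
        | "ToTensors" >> beam.Map(to_tensors, tokenizer=tokenizer)
        | "RunInference" >> RunInference(
            model_handler,
            inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS})
        | "GetResponse" >> beam.Map(get_response, tokenizer=tokenizer)
        | "Encode" >> beam.Map(lambda text: text.encode("utf-8"))
        | "Write" >> beam.io.WriteToPubSub(topic=response_topic)
    )
```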
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
Review Comment:
```suggestion
"This notebook uses Google Cloud Pub/Sub as an input to the pipeline
and for writing out the pipeline results. To use your Google Cloud account,
authenticate this notebook."
```
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
Review Comment:
```suggestion
"- Load and save a model from the Hugging Face Model Hub.\n",
```
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
Review Comment:
Add a section header before the code block. Something like:
## Install the Apache Beam SDK and dependencies
Use the following code to install the Apache Beam Python SDK, PyTorch, and
TensorFlow.
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
Review Comment:
I don't think this line properly describes what the following code does. The
code imports dependencies and sets the Pub/Sub topic and model variables; it
doesn't set up a Cloud Storage bucket.
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
Review Comment:
```suggestion
"- Use the PyTorch model handler for RunInference.\n",
```
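For context, here's a minimal sketch of how the PyTorch model handler might be
constructed for this model, based only on the imports and variables in the
quoted cells (`PytorchModelHandlerTensor`, `make_tensor_model_fn`,
`model_name`, `state_dict_path`); the exact arguments are assumptions, not
necessarily the notebook's final code:
```python
# Sketch only: the argument choices below are assumptions based on the
# imports and variables shown in the quoted notebook cells.
from apache_beam.ml.inference.pytorch_inference import (
    PytorchModelHandlerTensor,
    make_tensor_model_fn,
)
from transformers import AutoConfig, AutoModelForSeq2SeqLM

model_name = "google/flan-t5-small"
state_dict_path = "saved_model"

model_handler = PytorchModelHandlerTensor(
    state_dict_path=state_dict_path,
    # Rebuild the architecture from its config; weights load from state_dict_path.
    model_class=AutoModelForSeq2SeqLM.from_config,
    model_params={"config": AutoConfig.from_pretrained(model_name)},
    # Run model.generate() rather than model.forward() at inference time.
    inference_fn=make_tensor_model_fn("generate"),
)
```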
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
Review Comment:
Add a line explaining what the utility functions are for. Something like:
These utility functions are used before and after the RunInference transform
to....
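If it helps with wording that explanation, here's a small, hypothetical round
trip showing what the first helper produces, assuming the flan-t5 tokenizer
from the earlier cells (`get_response` does the reverse on RunInference
output):
```python
import torch
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")

def to_tensors(input_text: str, tokenizer) -> torch.Tensor:
    # Same helper as in this cell: encode text into a 1-D tensor of token ids.
    return tokenizer(input_text, return_tensors="pt").input_ids[0]

tokens = to_tensors("What is Apache Beam?", tokenizer)
print(tokens)                    # 1-D tensor of token ids
print(tokenizer.decode(tokens))  # recovers the text, plus the </s> end token
```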
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
+ "metadata": {
+ "id": "7TSqb3l1s7F7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+ " \"\"\"Encodes input text into token tensors.\n",
+ " Args:\n",
+ " input_text: Input text for the LLM model.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: Tokenized input tokens.\n",
+ " \"\"\"\n",
+ " return tokenizer(input_text,
return_tensors=\"pt\").input_ids[0]\n",
+ "\n",
+ "\n",
+ "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+ " \"\"\"Decodes output token tensors into text.\n",
+ " Args:\n",
+ " result: Prediction results from the RunInference
transform.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: The model's response as text.\n",
+ " \"\"\"\n",
+ " output_tokens = result.inference\n",
+ " return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+ ],
+ "metadata": {
+ "id": "OeTMbaLidnBe"
+ },
+ "execution_count": 78,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Run Inference Pipeline\n",
Review Comment:
```suggestion
"## Run the pipeline\n",
```
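Also, since the pipeline cell itself isn't quoted in this thread, here's a
rough sketch of how the quoted pieces (the model handler, the tokenizer
helpers, and the Pub/Sub topics) might compose into the streaming pipeline;
treat the arguments as assumptions rather than the notebook's actual code:
```python
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes model_handler, tokenizer, to_tensors, get_response,
# message_topic, response_topic, and MAX_RESPONSE_TOKENS from earlier cells.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromPubSub(topic=message_topic)  # full topic path
        | "Decode" >> beam.Map(lambda raw: raw.decode("utf-8"))
        | "Tokenize" >> beam.Map(to_tensors, tokenizer)
        | "Infer" >> RunInference(
            model_handler,
            inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS},
        )
        | "Detokenize" >> beam.Map(get_response, tokenizer)
        | "Encode" >> beam.Map(lambda text: text.encode("utf-8"))
        | "Write" >> beam.io.WriteToPubSub(topic=response_topic)
    )
```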
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
+ "We will use the AutoClasses from Hugging Face to instantly load the
model in memory and later save it to the path defined above."
+ ],
+ "metadata": {
+ "id": "yRls3LmxswrC"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+ " model_name, torch_dtype=torch.bfloat16\n",
+ " )\n",
+ "\n",
+ "directory = os.path.dirname(state_dict_path)\n",
+ "torch.save(model.state_dict(), state_dict_path)"
+ ],
+ "metadata": {
+ "id": "PKhkiQFJe44n"
+ },
+ "execution_count": 76,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Utitlity functions for before/after running RunInference"
+ ],
+ "metadata": {
+ "id": "7TSqb3l1s7F7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+ " \"\"\"Encodes input text into token tensors.\n",
+ " Args:\n",
+ " input_text: Input text for the LLM model.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: Tokenized input tokens.\n",
+ " \"\"\"\n",
+ " return tokenizer(input_text,
return_tensors=\"pt\").input_ids[0]\n",
+ "\n",
+ "\n",
+ "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+ " \"\"\"Decodes output token tensors into text.\n",
+ " Args:\n",
+ " result: Prediction results from the RunInference
transform.\n",
+ " tokenizer: Tokenizer for the LLM model.\n",
+ " Returns: The model's response as text.\n",
+ " \"\"\"\n",
+ " output_tokens = result.inference\n",
+ " return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+ ],
+ "metadata": {
+ "id": "OeTMbaLidnBe"
+ },
+ "execution_count": 78,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Run Inference Pipeline\n",
+ "Run the cell below and publish messages from the `message_topic`
using google cloud console.\n",
+ "\n",
+ "(Since it is a streming pipeline, the cell will keep on running until
it is manually stopped.)"
Review Comment:
```suggestion
"Because this pipeline is streaming messages, the code continues to
run until you manually stop it."
```
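One more thought: it might be worth mentioning that messages can also be
published programmatically instead of from the console. A minimal sketch with
placeholder project and topic names:
```python
from google.cloud import pubsub_v1  # installed with apache_beam[gcp]

publisher = pubsub_v1.PublisherClient()
# Placeholders: replace with your project ID and message topic name.
topic_path = publisher.topic_path("YOUR_PROJECT_ID", "MESSAGE_TOPIC")

future = publisher.publish(topic_path, data=b"What is Apache Beam?")
print(future.result())  # blocks until publish completes; prints the message ID
```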
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
Review Comment:
Most of this content falls under Before you begin, so we should remove that
header and promote the subheadings to H2s.
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
Review Comment:
We should put this after the code instead of before it, because that's the
placement our style guide specifies.
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
+ "This notebook relies on Google Cloud Pub/Sub as an input to the
pipeline as well for writing out the results. To use your Google Cloud account,
authenticate this notebook."
+ ],
+ "metadata": {
+ "id": "A0v-oGSfsTHh"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "hNslPJGil2Zc"
+ },
+ "execution_count": 63,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Import dependencies and set up your bucket\n",
+ "Use the following code to import dependencies and to set up your
Google Cloud Storage bucket.\n",
+ "\n",
+ "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics
in your project.\n",
+ "\n",
+ "**Important**: If an error occurs, restart your runtime."
+ ],
+ "metadata": {
+ "id": "I7vMsFGW16bZ"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import os\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.pytorch_inference import
make_tensor_model_fn\n",
+ "from apache_beam.ml.inference.pytorch_inference import
PytorchModelHandlerTensor\n",
+ "import torch\n",
+ "from transformers import AutoConfig\n",
+ "from transformers import AutoModelForSeq2SeqLM\n",
+ "from transformers import AutoTokenizer\n",
+ "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+ "\n",
+ "message_topic = \"MESSAGE_TOPIC\"\n",
+ "response_topic = \"RESPONSE_TOPIC\"\n",
+ "\n",
+ "MAX_RESPONSE_TOKENS = 256\n",
+ "\n",
+ "model_name = \"google/flan-t5-small\"\n",
+ "state_dict_path = \"saved_model\""
+ ],
+ "metadata": {
+ "id": "uhbOYUzvbOSc"
+ },
+ "execution_count": 75,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Download and save the model\n",
Review Comment:
```suggestion
"## Download and save the model\n",
```
##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "IVkpU8HZ1eyz"
+ },
+ "execution_count": 74,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference in a Streaming Pipeline\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "kH8SORNim8on"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "This notebook shows how to use the Apache Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
transform in a streaming pipeline with [Google Cloud
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+ "\n",
+ "This notebook demonstrates the following steps:\n",
+ "- Load and save a model from Hugging Face Models Hub.\n",
+ "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+ "- Use PyTorch model handler for RunInference.\n",
+ "\n",
+ "For more information about using RunInference, see [Get started with
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the
Apache Beam documentation."
+ ],
+ "metadata": {
+ "id": "7N2XzwoA0k4L"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Set up your environment and download dependencies."
+ ],
+ "metadata": {
+ "id": "nhf_lOeEsO1C"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wS9a3Y0oZ_l5"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install apache_beam[gcp]==2.48.0\n",
+ "!pip install torch\n",
+ "!pip install transformers\n",
+ "!pip install tensorflow"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Authenticate with Google Cloud\n",
Review Comment:
```suggestion
"## Authenticate with Google Cloud\n",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]