rezarokni commented on code in PR #27075:
URL: https://github.com/apache/beam/pull/27075#discussion_r1223566632


##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": []
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "IVkpU8HZ1eyz"
+      },
+      "execution_count": 74,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Apache Beam RunInference in a Streaming Pipeline\n",
+        "\n",
+        "<table align=\"left\">\n",
+        "  <td>\n",
+        "    <a target=\"_blank\" 
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\";><img
 
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\";
 />Run in Google Colab</a>\n",

Review Comment:
   Can we please change the name of the file to something like LLM or generative AI inference?



##########
examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb:
##########
@@ -0,0 +1,292 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": []
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "IVkpU8HZ1eyz"
+      },
+      "execution_count": 74,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Apache Beam RunInference in a Streaming Pipeline\n",
+        "\n",
+        "<table align=\"left\">\n",
+        "  <td>\n",
+        "    <a target=\"_blank\" 
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\";><img
 
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\";
 />Run in Google Colab</a>\n",
+        "  </td>\n",
+        "  <td>\n",
+        "    <a target=\"_blank\" 
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_streaming_pipeline.ipynb\";><img
 
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\";
 />View source on GitHub</a>\n",
+        "  </td>\n",
+        "</table>\n"
+      ],
+      "metadata": {
+        "id": "kH8SORNim8on"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "This notebook shows how to use the Apache Beam 
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
 transform in a streaming pipeline with [Google Cloud 
Pub-Sub](https://cloud.google.com/pubsub?utm_source=google&utm_medium=cpc&utm_campaign=na-US-all-en-dr-bkws-all-all-trial-b-dr-1605212&utm_content=text-ad-none-any-DEV_c-CRE_648329165516-ADGP_Desk%20%7C%20BKWS%20-%20BRO%20%7C%20Txt%20_%20Pub%2Fsub-KWID_43700075187144857-aud-664745643345%3Akwd-874320293016&utm_term=KW_pub%20sub%20google%20cloud-ST_pub%20sub%20google%20cloud&gclid=CjwKCAjw-IWkBhBTEiwA2exyO19xMFn6h1UKjb4QUavatV8Yb5Au9pCQj2_VAo0rzaYS8v2bq5VmuBoCL9wQAvD_BwE&gclsrc=aw.ds).\n",
+        "\n",
+        "This notebook demonstrates the following steps:\n",
+        "- Load and save a model from Hugging Face Models Hub.\n",
+        "- Use Pub/Sub IO in Python SDK as a streaming source.\n",
+        "- Use PyTorch model handler for RunInference.\n",
+        "\n",
+        "For more information about using RunInference, see [Get started with 
AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the 
Apache Beam documentation."
+      ],
+      "metadata": {
+        "id": "7N2XzwoA0k4L"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Set up your environment and download dependencies."
+      ],
+      "metadata": {
+        "id": "nhf_lOeEsO1C"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "wS9a3Y0oZ_l5"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]==2.48.0\n",
+        "!pip install torch\n",
+        "!pip install transformers\n",
+        "!pip install tensorflow"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Authenticate with Google Cloud\n",
+        "This notebook relies on Google Cloud Pub/Sub as an input to the 
pipeline as well for writing out the results. To use your Google Cloud account, 
authenticate this notebook."
+      ],
+      "metadata": {
+        "id": "A0v-oGSfsTHh"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "hNslPJGil2Zc"
+      },
+      "execution_count": 63,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Import dependencies and set up your bucket\n",
+        "Use the following code to import dependencies and to set up your 
Google Cloud Storage bucket.\n",
+        "\n",
+        "Replace `MESSAGE_TOPIC` and `RESPONSE_TOPIC` with the Pub/Sub topics 
in your project.\n",
+        "\n",
+        "**Important**: If an error occurs, restart your runtime."
+      ],
+      "metadata": {
+        "id": "I7vMsFGW16bZ"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import os\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.pytorch_inference import 
make_tensor_model_fn\n",
+        "from apache_beam.ml.inference.pytorch_inference import 
PytorchModelHandlerTensor\n",
+        "import torch\n",
+        "from transformers import AutoConfig\n",
+        "from transformers import AutoModelForSeq2SeqLM\n",
+        "from transformers import AutoTokenizer\n",
+        "from transformers.tokenization_utils import PreTrainedTokenizer\n",
+        "\n",
+        "message_topic = \"MESSAGE_TOPIC\"\n",
+        "response_topic = \"RESPONSE_TOPIC\"\n",
+        "\n",
+        "MAX_RESPONSE_TOKENS = 256\n",
+        "\n",
+        "model_name = \"google/flan-t5-small\"\n",
+        "state_dict_path = \"saved_model\""
+      ],
+      "metadata": {
+        "id": "uhbOYUzvbOSc"
+      },
+      "execution_count": 75,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Download and save the model\n",
+        "We will use the AutoClasses from Hugging Face to instantly load the 
model in memory and later save it to the path defined above."
+      ],
+      "metadata": {
+        "id": "yRls3LmxswrC"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model = AutoModelForSeq2SeqLM.from_pretrained(\n",
+        "        model_name, torch_dtype=torch.bfloat16\n",
+        "    )\n",
+        "\n",
+        "directory = os.path.dirname(state_dict_path)\n",
+        "torch.save(model.state_dict(), state_dict_path)"
+      ],
+      "metadata": {
+        "id": "PKhkiQFJe44n"
+      },
+      "execution_count": 76,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Utitlity functions for before/after running RunInference"
+      ],
+      "metadata": {
+        "id": "7TSqb3l1s7F7"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def to_tensors(input_text: str, tokenizer) -> torch.Tensor:\n",
+        "    \"\"\"Encodes input text into token tensors.\n",
+        "    Args:\n",
+        "        input_text: Input text for the LLM model.\n",
+        "        tokenizer: Tokenizer for the LLM model.\n",
+        "    Returns: Tokenized input tokens.\n",
+        "    \"\"\"\n",
+        "    return tokenizer(input_text, 
return_tensors=\"pt\").input_ids[0]\n",
+        "\n",
+        "\n",
+        "def get_response(result: PredictionResult, tokenizer) -> str:\n",
+        "    \"\"\"Decodes output token tensors into text.\n",
+        "    Args:\n",
+        "        result: Prediction results from the RunInference 
transform.\n",
+        "        tokenizer: Tokenizer for the LLM model.\n",
+        "    Returns: The model's response as text.\n",
+        "    \"\"\"\n",
+        "    output_tokens = result.inference\n",
+        "    return tokenizer.decode(output_tokens, skip_special_tokens=True)"
+      ],
+      "metadata": {
+        "id": "OeTMbaLidnBe"
+      },
+      "execution_count": 78,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Run Inference Pipeline\n",
+        "Run the cell below and publish messages from the `message_topic` 
using google cloud console.\n",
+        "\n",
+        "(Since it is a streming pipeline, the cell will keep on running until 
it is manually stopped.)"
+      ],
+      "metadata": {
+        "id": "y7IU3cYKtA3e"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "tokenizer = AutoTokenizer.from_pretrained(model_name)\n",
+        "\n",
+        "# create an instance of pytorch model handler\n",
+        "model_handler = PytorchModelHandlerTensor(\n",
+        "            state_dict_path=state_dict_path,\n",
+        "            model_class=AutoModelForSeq2SeqLM.from_config,\n",
+        "            model_params={\"config\": 
AutoConfig.from_pretrained(model_name)},\n",
+        "            inference_fn=make_tensor_model_fn(\"generate\"),\n",
+        "        )\n",
+        "\n",
+        "# set up pipeline options to enable streaming\n",
+        "pipeline = 
beam.Pipeline(options=PipelineOptions(save_main_session=True,pickle_library=\"cloudpickle\",streaming=True))\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  _ = (\n",
+        "          p\n",
+        "          | \"Read from Pub/Sub\" >> 
beam.io.ReadFromPubSub(message_topic)\n",

Review Comment:
   Can we please change this to a `beam.Create` source and have the output be a `beam.Map(print)`, so that the example is locally runnable without needing access to the Cloud?
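   For reference, a minimal sketch of what that substitution could look like. It reuses the `tokenizer`, `model_handler`, `to_tensors`, and `get_response` definitions from the cells quoted above; the prompt strings are illustrative placeholders, not taken from the notebook.
   
   ```python
   import apache_beam as beam
   from apache_beam.ml.inference.base import RunInference
   from apache_beam.options.pipeline_options import PipelineOptions
   
   # Illustrative prompts; any short strings work for flan-t5-small.
   prompts = [
       "translate English to German: The house is wonderful.",
       "What is the capital of France?",
   ]
   
   # Bounded local pipeline: no streaming flag and no Pub/Sub access needed.
   with beam.Pipeline(options=PipelineOptions(
       save_main_session=True, pickle_library="cloudpickle")) as p:
     _ = (
         p
         | "Create prompts" >> beam.Create(prompts)             # replaces ReadFromPubSub
         | "To tensors" >> beam.Map(to_tensors, tokenizer)      # helper defined in the notebook
         | "RunInference" >> RunInference(model_handler)        # same handler as above
         | "Get response" >> beam.Map(get_response, tokenizer)  # helper defined in the notebook
         | "Print" >> beam.Map(print))                          # replaces the Pub/Sub sink
   ```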



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
