damccorm commented on code in PR #23173: URL: https://github.com/apache/beam/pull/23173#discussion_r980239415
########## examples/notebooks/beam-ml/README.md: ##########
@@ -0,0 +1,44 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+# ML Implementations on Beam
+
+As of Beam 2.40 users now have access to a simple
+[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
+transform.
+
+This allows inferences or predictions of data on
+popular ML frameworks like TensorFlow, PyTorch and
+scikit-learn.
+
+## Using The Notebooks
+
+These notebooks illustrate usages of Beam's RunInference, as well as different
+usages of implementations of [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler). Beam comes with various implementations of ModelHandler.
+
+### Loading the Notebooks
+
+1. A quick way to get started is with [Colab](colab.sandbox.google.com).
+2. Load the notebook from github, for example:
+```
+https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_custom_inference.ipynb.

Review Comment:
```suggestion
https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb.
```
Nit: Let's recommend tf (or another framework) for our example, since a custom handler shouldn't be the first thing people try.

########## examples/notebooks/beam-ml/run_inference_pytorch.ipynb: ##########
@@ -63,25 +60,21 @@
 " </a>\n",
 "</button>\n",
 "\n",
- "In this notebook, we walk through the use of the RunInference transform.\n",
- "The transform and its accompanying [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) classes handle the following tasks:\n",
- "\n",
- "\n",
- "* Optimizing loading models from popular frameworks.\n",
- "* Batching examples in a scalable fashion.\n",
+ "In this notebook, we walk through the use of the RunInference transform for PyTorch.\n",
+ "The transform and its accompanying [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler).\n",

Review Comment:
This sentence is incomplete (it has no verb).

########## examples/notebooks/beam-ml/run_inference_tensorflow.ipynb: ########## @@ -0,0 +1,489 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "collapsed_sections": [], + "toc_visible": true + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "accelerator": "GPU" + }, + "cells": [ + { + "cell_type": "code", + "source": [ + "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License.
You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License." + ], + "metadata": { + "cellView": "form", + "id": "fFjof1NgAJwu" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Apache Beam RunInference with TensorFlow\n", + "\n", + "<button>\n", + " <a href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n", + " <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n", + " Beam RunInference\n", + " </a>\n", + "</button>\n", + "\n", + "The Apache Beam RunInference transform is used for making predictions for\n", + "a variety of machine learning models. 
From version 1.10.0 of tfx-bsl you can\n", + "create a TensorFlow ModelHandler for use with Apache Beam.\n", + "\n", + "In this notebook, we walk through the use of the RunInference transform for [TensorFlow](https://www.tensorflow.org/).\n", + "Beam [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) accepts ModelHandler generated from [tfx-bsl](https://github.com/tensorflow/tfx-bsl) via CreateModelHandler.\n", + "\n", + "\n", + "\n", + "In this notebook we walk through:\n", + "- Importing [tfx-bsl](https://github.com/tensorflow/tfx-bsl)\n", + "- Building a simple TF model\n", + "- Setting up examples\n", + "- Running those examples and gettign a prediction" + ], + "metadata": { + "id": "HrCtxslBGK8Z" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "jBakpNZnAhqk" + }, + "source": [ + "!pip install tfx_bsl==1.10.0 --quiet" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "In order to use RunInference you will need beam 2.40 or higher. Creation of a ModelHandler is supported in tfx-bsl 1.10 or higher." + ], + "metadata": { + "id": "gVCtGOKTHMm4" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "8hHP9wu98Ld4", + "outputId": "a1a8e816-e5b9-4bee-b74c-6d2966bd45d6" + }, + "source": [ + "!pip freeze | grep beam\n", + "!pip freeze | grep tfx-bsl\n" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "apache-beam==2.41.0\n", + "tfx-bsl==1.10.0\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "## Authenticate With Cloud\n", + "This colab relies on saving your model into Google Cloud. First authenticate this notebook to use your Google Cloud account." 
+ ], + "metadata": { + "id": "X80jy3FqHjK4" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "Kz9sccyGBqz3" + }, + "source": [ + "from google.colab import auth\n", + "auth.authenticate_user()" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Import Your Dependencies and Setup Your Bucket\n", + "Replace project and bucket with your variables with your project.\n", + "\n", + "**Important**: If you get an error, restart your runtime." + ], + "metadata": { + "id": "40qtP6zJuMXm" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "eEle839_Akqx" + }, + "source": [ + "import argparse\n", + "\n", + "import tensorflow as tf\n", + "from tensorflow import keras\n", + "from tensorflow_serving.apis import prediction_log_pb2\n", + "\n", + "import apache_beam as beam\n", + "from apache_beam.ml.inference.base import RunInference\n", + "import tfx_bsl\n", + "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n", + "from tfx_bsl.public import tfxio\n", + "from tfx_bsl.public.proto import model_spec_pb2\n", + "from tensorflow_metadata.proto.v0 import schema_pb2\n", + "\n", + "import numpy\n", + "\n", + "from typing import Dict, Text, Any, Tuple, List\n", + "\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "\n", + "project = \"<Replace With Your Project>\"\n", + "bucket = \"<Replace With Your Bucket>\"\n", + "\n", + "save_model_dir_multiply = f'gs://{bucket}/tfx-inference/model/multiply_five/v1/'\n" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Create and Test a Simple Model\n", + "\n", + "This creates a model that predicts the 5 times table." 
+ ], + "metadata": { + "id": "YzvZWEv-1oiK" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "SH7iq3zeBBJ-", + "outputId": "8ba3dbd2-fd13-4da2-f1e6-04f36fbcf706" + }, + "source": [ + "# Create training data which represents the 5 times multiplication table for 0 to 99. x is the data and y the labels. \n", + "x = numpy.arange(0, 100) # Examples\n", + "y = x * 5 # Labels\n", + "\n", + "# Build a simple linear regression model.\n", + "# Note the model has a shape of (1) for its input layer, it will expect a single int64 value.\n", + "input_layer = keras.layers.Input(shape=(1), dtype=tf.float32, name='x')\n", + "output_layer= keras.layers.Dense(1)(input_layer)\n", + "\n", + "model = keras.Model(input_layer, output_layer)\n", + "model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')\n", + "model.summary()" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Model: \"model_3\"\n", + "_________________________________________________________________\n", + " Layer (type) Output Shape Param # \n", + "=================================================================\n", + " x (InputLayer) [(None, 1)] 0 \n", + " \n", + " dense_3 (Dense) (None, 1) 2 \n", + " \n", + "=================================================================\n", + "Total params: 2\n", + "Trainable params: 2\n", + "Non-trainable params: 0\n", + "_________________________________________________________________\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Test the Model\n" + ], + "metadata": { + "id": "O_a0-4Gb19cy" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "5XkIYXhJBFmS", + "outputId": "82515add-9e26-4a5d-df07-389823098433" + }, + "source": [ + "model.fit(x, y, epochs=4000, verbose=0)\n", + "test_examples =[20, 40, 60, 90]\n", + "value_to_predict = 
numpy.array(test_examples, dtype=numpy.float32)\n", + "predictions = model.predict(value_to_predict)\n", + "\n", + "print('Test Examples ' + str(test_examples))\n", + "print('Predictions ' + str(predictions))" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "1/1 [==============================] - 0s 39ms/step\n", + "Test Examples [20, 40, 60, 90]\n", + "Predictions [[100.001434]\n", + " [200.00256 ]\n", + " [300.0037 ]\n", + " [450.00537 ]]\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Populate the Data into a TensorFlow Proto\n", + "\n", + "Tensorflow data uses protos. If you are loading from a file there are helpers for this. Since we are using generated data, this code populates a proto." + ], + "metadata": { + "id": "dEmleqiH3t71" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "XvKc9kQilPjx" + }, + "source": [ + "# This is an example of a proto that converts the samples and labels into\n", + "# tensors usable by tensorflow.\n", + "class ExampleProcessor:\n", + " def create_example_with_label(self, feature: numpy.float32,\n", + " label: numpy.float32)-> tf.train.Example:\n", + " return tf.train.Example(\n", + " features=tf.train.Features(\n", + " feature={'x': self.create_feature(feature),\n", + " 'y' : self.create_feature(label)\n", + " }))\n", + "\n", + " def create_example(self, feature: numpy.float32):\n", + " return tf.train.Example(\n", + " features=tf.train.Features(\n", + " feature={'x' : self.create_feature(feature)})\n", + " )\n", + "\n", + " def create_feature(self, element: numpy.float32):\n", + " return tf.train.Feature(float_list=tf.train.FloatList(value=[element]))\n", + "\n", + "# Create our labeled example file for 5 times table\n", + "\n", + "example_five_times_table = 'example_five_times_table.tfrecord'\n", + "\n", + "with tf.io.TFRecordWriter(example_five_times_table) as writer:\n", + " for i in zip(x, y):\n", + " example = 
ExampleProcessor().create_example_with_label(\n", + " feature=i[0], label=i[1])\n", + " writer.write(example.SerializeToString())\n", + "\n", + "# Create a file containing the values to predict\n", + "\n", + "predict_values_five_times_table = 'predict_values_five_times_table.tfrecord'\n", + "\n", + "with tf.io.TFRecordWriter(predict_values_five_times_table) as writer:\n", + " for i in value_to_predict:\n", + " example = ExampleProcessor().create_example(feature=i)\n", + " writer.write(example.SerializeToString())" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Fit The Model\n", + "\n", + "This example builds a model. Since RunInference requires pretrained models, this segment builds a usable model." + ], + "metadata": { + "id": "G-sAu3cf31f3" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "AnbrxXPKeAOQ", + "outputId": "0574f487-5624-4968-e6f5-110e79c40210" + }, + "source": [ + "RAW_DATA_TRAIN_SPEC = {\n", + "'x': tf.io.FixedLenFeature([], tf.float32),\n", + "'y': tf.io.FixedLenFeature([], tf.float32)\n", + "}\n", + "\n", + "dataset = tf.data.TFRecordDataset(example_five_times_table)\n", + "dataset = dataset.map(lambda e : tf.io.parse_example(e, RAW_DATA_TRAIN_SPEC))\n", + "dataset = dataset.map(lambda t : (t['x'], t['y']))\n", + "dataset = dataset.batch(100)\n", + "dataset = dataset.repeat()\n", + "\n", + "model.fit(dataset, epochs=500, steps_per_epoch=1, verbose=0)" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "<keras.callbacks.History at 0x7fc0a24b8090>" + ] + }, + "metadata": {}, + "execution_count": 40 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Save the Model" + ], + "metadata": { + "id": "r4dpR6dQ4JwX" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "fYvrIYO3qiJx" + }, + "source": [ + "RAW_DATA_PREDICT_SPEC = {\n", + "'x': 
tf.io.FixedLenFeature([], tf.float32),\n", + "}\n", + "\n", + "@tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string , name='examples')])\n", + "def serve_tf_examples_fn(serialized_tf_examples):\n", + " \"\"\"Returns the output to be used in the serving signature.\"\"\"\n", + " features = tf.io.parse_example(serialized_tf_examples, RAW_DATA_PREDICT_SPEC)\n", + " return model(features, training=False)\n", + "\n", + "signature = {'serving_default': serve_tf_examples_fn}\n", + "\n", + "tf.keras.models.save_model(model, save_model_dir_multiply, signatures=signature)" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Run the Pipeline\n", Review Comment: Could we add a keyed example after this? Could just be a single section similar to what we do in sklearn ########## examples/notebooks/beam-ml/run_inference_tensorflow.ipynb: ########## @@ -0,0 +1,489 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "collapsed_sections": [], + "toc_visible": true + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "accelerator": "GPU" + }, + "cells": [ + { + "cell_type": "code", + "source": [ + "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. 
You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License." + ], + "metadata": { + "cellView": "form", + "id": "fFjof1NgAJwu" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "# Apache Beam RunInference with TensorFlow\n", + "\n", + "<button>\n", + " <a href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n", + " <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n", + " Beam RunInference\n", + " </a>\n", + "</button>\n", + "\n", + "The Apache Beam RunInference transform is used for making predictions for\n", + "a variety of machine learning models. From version 1.10.0 of tfx-bsl you can\n", + "create a TensorFlow ModelHandler for use with Apache Beam.\n", + "\n", + "In this notebook, we walk through the use of the RunInference transform for [TensorFlow](https://www.tensorflow.org/).\n", + "Beam [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) accepts ModelHandler generated from [tfx-bsl](https://github.com/tensorflow/tfx-bsl) via CreateModelHandler.\n", + "\n", + "\n", + "\n", + "In this notebook we walk through:\n", + "- Importing [tfx-bsl](https://github.com/tensorflow/tfx-bsl)\n", + "- Building a simple TF model\n", + "- Setting up examples\n", + "- Running those examples and gettign a prediction" Review Comment: ```suggestion "- Running those examples and getting a prediction" ``` -- This is an automated message from the Apache Git Service. 
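This note concerns the earlier request to add a keyed example. In Beam, a keyed variant is usually built by wrapping an existing handler in `apache_beam.ml.inference.base.KeyedModelHandler`. RunInference then consumes `(key, example)` tuples and emits `(key, prediction)` tuples. With the tfx-bsl handler returned by `CreateModelHandler`, each prediction should be a `PredictionLog` proto; the notebook already imports `prediction_log_pb2`. The plain-Python sketch below only illustrates that key pass-through contract and uses neither Beam nor TensorFlow. `keyed_run_inference` and `times_five` are hypothetical names, and `times_five` is an assumed stand-in for the trained 5-times-table model.

```python
from typing import Callable, Iterable, List, Sequence, Tuple, TypeVar

K = TypeVar("K")  # key type carried through unchanged
E = TypeVar("E")  # example type the model consumes
P = TypeVar("P")  # prediction type the model produces


def keyed_run_inference(
    predict_batch: Callable[[Sequence[E]], Sequence[P]],
    keyed_batch: Iterable[Tuple[K, E]],
) -> List[Tuple[K, P]]:
    """Hypothetical sketch of keyed inference over one batch.

    Keys are split off, the model sees only the examples, and each
    prediction is re-paired with the key of the example it came from.
    """
    pairs = list(keyed_batch)
    keys = [key for key, _ in pairs]
    examples = [example for _, example in pairs]
    predictions = predict_batch(examples)
    # Guard against silently dropping keys: zip() would truncate.
    if len(predictions) != len(examples):
        raise ValueError("model must return one prediction per example")
    return list(zip(keys, predictions))


def times_five(batch: Sequence[float]) -> List[float]:
    """Assumed stand-in for the trained 5-times-table Keras model."""
    return [5.0 * x for x in batch]


if __name__ == "__main__":
    keyed = [("first", 20.0), ("second", 40.0), ("third", 60.0), ("fourth", 90.0)]
    for key, prediction in keyed_run_inference(times_five, keyed):
        print(key, prediction)
```

Running the module prints each key next to five times its value, for example `first 100.0`. The length check is a design choice of this sketch: a plain `zip` would silently drop keys if the model returned too few predictions.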
