ryanthompson591 commented on code in PR #23173:
URL: https://github.com/apache/beam/pull/23173#discussion_r980511916
##########
examples/notebooks/beam-ml/run_inference_tensorflow.ipynb:
##########
@@ -0,0 +1,489 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "collapsed_sections": [],
+ "toc_visible": true
+ },
+ "kernelspec": {
+ "display_name": "Python 3",
+ "name": "python3"
+ },
+ "accelerator": "GPU"
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "fFjof1NgAJwu"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Apache Beam RunInference with TensorFlow\n",
+ "\n",
+ "<button>\n",
+ " <a
href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n",
+ " <img src=\"https://beam.apache.org/images/favicon.ico\"
alt=\"Open the docs\" height=\"16\"/>\n",
+ " Beam RunInference\n",
+ " </a>\n",
+ "</button>\n",
+ "\n",
+ "The Apache Beam RunInference transform is used for making predictions
for\n",
+ "a variety of machine learning models. From version 1.10.0 of tfx-bsl
you can\n",
+ "create a TensorFlow ModelHandler for use with Apache Beam.\n",
+ "\n",
+ "In this notebook, we walk through the use of the RunInference
transform for [TensorFlow](https://www.tensorflow.org/).\n",
+ "Beam
[RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference)
accepts a ModelHandler generated from
[tfx-bsl](https://github.com/tensorflow/tfx-bsl) via CreateModelHandler.\n",
+ "\n",
+ "\n",
+ "\n",
+ "In this notebook we walk through:\n",
+ "- Importing [tfx-bsl](https://github.com/tensorflow/tfx-bsl)\n",
+ "- Building a simple TF model\n",
+ "- Setting up examples\n",
+ "- Running those examples and getting a prediction"
+ ],
+ "metadata": {
+ "id": "HrCtxslBGK8Z"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "jBakpNZnAhqk"
+ },
+ "source": [
+ "!pip install tfx_bsl==1.10.0 --quiet"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "RunInference requires Apache Beam 2.40 or higher.
Creating a ModelHandler requires tfx-bsl 1.10 or higher."
+ ],
+ "metadata": {
+ "id": "gVCtGOKTHMm4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "8hHP9wu98Ld4",
+ "outputId": "a1a8e816-e5b9-4bee-b74c-6d2966bd45d6"
+ },
+ "source": [
+ "!pip freeze | grep beam\n",
+ "!pip freeze | grep tfx-bsl\n"
+ ],
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "apache-beam==2.41.0\n",
+ "tfx-bsl==1.10.0\n"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Authenticate With Cloud\n",
+ "This colab relies on saving your model into Google Cloud. First
authenticate this notebook to use your Google Cloud account."
+ ],
+ "metadata": {
+ "id": "X80jy3FqHjK4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "Kz9sccyGBqz3"
+ },
+ "source": [
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Import Your Dependencies and Setup Your Bucket\n",
+ "Replace `project` and `bucket` with the values for your Google Cloud project.\n",
+ "\n",
+ "**Important**: If you get an error, restart your runtime."
+ ],
+ "metadata": {
+ "id": "40qtP6zJuMXm"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "eEle839_Akqx"
+ },
+ "source": [
+ "import argparse\n",
+ "\n",
+ "import tensorflow as tf\n",
+ "from tensorflow import keras\n",
+ "from tensorflow_serving.apis import prediction_log_pb2\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "import tfx_bsl\n",
+ "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n",
+ "from tfx_bsl.public import tfxio\n",
+ "from tfx_bsl.public.proto import model_spec_pb2\n",
+ "from tensorflow_metadata.proto.v0 import schema_pb2\n",
+ "\n",
+ "import numpy\n",
+ "\n",
+ "from typing import Dict, Text, Any, Tuple, List\n",
+ "\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "project = \"<Replace With Your Project>\"\n",
+ "bucket = \"<Replace With Your Bucket>\"\n",
+ "\n",
+ "save_model_dir_multiply =
f'gs://{bucket}/tfx-inference/model/multiply_five/v1/'\n"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Create and Test a Simple Model\n",
+ "\n",
+ "This creates a model that predicts the 5 times table."
+ ],
+ "metadata": {
+ "id": "YzvZWEv-1oiK"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "SH7iq3zeBBJ-",
+ "outputId": "8ba3dbd2-fd13-4da2-f1e6-04f36fbcf706"
+ },
+ "source": [
+ "# Create training data which represents the 5 times multiplication
table for 0 to 99. x is the data and y the labels. \n",
+ "x = numpy.arange(0, 100) # Examples\n",
+ "y = x * 5 # Labels\n",
+ "\n",
+ "# Build a simple linear regression model.\n",
+ "# Note the model has a shape of (1) for its input layer, so it
expects a single float32 value.\n",
+ "input_layer = keras.layers.Input(shape=(1), dtype=tf.float32,
name='x')\n",
+ "output_layer= keras.layers.Dense(1)(input_layer)\n",
+ "\n",
+ "model = keras.Model(input_layer, output_layer)\n",
+ "model.compile(optimizer=tf.optimizers.Adam(),
loss='mean_absolute_error')\n",
+ "model.summary()"
+ ],
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Model: \"model_3\"\n",
+
"_________________________________________________________________\n",
+ " Layer (type) Output Shape Param #
\n",
+
"=================================================================\n",
+ " x (InputLayer) [(None, 1)] 0
\n",
+ "
\n",
+ " dense_3 (Dense) (None, 1) 2
\n",
+ "
\n",
+
"=================================================================\n",
+ "Total params: 2\n",
+ "Trainable params: 2\n",
+ "Non-trainable params: 0\n",
+
"_________________________________________________________________\n"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Test the Model\n"
+ ],
+ "metadata": {
+ "id": "O_a0-4Gb19cy"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "5XkIYXhJBFmS",
+ "outputId": "82515add-9e26-4a5d-df07-389823098433"
+ },
+ "source": [
+ "model.fit(x, y, epochs=4000, verbose=0)\n",
+ "test_examples =[20, 40, 60, 90]\n",
+ "value_to_predict = numpy.array(test_examples, dtype=numpy.float32)\n",
+ "predictions = model.predict(value_to_predict)\n",
+ "\n",
+ "print('Test Examples ' + str(test_examples))\n",
+ "print('Predictions ' + str(predictions))"
+ ],
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "1/1 [==============================] - 0s 39ms/step\n",
+ "Test Examples [20, 40, 60, 90]\n",
+ "Predictions [[100.001434]\n",
+ " [200.00256 ]\n",
+ " [300.0037 ]\n",
+ " [450.00537 ]]\n"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Populate the Data into a TensorFlow Proto\n",
+ "\n",
+ "TensorFlow data uses protos. If you load data from a file, there are
helpers for this. Because this notebook uses generated data, the code in the
next cell populates the protos directly."
+ ],
+ "metadata": {
+ "id": "dEmleqiH3t71"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "XvKc9kQilPjx"
+ },
+ "source": [
+ "# Helper class that wraps the samples and labels in tf.train.Example
protos\n",
+ "# that TensorFlow can parse.\n",
+ "class ExampleProcessor:\n",
+ " def create_example_with_label(self, feature: numpy.float32,\n",
+ " label: numpy.float32)->
tf.train.Example:\n",
+ " return tf.train.Example(\n",
+ " features=tf.train.Features(\n",
+ " feature={'x': self.create_feature(feature),\n",
+ " 'y' : self.create_feature(label)\n",
+ " }))\n",
+ "\n",
+ " def create_example(self, feature: numpy.float32):\n",
+ " return tf.train.Example(\n",
+ " features=tf.train.Features(\n",
+ " feature={'x' : self.create_feature(feature)})\n",
+ " )\n",
+ "\n",
+ " def create_feature(self, element: numpy.float32):\n",
+ " return
tf.train.Feature(float_list=tf.train.FloatList(value=[element]))\n",
+ "\n",
+ "# Create our labeled example file for 5 times table\n",
+ "\n",
+ "example_five_times_table = 'example_five_times_table.tfrecord'\n",
+ "\n",
+ "with tf.io.TFRecordWriter(example_five_times_table) as writer:\n",
+ " for i in zip(x, y):\n",
+ " example = ExampleProcessor().create_example_with_label(\n",
+ " feature=i[0], label=i[1])\n",
+ " writer.write(example.SerializeToString())\n",
+ "\n",
+ "# Create a file containing the values to predict\n",
+ "\n",
+ "predict_values_five_times_table =
'predict_values_five_times_table.tfrecord'\n",
+ "\n",
+ "with tf.io.TFRecordWriter(predict_values_five_times_table) as
writer:\n",
+ " for i in value_to_predict:\n",
+ " example = ExampleProcessor().create_example(feature=i)\n",
+ " writer.write(example.SerializeToString())"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Fit The Model\n",
+ "\n",
+ "RunInference requires a pretrained model, so this segment trains
the model on the labeled examples from the TFRecord file."
+ ],
+ "metadata": {
+ "id": "G-sAu3cf31f3"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "AnbrxXPKeAOQ",
+ "outputId": "0574f487-5624-4968-e6f5-110e79c40210"
+ },
+ "source": [
+ "RAW_DATA_TRAIN_SPEC = {\n",
+ "'x': tf.io.FixedLenFeature([], tf.float32),\n",
+ "'y': tf.io.FixedLenFeature([], tf.float32)\n",
+ "}\n",
+ "\n",
+ "dataset = tf.data.TFRecordDataset(example_five_times_table)\n",
+ "dataset = dataset.map(lambda e : tf.io.parse_example(e,
RAW_DATA_TRAIN_SPEC))\n",
+ "dataset = dataset.map(lambda t : (t['x'], t['y']))\n",
+ "dataset = dataset.batch(100)\n",
+ "dataset = dataset.repeat()\n",
+ "\n",
+ "model.fit(dataset, epochs=500, steps_per_epoch=1, verbose=0)"
+ ],
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "execute_result",
+ "data": {
+ "text/plain": [
+ "<keras.callbacks.History at 0x7fc0a24b8090>"
+ ]
+ },
+ "metadata": {},
+ "execution_count": 40
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Save the Model"
+ ],
+ "metadata": {
+ "id": "r4dpR6dQ4JwX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "fYvrIYO3qiJx"
+ },
+ "source": [
+ "RAW_DATA_PREDICT_SPEC = {\n",
+ "'x': tf.io.FixedLenFeature([], tf.float32),\n",
+ "}\n",
+ "\n",
+ "@tf.function(input_signature=[tf.TensorSpec(shape=[None],
dtype=tf.string , name='examples')])\n",
+ "def serve_tf_examples_fn(serialized_tf_examples):\n",
+ " \"\"\"Returns the output to be used in the serving
signature.\"\"\"\n",
+ " features = tf.io.parse_example(serialized_tf_examples,
RAW_DATA_PREDICT_SPEC)\n",
+ " return model(features, training=False)\n",
+ "\n",
+ "signature = {'serving_default': serve_tf_examples_fn}\n",
+ "\n",
+ "tf.keras.models.save_model(model, save_model_dir_multiply,
signatures=signature)"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Run the Pipeline\n",
Review Comment:
I added a keyed example. I think it's nice because it also illustrates another,
perhaps simpler, way to construct TF protos.
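
As a rough sketch of what a keyed example could look like (the helper names `example_text` and `keyed_examples` are hypothetical and not taken from the PR), each input can be paired with a string key, and the `tf.train.Example` can be parsed from protobuf text format instead of being assembled from nested `tf.train.Features` constructors. This assumes TensorFlow and protobuf are installed:

```python
from typing import Any, Iterable, List, Tuple


def example_text(value: float) -> str:
    """Return the protobuf text format of a tf.train.Example that holds
    a single float feature named 'x' (the feature name used in the notebook)."""
    return (
        "features { feature { key: 'x' "
        f"value {{ float_list {{ value: {value} }} }} }} }}"
    )


def keyed_examples(values: Iterable[float]) -> List[Tuple[str, Any]]:
    """Pair each input value with a string key and a parsed tf.train.Example.

    The key format 'input_<value>' is an arbitrary choice for illustration.
    """
    # Imported lazily so that example_text can be used without TensorFlow.
    import tensorflow as tf
    from google.protobuf import text_format

    return [
        (f"input_{v}", text_format.Parse(example_text(v), tf.train.Example()))
        for v in values
    ]
```

The resulting `(key, example)` tuples could then presumably be passed to RunInference by wrapping the tfx-bsl handler in `apache_beam.ml.inference.base.KeyedModelHandler`. Whether the examples need to be serialized first depends on the model's serving signature.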