This is an automated email from the ASF dual-hosted git repository. riteshghorse pushed a commit to branch revert-26105-tf-notebook in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2234c8ebef629e90ec80ec37cacb20990e3f2a56 Author: Ritesh Ghorse <[email protected]> AuthorDate: Wed Apr 5 20:22:10 2023 -0400 Revert "[Python] Separate out notebooks for tensorflow with tfx and built-in model handler (#26105)" This reverts commit 126db67d8be2b4a91d8964e1a7c291041694e607. --- .../beam-ml/run_inference_tensorflow.ipynb | 435 +++++++++++--- .../run_inference_tensorflow_with_tfx.ipynb | 657 --------------------- 2 files changed, 360 insertions(+), 732 deletions(-) diff --git a/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb b/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb index a541489a93c..034ab449a10 100644 --- a/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb +++ b/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb @@ -67,18 +67,21 @@ "source": [ "This notebook demonstrates the use of the [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) transform for [TensorFlow](https://www.tensorflow.org/).\n", "\n", - "Beam has built in support for 2 TensorFlow Model Handlers: [TFModelHandlerNumpy](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L91) and [TFModelHandlerTensor](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184).\n", - "TFModelHandlerNumpy can be used to run inference on models expecting a `numpy` array as an input while TFModelHandlerTensor can be used to run inference on models expecting a `tf.Tensor` as an input.\n", + "Beam has built in support for 2 Tensorflow Model Handlers: [TFModelHandlerNumpy](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L91) and 
[TFModelHandlerTensor](https://github.com/apache/beam/blob/ca0787642a6b3804a742326147281c99ae8d08d2/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184).\n", + "TFModelHandlerNumpy can be used to run inference on models expecting a NumPy array as an input while TFModelHandlerTensor can be used to run inference on models expecting a `tf.Tensor` as an input.\n", "\n", - "If your model needs input of type `tf.Example` see the [Apache Beam RunInference with `tfx-bsl` notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb).\n", + "Beam's RunInference transform also accepts a ModelHandler generated from [`tfx-bsl`](https://github.com/tensorflow/tfx-bsl) using `CreateModelHandler`.\n", "\n", "The Apache Beam RunInference transform is used to make predictions for\n", - "a variety of machine learning models. For more information about the RunInference API, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation.\n", + "a variety of machine learning models. For more information about the RunInference API, see [Machine Learning](https://beam.apache.org/documentation/sdks/python-machine-learning) in the Apache Beam documentation.\n", "\n", "This notebook demonstrates the following steps:\n", + "- Import [`tfx-bsl`](https://github.com/tensorflow/tfx-bsl).\n", "- Build a simple TensorFlow model.\n", - "- Set up example data.\n", - "- Run those examples with the built-in model handlers and get a prediction inside an Apache Beam pipeline." + "- Set up example data.\n", + "- Run those examples with the built-in model handlers and get a prediction inside an Apache Beam pipeline.\n", + "- Set up example data in TensorFlow protos.\n", + "- Run those examples with the `tfx-bsl` model handler and get a prediction inside an Apache Beam pipeline." 
], "metadata": { "id": "HrCtxslBGK8Z" @@ -87,7 +90,19 @@ { "cell_type": "markdown", "source": [ - "To use RunInference with built-in Tensorflow model handler, install Apache Beam version 2.46 or later." + "## Before you begin\n", + "Complete the following setup steps.\n", + "\n", + "First, import `tfx-bsl`." + ], + "metadata": { + "id": "HrCtxslBGK8A" + } + }, + { + "cell_type": "markdown", + "source": [ + "To use RunInference with built-in Tensorflow Model Handler, install Apache Beam version 2.46 or later. Creation of a ModelHandler is supported in `tfx-bsl` versions 1.10 and later." ], "metadata": { "id": "gVCtGOKTHMm4" @@ -99,6 +114,7 @@ "id": "jBakpNZnAhqk" }, "source": [ + "!pip install tfx_bsl==1.10.0 --quiet\n", "!pip install protobuf --quiet\n", "!pip install apache_beam==2.46.0 --quiet" ], @@ -124,7 +140,7 @@ "from google.colab import auth\n", "auth.authenticate_user()" ], - "execution_count": 2, + "execution_count": null, "outputs": [] }, { @@ -146,18 +162,23 @@ }, "source": [ "import argparse\n", - "from typing import Dict, Text, Any, Tuple, List\n", - "import numpy\n", - "\n", - "from google.protobuf import text_format\n", "\n", "import tensorflow as tf\n", "from tensorflow import keras\n", + "from tensorflow_serving.apis import prediction_log_pb2\n", + "\n", "import apache_beam as beam\n", "from apache_beam.ml.inference.base import RunInference\n", - "from apache_beam.ml.inference.base import KeyedModelHandler\n", - "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy\n", - "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n", + "import tfx_bsl\n", + "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n", + "from tfx_bsl.public import tfxio\n", + "from tfx_bsl.public.proto import model_spec_pb2\n", + "from tensorflow_metadata.proto.v0 import schema_pb2\n", + "\n", + "import numpy\n", + "\n", + "from typing import Dict, Text, Any, Tuple, List\n", + "\n", "from 
apache_beam.options.pipeline_options import PipelineOptions\n", "\n", "project = \"PROJECT_ID\"\n", @@ -165,7 +186,7 @@ "\n", "save_model_dir_multiply = f'gs://{bucket}/tfx-inference/model/multiply_five/v1/'\n" ], - "execution_count": 10, + "execution_count": null, "outputs": [] }, { @@ -196,7 +217,7 @@ "base_uri": "https://localhost:8080/" }, "id": "SH7iq3zeBBJ-", - "outputId": "e15cab6b-1271-4b0b-bac3-ba76f8991077" + "outputId": "2fb860fe-4420-4266-a51b-80e0c296b0fa" }, "source": [ "# Create training data that represents the 5 times multiplication table for the numbers 0 to 99.\n", @@ -213,7 +234,7 @@ "model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')\n", "model.summary()" ], - "execution_count": 6, + "execution_count": null, "outputs": [ { "output_type": "stream", @@ -254,7 +275,7 @@ "base_uri": "https://localhost:8080/" }, "id": "5XkIYXhJBFmS", - "outputId": "724cad1b-58f6-4e97-f7ec-9526297a108e" + "outputId": "599f8a10-5923-44ae-f1b2-f6d86a06c2ad" }, "source": [ "model.fit(x, y, epochs=500, verbose=0)\n", @@ -265,18 +286,18 @@ "print('Test Examples ' + str(test_examples))\n", "print('Predictions ' + str(predictions))" ], - "execution_count": 7, + "execution_count": null, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ - "1/1 [==============================] - 0s 64ms/step\n", + "1/1 [==============================] - 0s 71ms/step\n", "Test Examples [20, 40, 60, 90]\n", - "Predictions [[ 51.815357]\n", - " [101.63492 ]\n", - " [151.45448 ]\n", - " [226.18384 ]]\n" + "Predictions [[ 34.466846]\n", + " [ 66.937996]\n", + " [ 99.409134]\n", + " [148.11584 ]]\n" ] } ] @@ -300,7 +321,7 @@ "metadata": { "id": "2JbE7WkGcAkK" }, - "execution_count": 8, + "execution_count": null, "outputs": [] }, { @@ -316,6 +337,9 @@ { "cell_type": "code", "source": [ + "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy\n", + "import apache_beam as beam\n", + "\n", "class FormatOutput(beam.DoFn):\n", " def 
process(self, element, *args, **kwargs):\n", " yield \"example is {example} prediction is {prediction}\".format(example=element.example, prediction=element.inference)\n", @@ -332,60 +356,294 @@ ], "metadata": { "colab": { - "base_uri": "https://localhost:8080/", - "height": 124 + "base_uri": "https://localhost:8080/" }, "id": "St07XoibcQSb", - "outputId": "028fb751-1f45-4c7b-da3f-5a3e31312798" + "outputId": "d373b1f9-00dc-4704-bf93-a27a74bf3673" }, - "execution_count": 9, + "execution_count": null, "outputs": [ { "output_type": "stream", - "name": "stderr", + "name": "stdout", "text": [ - "WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.\n" + "example is 20.0 prediction is [102.867615]\n", + "example is 40.0 prediction is [201.72066]\n", + "example is 60.0 prediction is [300.5737]\n", + "example is 90.0 prediction is [448.85324]\n" ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "## KeyedModelHandler with TensorFlow using TFModelHandlerNumpy\n", + "\n", + "By default, the `ModelHandler` does not expect a key.\n", + "\n", + "* If you know that keys are associated with your examples, wrap the model handler with `beam.KeyedModelHandler`.\n", + "* If you don't know whether keys are associated with your examples, use `beam.MaybeKeyedModelHandler`." 
+ ], + "metadata": { + "id": "tRLArcjOcYuO" + } + }, + { + "cell_type": "code", + "source": [ + "from apache_beam.ml.inference.base import KeyedModelHandler\n", + "from google.protobuf import text_format\n", + "import tensorflow as tf\n", + "from typing import Tuple\n", + "\n", + "class FormatOutputKeyed(FormatOutput):\n", + " # To simplify, inherit from FormatOutput.\n", + " def process(self, tuple_in: Tuple):\n", + " key, element = tuple_in\n", + " output = super().process(element)\n", + " yield \"{} : {}\".format(key, [op for op in output])\n", + "\n", + "examples = numpy.array([(1,20), (2,40), (3,60), (4,90)], dtype=numpy.float32)\n", + "keyed_model_handler = KeyedModelHandler(TFModelHandlerNumpy(save_model_dir_multiply))\n", + "with beam.Pipeline() as p:\n", + " _ = (p | 'CreateExamples' >> beam.Create(examples)\n", + " | RunInference(keyed_model_handler)\n", + " | beam.ParDo(FormatOutputKeyed())\n", + " | beam.Map(print)\n", + " )" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "P6l9RwL2cAW3", + "outputId": "13cc318e-51b1-4b1e-ba8b-7e7cbe1956e8" + }, + "execution_count": null, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "1.0 : ['example is 20.0 prediction is [102.867615]']\n", + "2.0 : ['example is 40.0 prediction is [201.72066]']\n", + "3.0 : ['example is 60.0 prediction is [300.5737]']\n", + "4.0 : ['example is 90.0 prediction is [448.85324]']\n" + ] + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "## RunInference with Tensorflow using tfx-bsl\n", + "In versions 1.10.0 and later of `tfx-bsl`, you can\n", + "create a TensorFlow `ModelHandler` for use with Apache Beam. For more information about the RunInference API, see [Machine Learning](https://beam.apache.org/documentation/sdks/python-machine-learning) in the Apache Beam documentation.\n", + "\n", + "### Populate the data in a TensorFlow proto\n", + "\n", + "Tensorflow data uses protos. 
If you are loading from a file, helpers exist for this step. Because this example uses generated data, this code populates a proto." + ], + "metadata": { + "id": "dEmleqiH3t71" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "XvKc9kQilPjx" + }, + "source": [ + "# This example shows a proto that converts the samples and labels into\n", + "# tensors usable by TensorFlow.\n", + "\n", + "class ExampleProcessor:\n", + " def create_example_with_label(self, feature: numpy.float32,\n", + " label: numpy.float32)-> tf.train.Example:\n", + " return tf.train.Example(\n", + " features=tf.train.Features(\n", + " feature={'x': self.create_feature(feature),\n", + " 'y' : self.create_feature(label)\n", + " }))\n", + "\n", + " def create_example(self, feature: numpy.float32):\n", + " return tf.train.Example(\n", + " features=tf.train.Features(\n", + " feature={'x' : self.create_feature(feature)})\n", + " )\n", + "\n", + " def create_feature(self, element: numpy.float32):\n", + " return tf.train.Feature(float_list=tf.train.FloatList(value=[element]))\n", + "\n", + "# Create a labeled example file for the 5 times table.\n", + "\n", + "example_five_times_table = 'example_five_times_table.tfrecord'\n", + "\n", + "with tf.io.TFRecordWriter(example_five_times_table) as writer:\n", + " for i in zip(x, y):\n", + " example = ExampleProcessor().create_example_with_label(\n", + " feature=i[0], label=i[1])\n", + " writer.write(example.SerializeToString())\n", + "\n", + "# Create a file containing the values to predict.\n", + "\n", + "predict_values_five_times_table = 'predict_values_five_times_table.tfrecord'\n", + "\n", + "with tf.io.TFRecordWriter(predict_values_five_times_table) as writer:\n", + " for i in value_to_predict:\n", + " example = ExampleProcessor().create_example(feature=i)\n", + " writer.write(example.SerializeToString())" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Fit The Model\n", + "\n", + "This step 
builds a model. Because RunInference requires pretrained models, this segment builds a usable model." + ], + "metadata": { + "id": "G-sAu3cf31f3" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" }, + "id": "AnbrxXPKeAOQ", + "outputId": "011b2fd3-722e-43b3-c914-c131eaa48bbc" + }, + "source": [ + "RAW_DATA_TRAIN_SPEC = {\n", + "'x': tf.io.FixedLenFeature([], tf.float32),\n", + "'y': tf.io.FixedLenFeature([], tf.float32)\n", + "}\n", + "\n", + "dataset = tf.data.TFRecordDataset(example_five_times_table)\n", + "dataset = dataset.map(lambda e : tf.io.parse_example(e, RAW_DATA_TRAIN_SPEC))\n", + "dataset = dataset.map(lambda t : (t['x'], t['y']))\n", + "dataset = dataset.batch(100)\n", + "dataset = dataset.repeat()\n", + "\n", + "model.fit(dataset, epochs=5000, steps_per_epoch=1, verbose=0)" + ], + "execution_count": null, + "outputs": [ { - "output_type": "display_data", + "output_type": "execute_result", "data": { - "application/javascript": [ - "\n", - " if (typeof window.interactive_beam_jquery == 'undefined') {\n", - " var jqueryScript = document.createElement('script');\n", - " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", - " jqueryScript.type = 'text/javascript';\n", - " jqueryScript.onload = function() {\n", - " var datatableScript = document.createElement('script');\n", - " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", - " datatableScript.type = 'text/javascript';\n", - " datatableScript.onload = function() {\n", - " window.interactive_beam_jquery = jQuery.noConflict(true);\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }\n", - " document.head.appendChild(datatableScript);\n", - " };\n", - " document.head.appendChild(jqueryScript);\n", - " } else {\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }" + "text/plain": [ + 
"<keras.callbacks.History at 0x7fa02b413130>" ] }, - "metadata": {} + "metadata": {}, + "execution_count": 26 + } + ] + }, + { + "cell_type": "markdown", + "source": [ + "### Save the model\n", + "\n", + "This step shows how to save your model." + ], + "metadata": { + "id": "r4dpR6dQ4JwX" + } + }, + { + "cell_type": "code", + "metadata": { + "id": "fYvrIYO3qiJx" + }, + "source": [ + "RAW_DATA_PREDICT_SPEC = {\n", + "'x': tf.io.FixedLenFeature([], tf.float32),\n", + "}\n", + "\n", + "# tf.function compiles the function into a callable TF graph.\n", + "# RunInference relies on calling a TF graph as a model.\n", + "# Note: The input signature should be type tf.string as supported by\n", + "# tfx-bsl ModelHandlers.\n", + "@tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string , name='examples')])\n", + "def serve_tf_examples_fn(serialized_tf_examples):\n", + " \"\"\"Returns the output to be used in the serving signature.\"\"\"\n", + " features = tf.io.parse_example(serialized_tf_examples, RAW_DATA_PREDICT_SPEC)\n", + " return model(features, training=False)\n", + "\n", + "signature = {'serving_default': serve_tf_examples_fn}\n", + "\n", + "# Signatures define the input and output types for a computation. The optional\n", + "# save signatures argument controls which methods in obj will be available to\n", + "# programs which consume SavedModels, for example, serving APIs.\n", + "# See https://www.tensorflow.org/api_docs/python/tf/saved_model/save\n", + "tf.keras.models.save_model(model, save_model_dir_multiply, signatures=signature)" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Run the Pipeline\n", + "Use the following code to run the pipeline.\n", + "\n", + "`FormatOutput` demonstrates how to extract values from the output protos.\n", + "\n", + "`CreateModelHandler` demonstrates the model handler that needs to be passed into the Apache Beam RunInference API." 
+ ], + "metadata": { + "id": "P2UMmbNW4YQV" + } + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" }, + "id": "PzjmXM_KvqHY", + "outputId": "67e61086-a83e-410c-c4e6-e6af77ad82bb" + }, + "source": [ + "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n", + "\n", + "class FormatOutput(beam.DoFn):\n", + " def process(self, element: prediction_log_pb2.PredictionLog):\n", + " predict_log = element.predict_log\n", + " input_value = tf.train.Example.FromString(predict_log.request.inputs['examples'].string_val[0])\n", + " input_float_value = input_value.features.feature['x'].float_list.value[0]\n", + " output_value = predict_log.response.outputs\n", + " output_float_value = output_value['output_0'].float_val[0]\n", + " yield (f\"example is {input_float_value:.2f} prediction is {output_float_value:.2f}\")\n", + "\n", + "tfexample_beam_record = tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)\n", + "saved_model_spec = model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)\n", + "inference_spec_type = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)\n", + "model_handler = CreateModelHandler(inference_spec_type)\n", + "with beam.Pipeline() as p:\n", + " _ = (p | tfexample_beam_record.RawRecordBeamSource()\n", + " | RunInference(model_handler)\n", + " | beam.ParDo(FormatOutput())\n", + " | beam.Map(print)\n", + " )" + ], + "execution_count": null, + "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ - "example is 20.0 prediction is [51.815357]\n", - "example is 40.0 prediction is [101.63492]\n", - "example is 60.0 prediction is [151.45448]\n", - "example is 90.0 prediction is [226.18384]\n" + "example is 20.00 prediction is 100.00\n", + "example is 40.00 prediction is 200.00\n", + "example is 60.00 prediction is 300.01\n", + "example is 90.00 prediction is 450.01\n" ] } ] @@ -393,29 +651,56 @@ { "cell_type": "markdown", "source": [ - 
"## KeyedModelHandler with TensorFlow using TFModelHandlerNumpy\n", + "## KeyedModelHandler with TensorFlow using tfx-bsl\n", "\n", "By default, the `ModelHandler` does not expect a key.\n", "\n", "* If you know that keys are associated with your examples, wrap the model handler with `beam.KeyedModelHandler`.\n", - "* If you don't know whether keys are associated with your examples, use `beam.MaybeKeyedModelHandler`." + "* If you don't know whether keys are associated with your examples, use `beam.MaybeKeyedModelHandler`.\n", + "\n", + "In addition to demonstrating how to use a keyed model handler, this step demonstrates how to use `tfx-bsl` examples." ], "metadata": { - "id": "tRLArcjOcYuO" + "id": "IXikjkGdHm9n" } }, { "cell_type": "code", "source": [ + "from apache_beam.ml.inference.base import KeyedModelHandler\n", + "from google.protobuf import text_format\n", + "import tensorflow as tf\n", + "\n", "class FormatOutputKeyed(FormatOutput):\n", " # To simplify, inherit from FormatOutput.\n", " def process(self, tuple_in: Tuple):\n", " key, element = tuple_in\n", " output = super().process(element)\n", - " yield \"{} : {}\".format(key, [op for op in output])\n", + " yield ' : '.join([key, next(output)])\n", "\n", - "examples = numpy.array([(1,20), (2,40), (3,60), (4,90)], dtype=numpy.float32)\n", - "keyed_model_handler = KeyedModelHandler(TFModelHandlerNumpy(save_model_dir_multiply))\n", + "def make_example(num):\n", + " # Return keyed values in the form of (key num, example).\n", + " key = f'key {num}'\n", + " tf_proto = text_format.Parse(\n", + " \"\"\"\n", + " features {\n", + " feature {key: \"x\" value { float_list { value: %f }}}\n", + " }\n", + " \"\"\"% num, tf.train.Example())\n", + " return (key, tf_proto)\n", + "\n", + "# Make a list of examples of type tf.train.Example.\n", + "examples = [\n", + " make_example(5.0),\n", + " make_example(50.0),\n", + " make_example(40.0),\n", + " make_example(100.0)\n", + "]\n", + "\n", + "tfexample_beam_record = 
tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)\n", + "saved_model_spec = model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)\n", + "inference_spec_type = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)\n", + "keyed_model_handler = KeyedModelHandler(CreateModelHandler(inference_spec_type))\n", "with beam.Pipeline() as p:\n", " _ = (p | 'CreateExamples' >> beam.Create(examples)\n", " | RunInference(keyed_model_handler)\n", @@ -427,19 +712,19 @@ "colab": { "base_uri": "https://localhost:8080/" }, - "id": "P6l9RwL2cAW3", - "outputId": "03459fea-7d0a-4501-93cb-18bbad915d13" + "id": "KPtE3fmdJQry", + "outputId": "8729f479-e347-4243-b8de-757efd28dba7" }, - "execution_count": 11, + "execution_count": null, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ - "1.0 : ['example is 20.0 prediction is [51.815357]']\n", - "2.0 : ['example is 40.0 prediction is [101.63492]']\n", - "3.0 : ['example is 60.0 prediction is [151.45448]']\n", - "4.0 : ['example is 90.0 prediction is [226.18384]']\n" + "key 5.0 : example is 5.00 prediction is 25.00\n", + "key 50.0 : example is 50.00 prediction is 250.01\n", + "key 40.0 : example is 40.00 prediction is 200.00\n", + "key 100.0 : example is 100.00 prediction is 500.01\n" ] } ] diff --git a/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb b/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb deleted file mode 100644 index fe9fc2b288a..00000000000 --- a/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb +++ /dev/null @@ -1,657 +0,0 @@ -{ - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "provenance": [], - "collapsed_sections": [ - "X80jy3FqHjK4", - "40qtP6zJuMXm", - "YzvZWEv-1oiK", - "rIwD_qEpX7Gu", - "O_a0-4Gb19cy", - "G-sAu3cf31f3", - "r4dpR6dQ4JwX", - "P2UMmbNW4YQV" - ] - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": 
"python" - }, - "accelerator": "GPU" - }, - "cells": [ - { - "cell_type": "code", - "source": [ - "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", - "\n", - "# Licensed to the Apache Software Foundation (ASF) under one\n", - "# or more contributor license agreements. See the NOTICE file\n", - "# distributed with this work for additional information\n", - "# regarding copyright ownership. The ASF licenses this file\n", - "# to you under the Apache License, Version 2.0 (the\n", - "# \"License\"); you may not use this file except in compliance\n", - "# with the License. You may obtain a copy of the License at\n", - "#\n", - "# http://www.apache.org/licenses/LICENSE-2.0\n", - "#\n", - "# Unless required by applicable law or agreed to in writing,\n", - "# software distributed under the License is distributed on an\n", - "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", - "# KIND, either express or implied. See the License for the\n", - "# specific language governing permissions and limitations\n", - "# under the License" - ], - "metadata": { - "cellView": "form", - "id": "fFjof1NgAJwu" - }, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "A8xNRyZMW1yK" - }, - "source": [ - "# Apache Beam RunInference with `tfx-bsl`\n", - "\n", - "<table align=\"left\">\n", - " <td>\n", - " <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n", - " </td>\n", - " <td>\n", - " <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow_with_tfx.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n", - " </td>\n", - 
"</table>\n" - ] - }, - { - "cell_type": "markdown", - "source": [ - "This notebook demonstrates the use of the [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) transform for [TensorFlow](https://www.tensorflow.org/) using [`tfx-bsl`](https://github.com/tensorflow/tfx-bsl).\n", - "\n", - "Use this approach when the trained model requires a `tf.Example` input. If you have `numpy` or `tf.Tensor` inputs, see the [Apache Beam RunInference with TensorFlow notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb) instead which demonstrates the use built-in model handlers for TensorFlow in Beam SDK starting 2.46.0.\n", - "\n", - "The Apache Beam RunInference transform accepts a model handler generated from [`tfx-bsl`](https://github.com/tensorflow/tfx-bsl) by using `CreateModelHandler`.\n", - "\n", - "The Apache Beam RunInference transform is used to make predictions for\n", - "a variety of machine learning models. For more information about the RunInference API, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation.\n", - "\n", - "This notebook demonstrates the following steps:\n", - "- Import [`tfx-bsl`](https://github.com/tensorflow/tfx-bsl).\n", - "- Build a simple TensorFlow model.\n", - "- Set up example data.\n", - "- Run those examples with the `tfx-bsl` model handler and get a prediction inside an Apache Beam pipeline." - ], - "metadata": { - "id": "HrCtxslBGK8Z" - } - }, - { - "cell_type": "markdown", - "source": [ - "## Before you begin\n", - "Complete the following setup steps.\n", - "\n", - "First, import `tfx-bsl`." - ], - "metadata": { - "id": "HrCtxslBGK8A" - } - }, - { - "cell_type": "markdown", - "source": [ - "Creation of a ModelHandler is supported in `tfx-bsl` versions 1.10 and later." 
- ], - "metadata": { - "id": "gVCtGOKTHMm4" - } - }, - { - "cell_type": "code", - "metadata": { - "id": "jBakpNZnAhqk" - }, - "source": [ - "!pip install tfx_bsl==1.10.0 --quiet\n", - "!pip install protobuf --quiet\n", - "!pip install apache_beam --quiet" - ], - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "### Authenticate with Google Cloud\n", - "This notebook relies on saving your model to Google Cloud. To use your Google Cloud account, authenticate this notebook." - ], - "metadata": { - "id": "X80jy3FqHjK4" - } - }, - { - "cell_type": "code", - "metadata": { - "id": "Kz9sccyGBqz3" - }, - "source": [ - "from google.colab import auth\n", - "auth.authenticate_user()" - ], - "execution_count": 2, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "### Import dependencies and set up your bucket\n", - "Replace `PROJECT_ID` and `BUCKET_NAME` with the ID of your project and the name of your bucket.\n", - "\n", - "**Important**: If an error occurs, restart your runtime." 
- ], - "metadata": { - "id": "40qtP6zJuMXm" - } - }, - { - "cell_type": "code", - "metadata": { - "id": "eEle839_Akqx" - }, - "source": [ - "import argparse\n", - "\n", - "import tensorflow as tf\n", - "from tensorflow import keras\n", - "from tensorflow_serving.apis import prediction_log_pb2\n", - "\n", - "import apache_beam as beam\n", - "from apache_beam.ml.inference.base import RunInference\n", - "import tfx_bsl\n", - "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n", - "from tfx_bsl.public import tfxio\n", - "from tfx_bsl.public.proto import model_spec_pb2\n", - "from tensorflow_metadata.proto.v0 import schema_pb2\n", - "\n", - "import numpy\n", - "\n", - "from typing import Dict, Text, Any, Tuple, List\n", - "\n", - "from apache_beam.options.pipeline_options import PipelineOptions\n", - "\n", - "project = \"PROJECT_ID\"\n", - "bucket = \"BUCKET_NAME\"\n", - "\n", - "save_model_dir_multiply = f'gs://{bucket}/tfx-inference/model/multiply_five/v1/'\n" - ], - "execution_count": 12, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Create and test a simple model\n", - "\n", - "This step creates and tests a model that predicts the 5 times table." - ], - "metadata": { - "id": "YzvZWEv-1oiK" - } - }, - { - "cell_type": "markdown", - "metadata": { - "id": "rIwD_qEpX7Gu" - }, - "source": [ - "### Create the model\n", - "Create training data and build a linear regression model." 
- ] - }, - { - "cell_type": "code", - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "SH7iq3zeBBJ-", - "outputId": "c5adb7ec-285b-401e-f9be-1e9b83c6d0ba" - }, - "source": [ - "# Create training data that represents the 5 times multiplication table for the numbers 0 to 99.\n", - "# x is the data and y is the labels.\n", - "x = numpy.arange(0, 100) # Examples\n", - "y = x * 5 # Labels\n", - "\n", - "# Build a simple linear regression model.\n", - "# Note that the model has a shape of (1) for its input layer and expects a single int64 value.\n", - "input_layer = keras.layers.Input(shape=(1), dtype=tf.float32, name='x')\n", - "output_layer= keras.layers.Dense(1)(input_layer)\n", - "\n", - "model = keras.Model(input_layer, output_layer)\n", - "model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')\n", - "model.summary()" - ], - "execution_count": 4, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "Model: \"model\"\n", - "_________________________________________________________________\n", - " Layer (type) Output Shape Param # \n", - "=================================================================\n", - " x (InputLayer) [(None, 1)] 0 \n", - " \n", - " dense (Dense) (None, 1) 2 \n", - " \n", - "=================================================================\n", - "Total params: 2\n", - "Trainable params: 2\n", - "Non-trainable params: 0\n", - "_________________________________________________________________\n" - ] - } - ] - }, - { - "cell_type": "markdown", - "source": [ - "### Test the model\n", - "\n", - "This step tests the model that you created." 
- ], - "metadata": { - "id": "O_a0-4Gb19cy" - } - }, - { - "cell_type": "code", - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "5XkIYXhJBFmS", - "outputId": "e3bb5079-5cb8-4fe4-eb8d-d3d13d5f9f0c" - }, - "source": [ - "model.fit(x, y, epochs=500, verbose=0)\n", - "test_examples =[20, 40, 60, 90]\n", - "value_to_predict = numpy.array(test_examples, dtype=numpy.float32)\n", - "predictions = model.predict(value_to_predict)\n", - "\n", - "print('Test Examples ' + str(test_examples))\n", - "print('Predictions ' + str(predictions))" - ], - "execution_count": 6, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "1/1 [==============================] - 0s 94ms/step\n", - "Test Examples [20, 40, 60, 90]\n", - "Predictions [[ 9.201942]\n", - " [16.40566 ]\n", - " [23.609379]\n", - " [34.41496 ]]\n" - ] - } - ] - }, - { - "cell_type": "markdown", - "source": [ - "## RunInference with Tensorflow using `tfx-bsl`\n", - "In versions 1.10.0 and later of `tfx-bsl`, you can\n", - "create a TensorFlow `ModelHandler` for use with Apache Beam. For more information about the RunInference API, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation.\n", - "\n", - "### Populate the data in a TensorFlow proto\n", - "\n", - "Tensorflow data uses protos. If you are loading from a file, helpers exist for this step. Because this example uses generated data, this code populates a proto." 
- ], - "metadata": { - "id": "dEmleqiH3t71" - } - }, - { - "cell_type": "code", - "metadata": { - "id": "XvKc9kQilPjx" - }, - "source": [ - "# This example shows a proto that converts the samples and labels into\n", - "# tensors usable by TensorFlow.\n", - "\n", - "class ExampleProcessor:\n", - " def create_example_with_label(self, feature: numpy.float32,\n", - " label: numpy.float32)-> tf.train.Example:\n", - " return tf.train.Example(\n", - " features=tf.train.Features(\n", - " feature={'x': self.create_feature(feature),\n", - " 'y' : self.create_feature(label)\n", - " }))\n", - "\n", - " def create_example(self, feature: numpy.float32):\n", - " return tf.train.Example(\n", - " features=tf.train.Features(\n", - " feature={'x' : self.create_feature(feature)})\n", - " )\n", - "\n", - " def create_feature(self, element: numpy.float32):\n", - " return tf.train.Feature(float_list=tf.train.FloatList(value=[element]))\n", - "\n", - "# Create a labeled example file for the 5 times table.\n", - "\n", - "example_five_times_table = 'example_five_times_table.tfrecord'\n", - "\n", - "with tf.io.TFRecordWriter(example_five_times_table) as writer:\n", - " for i in zip(x, y):\n", - " example = ExampleProcessor().create_example_with_label(\n", - " feature=i[0], label=i[1])\n", - " writer.write(example.SerializeToString())\n", - "\n", - "# Create a file containing the values to predict.\n", - "\n", - "predict_values_five_times_table = 'predict_values_five_times_table.tfrecord'\n", - "\n", - "with tf.io.TFRecordWriter(predict_values_five_times_table) as writer:\n", - " for i in value_to_predict:\n", - " example = ExampleProcessor().create_example(feature=i)\n", - " writer.write(example.SerializeToString())" - ], - "execution_count": 7, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "### Fit The Model\n", - "\n", - "This step builds a model. Because RunInference requires pretrained models, this segment builds a usable model." 
- ], - "metadata": { - "id": "G-sAu3cf31f3" - } - }, - { - "cell_type": "code", - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "AnbrxXPKeAOQ", - "outputId": "42439aac-3a10-4e86-829f-44332aad6173" - }, - "source": [ - "RAW_DATA_TRAIN_SPEC = {\n", - "'x': tf.io.FixedLenFeature([], tf.float32),\n", - "'y': tf.io.FixedLenFeature([], tf.float32)\n", - "}\n", - "\n", - "dataset = tf.data.TFRecordDataset(example_five_times_table)\n", - "dataset = dataset.map(lambda e : tf.io.parse_example(e, RAW_DATA_TRAIN_SPEC))\n", - "dataset = dataset.map(lambda t : (t['x'], t['y']))\n", - "dataset = dataset.batch(100)\n", - "dataset = dataset.repeat()\n", - "\n", - "model.fit(dataset, epochs=5000, steps_per_epoch=1, verbose=0)" - ], - "execution_count": 8, - "outputs": [ - { - "output_type": "execute_result", - "data": { - "text/plain": [ - "<keras.callbacks.History at 0x7f6960074c70>" - ] - }, - "metadata": {}, - "execution_count": 8 - } - ] - }, - { - "cell_type": "markdown", - "source": [ - "### Save the model\n", - "\n", - "This step shows how to save your model." 
- ], - "metadata": { - "id": "r4dpR6dQ4JwX" - } - }, - { - "cell_type": "code", - "metadata": { - "id": "fYvrIYO3qiJx" - }, - "source": [ - "RAW_DATA_PREDICT_SPEC = {\n", - "'x': tf.io.FixedLenFeature([], tf.float32),\n", - "}\n", - "\n", - "# tf.function compiles the function into a callable TF graph.\n", - "# RunInference relies on calling a TF graph as a model.\n", - "# Note: The input signature should be type tf.string as supported by\n", - "# tfx-bsl ModelHandlers.\n", - "@tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string , name='examples')])\n", - "def serve_tf_examples_fn(serialized_tf_examples):\n", - " \"\"\"Returns the output to be used in the serving signature.\"\"\"\n", - " features = tf.io.parse_example(serialized_tf_examples, RAW_DATA_PREDICT_SPEC)\n", - " return model(features, training=False)\n", - "\n", - "signature = {'serving_default': serve_tf_examples_fn}\n", - "\n", - "# Signatures define the input and output types for a computation. The optional\n", - "# save signatures argument controls which methods in obj will be available to\n", - "# programs which consume SavedModels, for example, serving APIs.\n", - "# See https://www.tensorflow.org/api_docs/python/tf/saved_model/save\n", - "tf.keras.models.save_model(model, save_model_dir_multiply, signatures=signature)" - ], - "execution_count": 9, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Run the Pipeline\n", - "Use the following code to run the pipeline.\n", - "\n", - "`FormatOutput` demonstrates how to extract values from the output protos.\n", - "\n", - "`CreateModelHandler` demonstrates the model handler that needs to be passed into the Apache Beam RunInference API." 
- ], - "metadata": { - "id": "P2UMmbNW4YQV" - } - }, - { - "cell_type": "code", - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/", - "height": 193 - }, - "id": "PzjmXM_KvqHY", - "outputId": "0aa60bef-52a0-4ce2-d228-3fac977d59e0" - }, - "source": [ - "from tfx_bsl.public.beam.run_inference import CreateModelHandler\n", - "\n", - "class FormatOutput(beam.DoFn):\n", - " def process(self, element: prediction_log_pb2.PredictionLog):\n", - " predict_log = element.predict_log\n", - " input_value = tf.train.Example.FromString(predict_log.request.inputs['examples'].string_val[0])\n", - " input_float_value = input_value.features.feature['x'].float_list.value[0]\n", - " output_value = predict_log.response.outputs\n", - " output_float_value = output_value['output_0'].float_val[0]\n", - " yield (f\"example is {input_float_value:.2f} prediction is {output_float_value:.2f}\")\n", - "\n", - "tfexample_beam_record = tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)\n", - "saved_model_spec = model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)\n", - "inference_spec_type = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)\n", - "model_handler = CreateModelHandler(inference_spec_type)\n", - "with beam.Pipeline() as p:\n", - " _ = (p | tfexample_beam_record.RawRecordBeamSource()\n", - " | RunInference(model_handler)\n", - " | beam.ParDo(FormatOutput())\n", - " | beam.Map(print)\n", - " )" - ], - "execution_count": 10, - "outputs": [ - { - "output_type": "stream", - "name": "stderr", - "text": [ - "WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.\n" - ] - }, - { - "output_type": "display_data", - "data": { - "application/javascript": [ - "\n", - " if (typeof 
window.interactive_beam_jquery == 'undefined') {\n", - " var jqueryScript = document.createElement('script');\n", - " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", - " jqueryScript.type = 'text/javascript';\n", - " jqueryScript.onload = function() {\n", - " var datatableScript = document.createElement('script');\n", - " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", - " datatableScript.type = 'text/javascript';\n", - " datatableScript.onload = function() {\n", - " window.interactive_beam_jquery = jQuery.noConflict(true);\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }\n", - " document.head.appendChild(datatableScript);\n", - " };\n", - " document.head.appendChild(jqueryScript);\n", - " } else {\n", - " window.interactive_beam_jquery(document).ready(function($){\n", - " \n", - " });\n", - " }" - ] - }, - "metadata": {} - }, - { - "output_type": "stream", - "name": "stderr", - "text": [ - "WARNING:tensorflow:From /usr/local/lib/python3.9/dist-packages/tfx_bsl/beam/run_inference.py:615: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.\n", - "Instructions for updating:\n", - "Use `tf.saved_model.load` instead.\n", - "WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.\n" - ] - }, - { - "output_type": "stream", - "name": "stdout", - "text": [ - "example is 20.00 prediction is 104.36\n", - "example is 40.00 prediction is 202.62\n", - "example is 60.00 prediction is 300.87\n", - "example is 90.00 prediction is 448.26\n" - ] - } - ] - }, - { - "cell_type": "markdown", - "source": [ - "## KeyedModelHandler with TensorFlow using `tfx-bsl`\n", - "\n", - "By default, the `ModelHandler` does not expect a key.\n", - "\n", - "* If you know that keys are associated with your examples, wrap the model 
handler with `beam.KeyedModelHandler`.\n", - "* If you don't know whether keys are associated with your examples, use `beam.MaybeKeyedModelHandler`.\n", - "\n", - "In addition to demonstrating how to use a keyed model handler, this step demonstrates how to use `tfx-bsl` examples." - ], - "metadata": { - "id": "IXikjkGdHm9n" - } - }, - { - "cell_type": "code", - "source": [ - "from apache_beam.ml.inference.base import KeyedModelHandler\n", - "from google.protobuf import text_format\n", - "import tensorflow as tf\n", - "\n", - "class FormatOutputKeyed(FormatOutput):\n", - " # To simplify, inherit from FormatOutput.\n", - " def process(self, tuple_in: Tuple):\n", - " key, element = tuple_in\n", - " output = super().process(element)\n", - " yield ' : '.join([key, next(output)])\n", - "\n", - "def make_example(num):\n", - " # Return keyed values in the form of (key num, example).\n", - " key = f'key {num}'\n", - " tf_proto = text_format.Parse(\n", - " \"\"\"\n", - " features {\n", - " feature {key: \"x\" value { float_list { value: %f }}}\n", - " }\n", - " \"\"\"% num, tf.train.Example())\n", - " return (key, tf_proto)\n", - "\n", - "# Make a list of examples of type tf.train.Example.\n", - "examples = [\n", - " make_example(5.0),\n", - " make_example(50.0),\n", - " make_example(40.0),\n", - " make_example(100.0)\n", - "]\n", - "\n", - "tfexample_beam_record = tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)\n", - "saved_model_spec = model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)\n", - "inference_spec_type = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)\n", - "keyed_model_handler = KeyedModelHandler(CreateModelHandler(inference_spec_type))\n", - "with beam.Pipeline() as p:\n", - " _ = (p | 'CreateExamples' >> beam.Create(examples)\n", - " | RunInference(keyed_model_handler)\n", - " | beam.ParDo(FormatOutputKeyed())\n", - " | beam.Map(print)\n", - " )" - ], - "metadata": { - "colab": { - 
"base_uri": "https://localhost:8080/" - }, - "id": "KPtE3fmdJQry", - "outputId": "c33558fc-fb12-4c20-b828-b5520721f279" - }, - "execution_count": 11, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "key 5.0 : example is 5.00 prediction is 30.67\n", - "key 50.0 : example is 50.00 prediction is 251.75\n", - "key 40.0 : example is 40.00 prediction is 202.62\n", - "key 100.0 : example is 100.00 prediction is 497.38\n" - ] - } - ] - } - ] -} \ No newline at end of file
