Juta commented on code in PR #25381:
URL: https://github.com/apache/beam/pull/25381#discussion_r1104747738
##########
examples/notebooks/beam-ml/tfma_beam.ipynb:
##########
@@ -0,0 +1,669 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# TensorFlow Model Analysis in Beam\n",
+ "[TensorFlow Model Analysis
(TFMA)](https://www.tensorflow.org/tfx/guide/tfma) is a library for performing
model evaluation across different slices of data. TFMA performs its
computations in a distributed manner over large amounts of data using Apache
Beam.\n",
+ "\n",
+ "This example colab notebook illustrates how TFMA can be used to
investigate and visualize the performance of a model as part of your Beam
pipeline. This allows for scalable and flexible execution of your evaluation
pipeline. For this we will use
[**ExtractEvaluateAndWriteResults**](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults),
which is a PTransform for performing extraction, evaluation, and writing
results all in one step.\n",
+ "\n",
+ "For additional information on TFMA, you can refer to the [TFMA basic
notebook](https://www.tensorflow.org/tfx/tutorials/model_analysis/tfma_basic)
which provides a more in-depth look at its capabilities"
+ ],
+ "metadata": {
+ "id": "1m9dEIsQAP_-"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Install Jupyter extensions\n",
+ "Note: If running in a local Jupyter notebook, then these Jupyter
extensions must be installed in the environment before running Jupyter.\n",
+ "\n",
+ "```bash\n",
+ "jupyter nbextension enable --py widgetsnbextension --sys-prefix \n",
+ "jupyter nbextension install --py --symlink tensorflow_model_analysis
--sys-prefix \n",
+ "jupyter nbextension enable --py tensorflow_model_analysis
--sys-prefix \n",
+ "```"
+ ],
+ "metadata": {
+ "id": "GKcUZKcTRhW_"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Install TensorFlow Model Analysis (TFMA)\n",
+ "\n",
+ "This will pull in all the dependencies, and will take a minute."
+ ],
+ "metadata": {
+ "id": "-01Hts8eR9OV"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "8aLJh4pFasK0"
+ },
+ "outputs": [],
+ "source": [
+ "# Upgrade pip to the latest, and install TFMA.\n",
+ "!pip install -U pip\n",
+ "!pip install tensorflow-model-analysis\n",
+ "\n",
+ "# To use the newly installed version, restart the runtime.\n",
+ "exit() "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "SUTczmH2dWk2"
+ },
+ "outputs": [],
+ "source": [
+ "# This setup was tested with TF 2.11, TFMA 0.43 and Beam 2.44 (using
colab),\n",
+ "# but it should also work with the latest release.\n",
+ "import sys\n",
+ "\n",
+ "# Confirm that we're using Python 3\n",
+ "assert sys.version_info.major==3, 'This notebook must be run using
Python 3.'\n",
+ "\n",
+ "import tensorflow as tf\n",
+ "print('TF version: {}'.format(tf.__version__))\n",
+ "import apache_beam as beam\n",
+ "print('Beam version: {}'.format(beam.__version__))\n",
+ "import tensorflow_model_analysis as tfma\n",
+ "print('TFMA version: {}'.format(tfma.__version__))\n",
+ "import tensorflow_datasets as tfds\n",
+ "print('TFDS version: {}'.format(tfds.__version__))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**NOTE: The output above should be clear of errors before proceeding.
Re-run the install and restart your kernel if you are still seeing errors.**"
+ ],
+ "metadata": {
+ "id": "KP7V1pIU_9H2"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "SHL89whuLqmq"
+ },
+ "source": [
+ "# Data preprocessing"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "6yapf7rN_lB7"
+ },
+ "source": [
+ "## Diamonds price prediction\n",
+ "\n",
+ "We will be using the [TFDS diamonds
dataset](https://www.tensorflow.org/datasets/catalog/diamonds) to train a
linear regression model that will predict the price of a diamond. This dataset
contains various physical attributes such as the weight of the diamond (carat),
the cut quality, color, clarity and the price of 53940 diamonds. The model's
performance will be evaluated using metrics such as mean squared error and mean
absolute error.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "In order to simulate a scenario where a model's performance improves
over time as new data is added to the dataset, we will first train a model
called v1 using half of the diamonds dataset. Later on, we will train a second
model called v2 using additional data. This will enable us to demonstrate the
use of TFMA when comparing the performance of the two models for the same task."
+ ],
+ "metadata": {
+ "id": "y9a9rv3_YYhE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "hQCHORIG8Ixv"
+ },
+ "outputs": [],
+ "source": [
+ "# Load the data from TFDS and create a train, test and validation
dataset by splitting the dataset into parts\n",
+ "(ds_train_v1, ds_test, ds_val), info = tfds.load('diamonds',
split=['train[:40%]', 'train[80%:90%]', 'train[90%:]'], as_supervised=True,
with_info=True)"
+ ]
+ },
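+ {
+ "cell_type": "markdown",
+ "source": [
+ "As a quick, illustrative sanity check, we can print the total dataset size and the size of each split. Note that the 40%-80% slice of the data is deliberately held back; it will be used later when training model v2."
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Print the total number of examples and the size of each split\n",
+ "print('total examples:', info.splits['train'].num_examples)\n",
+ "for name, ds in [('train_v1', ds_train_v1), ('test', ds_test), ('val', ds_val)]:\n",
+ "  print(name, ds.cardinality().numpy())"
+ ]
+ },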
+ {
+ "cell_type": "code",
+ "source": [
+ "import numpy as np\n",
+ "\n",
+ "# Load in the numerical training data to use for normalization\n",
+ "def extract_numerical_features(item):\n",
+ " carat = item['carat']\n",
+ " depth = item['depth']\n",
+ " table = item['table']\n",
+ " x = item['x']\n",
+ " y = item['y']\n",
+ " z = item['z']\n",
+ " \n",
+ " return [carat, depth, table, x, y, z]\n",
+ "\n",
+ "def get_train_data(ds_train):\n",
+ " train_data = []\n",
+ " for item, label in ds_train:\n",
+ " features = extract_numerical_features(item)\n",
+ " train_data.append(features)\n",
+ "\n",
+ " train_data = np.array(train_data)\n",
+ "\n",
+ " return train_data"
+ ],
+ "metadata": {
+ "id": "_sNqOzwNGo6V"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "train_data_v1 = get_train_data(ds_train_v1)"
+ ],
+ "metadata": {
+ "id": "PVVUZ1LOwQAf"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define the features length\n",
+ "NUMERICAL_FEATURES = 6\n",
+ "NUM_FEATURES = (NUMERICAL_FEATURES +\n",
+ " info.features['features']['color'].num_classes +\n",
+ " info.features['features']['cut'].num_classes +\n",
+ " info.features['features']['clarity'].num_classes)"
+ ],
+ "metadata": {
+ "id": "__6xLw92aI9a"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "ZJt8Wu2LmX9j"
+ },
+ "outputs": [],
+ "source": [
+ "# Transform the input data into a feature vector and label by
selecting the input and output for the model\n",
+ "def transform_data(item, label):\n",
+ " numerical_features = extract_numerical_features(item)\n",
+ "\n",
+ " # Categorical features will be encoded using one-hot encoding\n",
+ " color = tf.one_hot(item['color'],
info.features['features']['color'].num_classes)\n",
+ " cut = tf.one_hot(item['cut'],
info.features['features']['cut'].num_classes)\n",
+ " clarity = tf.one_hot(item['clarity'],
info.features['features']['clarity'].num_classes)\n",
+ " \n",
+ " # Create output tensor\n",
+ " output = tf.concat([tf.stack(numerical_features, axis=0), color,
cut, clarity], 0)\n",
+ " return output, [label]"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "f3ZxhNZDsbkx"
+ },
+ "outputs": [],
+ "source": [
+ "ds_train_v1 = ds_train_v1.map(transform_data)\n",
+ "ds_test = ds_test.map(transform_data)\n",
+ "ds_val = ds_val.map(transform_data)"
+ ]
+ },
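+ {
+ "cell_type": "markdown",
+ "source": [
+ "As an illustrative check, each mapped element should now be a feature vector of length `NUM_FEATURES` paired with a single-value label:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Inspect the shape of one transformed example\n",
+ "for features, label in ds_train_v1.take(1):\n",
+ "  print(features.shape, label.shape)  # expected: (NUM_FEATURES,) and (1,)\n",
+ "  assert features.shape[0] == NUM_FEATURES"
+ ]
+ },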
+ {
+ "cell_type": "code",
+ "source": [
+ "# Prepare the data for training by structuring it in batches\n",
+ "BATCH_SIZE = 32\n",
+ "ds_train_v1 = ds_train_v1.batch(BATCH_SIZE)\n",
+ "ds_test = ds_test.batch(BATCH_SIZE)"
+ ],
+ "metadata": {
+ "id": "sw3udkicwVZE"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## TFRecords creation\n",
+ "\n",
+ "TFMA and Beam need to read the dataset used during evaluation from a
file. We will create a TFRecords file that contains our validation dataset."
+ ],
+ "metadata": {
+ "id": "KFd6NwPAacSM"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "ipHOHiRqMJOi"
+ },
+ "outputs": [],
+ "source": [
+ "!mkdir data"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "TkeG4uw1K9Dt"
+ },
+ "outputs": [],
+ "source": [
+ "# Write the validation record to a file (used by TFMA)\n",
+ "tfrecord_file = 'data/val_data.tfrecord'\n",
+ "\n",
+ "with tf.io.TFRecordWriter(tfrecord_file) as file_writer:\n",
+ " for x, y in ds_val:\n",
+ " record_bytes =
tf.train.Example(features=tf.train.Features(feature={\n",
+ " \"inputs\":
tf.train.Feature(float_list=tf.train.FloatList(value=x)),\n",
+ " \"output\":
tf.train.Feature(float_list=tf.train.FloatList(value=[y])),\n",
+ " })).SerializeToString()\n",
+ " file_writer.write(record_bytes)"
+ ]
+ },
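+ {
+ "cell_type": "markdown",
+ "source": [
+ "To confirm the records were written as expected, we can read one back and parse it. This is a minimal sketch using the standard `tf.data` and `tf.io` parsing APIs; the feature names and shapes match what we serialized above:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Read back one record and parse it to verify the serialized schema\n",
+ "feature_spec = {\n",
+ "    'inputs': tf.io.FixedLenFeature([NUM_FEATURES], tf.float32),\n",
+ "    'output': tf.io.FixedLenFeature([1], tf.float32),\n",
+ "}\n",
+ "for raw_record in tf.data.TFRecordDataset(tfrecord_file).take(1):\n",
+ "  example = tf.io.parse_single_example(raw_record, feature_spec)\n",
+ "  print(example['inputs'].shape, example['output'].numpy())"
+ ]
+ },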
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "zhunMzSOLmdG"
+ },
+ "source": [
+ "# Model definition and training"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Now let's train a linear regression model that will predict the price
of a diamond. The model we will train is a neural network with one hidden
layer. We also will use one normalisation layer to scale all the numerical
features between 0 and 1."
+ ],
+ "metadata": {
+ "id": "Svsic-PSbGu7"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "so7l2WDzd2kg"
+ },
+ "outputs": [],
+ "source": [
+ "def construct_model(model_name, train_data):\n",
+ " inputs = tf.keras.Input(shape=(NUM_FEATURES,), name='inputs')\n",
+ "\n",
+ " # Normalize numerical features\n",
+ " normalization_layer = tf.keras.layers.Normalization()\n",
+ " # Fit normalization layer on training data\n",
+ " normalization_layer.adapt(train_data)\n",
+ " # Split input between numerical and categorical input\n",
+ " input_numerical = tf.gather(inputs,
indices=[*range(NUMERICAL_FEATURES)], axis=1)\n",
+ " input_normalized = normalization_layer(input_numerical)\n",
+ " input_one_hot = tf.gather(inputs,
indices=[*range(NUMERICAL_FEATURES, NUM_FEATURES)], axis=1)\n",
+ " # Define one hidden layer with 8 neurons\n",
+ " x = tf.keras.layers.Dense(8,
activation='relu')(tf.concat([input_normalized, input_one_hot], 1))\n",
+ " outputs = tf.keras.layers.Dense(1, name='output')(x)\n",
+ " model = tf.keras.Model(inputs=inputs, outputs=outputs,
name=model_name)\n",
+ "\n",
+ " model.compile(\n",
+ " optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),\n",
+ " loss='mean_absolute_error')\n",
+ " \n",
+ " return model"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "buc0NIc4oRv4"
+ },
+ "outputs": [],
+ "source": [
+ "model_v1 = construct_model('model_v1', train_data_v1)"
+ ]
+ },
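+ {
+ "cell_type": "markdown",
+ "source": [
+ "Optionally, we can inspect the resulting architecture:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Print a summary of the layers and parameter counts\n",
+ "model_v1.summary()"
+ ]
+ },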
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "8bvna8ZJAKj5"
+ },
+ "outputs": [],
+ "source": [
+ "# Train the model\n",
+ "history = model_v1.fit(\n",
+ " ds_train_v1,\n",
+ " validation_data=ds_test,\n",
+ " epochs=5,\n",
+ " verbose=1)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "pyCfKRY9DUhO"
+ },
+ "outputs": [],
+ "source": [
+ "# Save the model to disk\n",
+ "model_path_v1 = 'saved_model_v1'\n",
+ "model_v1.save(model_path_v1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "kwUycnjpLvTX"
+ },
+ "source": [
+ "# Evaluation"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Now that we have trained a model, we can use TFMA to analyze the
performance. The first thing we need to do is define our evaluation config. For
our use case, we will use the most common metrics used for a linear regression
model: MAE & MSE. See [TFMA metrics and
plots](https://www.tensorflow.org/tfx/model_analysis/metrics) for more
information about the supported evaluation parameters. "
+ ],
+ "metadata": {
+ "id": "7gmbEx6wG2Za"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "b2NN9zVr-AE1"
+ },
+ "outputs": [],
+ "source": [
+ "from google.protobuf import text_format\n",
+ "\n",
+ "# Define TFMA evaluation config\n",
+ "eval_config = text_format.Parse(\"\"\"\n",
+ " ## Model information\n",
+ " model_specs {\n",
+ " # For keras (and serving models) we need to add a `label_key`.\n",
+ " label_key: \"output\"\n",
+ " }\n",
+ "\n",
+ " ## Post training metric information. These will be merged with any
built-in\n",
+ " ## metrics from training.\n",
+ " metrics_specs {\n",
+ " metrics { class_name: \"ExampleCount\" }\n",
+ " metrics { class_name: \"MeanAbsoluteError\" }\n",
+ " metrics { class_name: \"MeanSquaredError\" }\n",
+ " metrics { class_name: \"MeanPrediction\" }\n",
+ " }\n",
+ "\n",
+ " slicing_specs {}\n",
+ "\"\"\", tfma.EvalConfig())"
+ ]
+ },
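+ {
+ "cell_type": "markdown",
+ "source": [
+ "The same config can also be built programmatically rather than parsed from text. The following is an equivalent sketch, assuming the proto classes (`tfma.ModelSpec`, `tfma.MetricsSpec`, `tfma.MetricConfig`, `tfma.SlicingSpec`) exposed by your TFMA version:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Equivalent config built with TFMA's proto classes instead of text_format\n",
+ "eval_config_alt = tfma.EvalConfig(\n",
+ "    model_specs=[tfma.ModelSpec(label_key='output')],\n",
+ "    metrics_specs=[tfma.MetricsSpec(metrics=[\n",
+ "        tfma.MetricConfig(class_name='ExampleCount'),\n",
+ "        tfma.MetricConfig(class_name='MeanAbsoluteError'),\n",
+ "        tfma.MetricConfig(class_name='MeanSquaredError'),\n",
+ "        tfma.MetricConfig(class_name='MeanPrediction'),\n",
+ "    ])],\n",
+ "    slicing_specs=[tfma.SlicingSpec()])"
+ ]
+ },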
+ {
+ "cell_type": "markdown",
+ "source": [
+ "We will now use
[ExtractEvaluateAndWriteResults](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults),
which is a PTransform for performing extraction, evaluation, and writing
results. This PTransform can directly be used in our Beam pipeline if we
combine it with reading in our TFRecords via
[TFXIO](https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio)"
+ ],
+ "metadata": {
+ "id": "8JVSVK4iH-d2"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "4NDEQAMDhJWL"
+ },
+ "outputs": [],
+ "source": [
+ "from tfx_bsl.public import tfxio\n",
+ "\n",
+ "output_path = 'evaluation_results'\n",
+ "\n",
+ "eval_shared_model = tfma.default_eval_shared_model(\n",
+ " eval_saved_model_path=model_path_v1, eval_config=eval_config)\n",
+ "\n",
+ "tfx_io = tfxio.TFExampleRecord(\n",
+ " file_pattern=tfrecord_file,\n",
+ " raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n",
+ "\n",
+ "# Run Evaluation\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " _ = (\n",
+ " pipeline\n",
+ " | 'ReadData' >> tfx_io.BeamSource()\n",
+ " | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n",
+ " eval_shared_model=eval_shared_model,\n",
+ " eval_config=eval_config,\n",
+ " output_path=output_path))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "He97LPB_NbtU"
+ },
+ "outputs": [],
+ "source": [
+ "# Visualize results\n",
+ "result = tfma.load_eval_result(output_path=output_path)\n",
+ "tfma.view.render_slicing_metrics(result)"
+ ]
+ },
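+ {
+ "cell_type": "markdown",
+ "source": [
+ "The rendered view is an interactive widget. For programmatic access (for example, outside a notebook), the loaded `EvalResult` also exposes the computed metrics directly; note that the exact nesting of the metrics dictionaries can vary across TFMA versions:"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Access the computed metrics programmatically from the EvalResult\n",
+ "for slice_key, metrics in result.slicing_metrics:\n",
+ "  print(slice_key, metrics)"
+ ]
+ },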
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Comparing multiple models"
+ ],
+ "metadata": {
+ "id": "I7ZuqLcdwpCs"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "An interesting and common use case it to compare the performance of
multiple models to select the best candidate to put into production. We can
also use Beam to evaluate and compare multiple models in one step."
+ ],
+ "metadata": {
+ "id": "ebkJsE3zJ2W2"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Model v2"
+ ],
+ "metadata": {
+ "id": "ah1xX9jl44O8"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "In order to showcase this use case, we will now train a second model
on the full dataset."
+ ],
+ "metadata": {
+ "id": "6GcGg08YKRhg"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Preprocess data\n",
+ "ds_train_v2 = tfds.load('diamonds', split=['train[:80%]'],
as_supervised=True)[0]\n",
+ "train_data_v2 = get_train_data(ds_train_v2)\n",
+ "ds_train_v2 = ds_train_v2.map(transform_data)\n",
+ "ds_train_v2 = ds_train_v2.batch(BATCH_SIZE)"
+ ],
+ "metadata": {
+ "id": "cQYA0cdzwoXr"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define and train model\n",
+ "model_v2 = construct_model('model_v2', train_data_v2)\n",
+ "history = model_v2.fit(\n",
+ " ds_train_v2,\n",
+ " validation_data=ds_test,\n",
+ " epochs=5,\n",
+ " verbose=1)"
+ ],
+ "metadata": {
+ "id": "WII_PC5rxzTc"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Save model to file\n",
+ "model_path_v2 = 'saved_model_v2'\n",
+ "model_v2.save(model_path_v2)"
+ ],
+ "metadata": {
+ "id": "ppVFHy7myhXu"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Evaluation"
+ ],
+ "metadata": {
+ "id": "QKO-vh1X48tv"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define TFMA evaluation config, including two model specs for the
two models we want to compare\n",
+ "eval_config_compare = text_format.Parse(\"\"\"\n",
+ " ## Model information\n",
+ " model_specs {\n",
+ " name: \"model_v1\"\n",
+ " # For keras (and serving models) we need to add a `label_key`.\n",
+ " label_key: \"output\"\n",
+ " is_baseline: true\n",
+ " }\n",
+ " model_specs {\n",
+ " name: \"model_v2\"\n",
+ " # For keras (and serving models) we need to add a `label_key`.\n",
+ " label_key: \"output\"\n",
+ " }\n",
+ "\n",
+ " ## Post training metric information. These will be merged with any
built-in\n",
+ " ## metrics from training.\n",
+ " metrics_specs {\n",
+ " metrics { class_name: \"ExampleCount\" }\n",
+ " metrics { class_name: \"MeanAbsoluteError\" }\n",
+ " metrics { class_name: \"MeanSquaredError\" }\n",
+ " metrics { class_name: \"MeanPrediction\" }\n",
+ " }\n",
+ "\n",
+ " slicing_specs {}\n",
+ "\"\"\", tfma.EvalConfig())"
+ ],
+ "metadata": {
+ "id": "pB4aXUo45RAg"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
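+ {
+ "cell_type": "markdown",
+ "source": [
+ "Because model_v1 is marked as the baseline, TFMA can also validate the candidate model against it. As an illustrative (not executed here) extension of the config above, a `change_threshold` could be attached to a metric so that model_v2 only passes validation if its MAE does not regress relative to model_v1:\n",
+ "\n",
+ "```\n",
+ "metrics {\n",
+ "  class_name: \"MeanAbsoluteError\"\n",
+ "  threshold {\n",
+ "    change_threshold {\n",
+ "      direction: LOWER_IS_BETTER\n",
+ "      absolute { value: -1e-10 }\n",
+ "    }\n",
+ "  }\n",
+ "}\n",
+ "```"
+ ],
+ "metadata": {}
+ },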
+ {
+ "cell_type": "code",
+ "source": [
+ "from tfx_bsl.public import tfxio\n",
+ "\n",
+ "output_path_compare = 'evaluation_results_compare'\n",
+ "\n",
+ "eval_shared_models = [\n",
+ " tfma.default_eval_shared_model(\n",
+ " model_name='model_v1',\n",
+ " eval_saved_model_path=model_path_v1,\n",
+ " eval_config=eval_config_compare),\n",
+ " tfma.default_eval_shared_model(\n",
+ " model_name='model_v2',\n",
+ " eval_saved_model_path=model_path_v2,\n",
+ " eval_config=eval_config_compare),\n",
+ "]\n",
+ "\n",
+ "tfx_io = tfxio.TFExampleRecord(\n",
+ " file_pattern=tfrecord_file,\n",
+ " raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n",
+ "\n",
+ "# Run Evaluation\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " _ = (\n",
+ " pipeline\n",
+ " | 'ReadData' >> tfx_io.BeamSource()\n",
+ " | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n",
+ " eval_shared_model=eval_shared_models,\n",
+ " eval_config=eval_config_compare,\n",
+ " output_path=output_path_compare))"
+ ],
+ "metadata": {
+ "id": "4eVdpW0aWTah"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Visualize results\n",
+ "results = tfma.load_eval_results(output_paths=output_path_compare)\n",
+ "tfma.view.render_time_series(results)"
Review Comment:
I added screenshots for now. Suggestions on how to do it otherwise are still welcome, of course.