This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 626be06a47b [YAML] Fraud Detection MLOps Workflow Example (#36012)
626be06a47b is described below

commit 626be06a47b60ae48c5fa20500221f04dc3da287
Author: Charles Nguyen <[email protected]>
AuthorDate: Thu Sep 4 14:56:47 2025 -0400

    [YAML] Fraud Detection MLOps Workflow Example (#36012)
    
    * YAML pipeline example for fraud detection workflow
    
    * Update README
---
 .../transforms/ml/fraud_detection/README.md        |   28 +
 .../fraud_detection_mlops_beam_yaml_sdk.ipynb      | 1329 ++++++++++++++++++++
 2 files changed, 1357 insertions(+)

diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/README.md 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/README.md
new file mode 100644
index 00000000000..ff8f7044ea7
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/README.md
@@ -0,0 +1,28 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Fraud Detection MLOps - Feature Engineering and Model Evaluation
+
+This example demonstrates a Fraud Detection MLOps solution that uses the
+Apache Beam YAML SDK for feature engineering and model evaluation to detect
+fraudulent transactions.
+
+The entire workflow is self-contained in the
+[fraud_detection_mlops_beam_yaml_sdk](./fraud_detection_mlops_beam_yaml_sdk.ipynb)
+notebook.
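
For context on the style of pipeline the notebook builds: Beam YAML pipelines are plain YAML specs run with `python -m apache_beam.yaml.main --yaml_pipeline_file=...`. A minimal sketch is shown below; it is illustrative only and not part of this commit. The transform types come from Beam's built-in YAML catalog, and the file paths are placeholders.

```yaml
# Minimal Beam YAML pipeline sketch -- illustrative, not part of this commit.
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: ./transactions.csv   # placeholder input path
    - type: Filter
      config:
        language: python
        keep: "amt > 100.0"        # keep only high-value transactions
    - type: WriteToCsv
      config:
        path: ./high_value         # placeholder output prefix
```

The notebook's actual pipelines follow this same shape, adding Iceberg I/O, Jinja variables, and a custom transform provider.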
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/fraud_detection_mlops_beam_yaml_sdk.ipynb
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/fraud_detection_mlops_beam_yaml_sdk.ipynb
new file mode 100644
index 00000000000..52f81b56f6b
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/fraud_detection_mlops_beam_yaml_sdk.ipynb
@@ -0,0 +1,1329 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "source": [
+    "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 
2.0 (the \"License\")\n",
+    "\n",
+    "# Licensed to the Apache Software Foundation (ASF) under one\n",
+    "# or more contributor license agreements. See the NOTICE file\n",
+    "# distributed with this work for additional information\n",
+    "# regarding copyright ownership. The ASF licenses this file\n",
+    "# to you under the Apache License, Version 2.0 (the\n",
+    "# \"License\"); you may not use this file except in compliance\n",
+    "# with the License. You may obtain a copy of the License at\n",
+    "#\n",
+    "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+    "#\n",
+    "# Unless required by applicable law or agreed to in writing,\n",
+    "# software distributed under the License is distributed on an\n",
+    "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "# KIND, either express or implied. See the License for the\n",
+    "# specific language governing permissions and limitations\n",
+    "# under the License"
+   ],
+   "metadata": {
+    "cellView": "form",
+    "id": "FW7MQcQeZJh5"
+   },
+   "id": "FW7MQcQeZJh5",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "# Fraud Detection MLOps - Feature Engineering and Model Evaluation\n",
+    "\n",
+    "<table><tbody><tr>\n",
+    "  <td style=\"text-align: center\">\n",
+    "    <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ffraud_detection%2Ffraud_detection_mlops_beam_yaml_sdk.ipynb\">\n",
+    "      <img alt=\"Google Cloud Colab Enterprise logo\" src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\" width=\"32px\"><br> Run in Colab Enterprise\n",
+    "    </a>\n",
+    "  </td>\n",
+    "  <td style=\"text-align: center\">\n",
+    "    <a href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/fraud_detection/fraud_detection_mlops_beam_yaml_sdk.ipynb\">\n",
+    "      <img alt=\"GitHub logo\" src=\"https://github.githubassets.com/assets/GitHub-Mark-ea2971cee799.png\" width=\"32px\"><br> View on GitHub\n",
+    "    </a>\n",
+    "  </td>\n",
+    "</tr></tbody></table>\n"
+   ],
+   "metadata": {
+    "id": "MqQferBbaD4E"
+   },
+   "id": "MqQferBbaD4E"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Overview\n",
+    "\n",
+    "This notebook demonstrates a Fraud Detection MLOps solution that uses the Apache\n",
+    "Beam YAML SDK for feature engineering and model evaluation to detect fraudulent transactions.\n",
+    "\n",
+    "The dataset contains generated credit card transactions and can be found on Kaggle: https://www.kaggle.com/datasets/kartik2112/fraud-detection.\n",
+    "\n",
+    "The training dataset will be stored as Iceberg tables on GCS object storage for the feature engineering workflows in Beam. Once the feature generation task is done and the features are stored in Iceberg, they will then be downloaded for training and model evaluation.\n",
+    "\n",
+    "This example primarily uses the training dataset, but the workflow makes use of Jinja [templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization) in YAML pipelines, which keeps it modular and extensible to additional datasets.\n",
+    "\n",
+    "## Outline\n",
+    "1. Setup\n",
+    "2. From dataset to Iceberg tables\n",
+    "3. Feature engineering\n",
+    "4. Training\n",
+    "5. Evaluation"
+   ],
+   "metadata": {
+    "id": "xUXaoZBaaE0g"
+   },
+   "id": "xUXaoZBaaE0g"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Setup"
+   ],
+   "metadata": {
+    "id": "7ujq4XdW90yT"
+   },
+   "id": "7ujq4XdW90yT"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Install the necessary libraries and dependencies."
+   ],
+   "metadata": {
+    "id": "I8FYju0zKZs-"
+   },
+   "id": "I8FYju0zKZs-"
+  },
+  {
+   "cell_type": "code",
+   "id": "iK2YmbXZB1UYEsDb6zP6qxMQ",
+   "metadata": {
+    "tags": [],
+    "id": "iK2YmbXZB1UYEsDb6zP6qxMQ"
+   },
+   "source": [
+    "!pip3 install --quiet --upgrade \\\n",
+    "  apache-beam[yaml,gcp] \\\n",
+    "  opendatasets \\\n",
+    "  scikit-learn \\\n",
+    "  xgboost \\\n",
+    "  datatable \\\n",
+    "  pandas \\\n",
+    "  poetry\n",
+    "\n",
+    "!apt-get update\n",
+    "!apt-get install python3.10-venv"
+   ],
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "import os\n",
+    "import random\n",
+    "import time\n",
+    "\n",
+    "import opendatasets as od\n",
+    "import pandas as pd\n",
+    "from sklearn.model_selection import train_test_split\n",
+    "from xgboost import XGBClassifier"
+   ],
+   "metadata": {
+    "id": "LW3IUMtceN_X"
+   },
+   "id": "LW3IUMtceN_X",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Set up directories and environment variables."
+   ],
+   "metadata": {
+    "id": "TVW4Q9cYKk1H"
+   },
+   "id": "TVW4Q9cYKk1H"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!mkdir yaml\n",
+    "!mkdir dataset\n",
+    "\n",
+    "PROJECT = 'apache-beam-testing' # @param {type:'string'}\n",
+    "REGION = 'us-central1' # @param {type:'string'}\n",
+    "WAREHOUSE = 'gs://apache-beam-testing-charlesng/mlops' # @param 
{type:'string'}\n",
+    "\n",
+    "os.environ['PROJECT'] = PROJECT\n",
+    "os.environ['REGION'] = REGION\n",
+    "os.environ['WAREHOUSE'] = WAREHOUSE"
+   ],
+   "metadata": {
+    "id": "H-OhqtW9pFV0"
+   },
+   "id": "H-OhqtW9pFV0",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The feature engineering task and model evaluation require implementing custom PTransforms. We make use of a [transform provider](https://beam.apache.org/documentation/sdks/yaml-providers/) to expose transforms written in Python so that they can be used in a YAML pipeline.\n",
+    "\n",
+    "[Poetry](https://python-poetry.org/) and the following `pyproject.toml` file are used to manage dependencies and to build and package the custom PTransform implementation, which resides in `my_provider.py`."
+   ],
+   "metadata": {
+    "id": "KKlPDSaRKtcq"
+   },
+   "id": "KKlPDSaRKtcq"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/pyproject.toml\n",
+    "\n",
+    "[tool.poetry]\n",
+    "name = \"my_provider\"\n",
+    "version = \"0.1.0\"\n",
+    "description = \"A provider for custom transforms\"\n",
+    "authors = [\"Your Name <[email protected]>\"]\n",
+    "license = \"Apache License 2.0\"\n",
+    "packages = [\n",
+    "    { include = \"my_provider.py\" },\n",
+    "]\n",
+    "\n",
+    "[tool.poetry.dependencies]\n",
+    "python = \"^3.10\"\n",
+    "apache-beam = {extras = [\"gcp\", \"yaml\"], version = \"^2.67.0\"}\n",
+    "scikit-learn = \"^1.7.0\"\n",
+    "numpy = \"^1.26.0\"\n",
+    "pandas = \"^2.2.0\"\n",
+    "xgboost = \"^3.0.0\"\n",
+    "datatable = \"^1.1.0\"\n",
+    "\n",
+    "[build-system]\n",
+    "requires = [\"poetry-core\"]\n",
+    "build-backend = \"poetry.core.masonry.api\""
+   ],
+   "metadata": {
+    "id": "vwaQQsR_yAJ1"
+   },
+   "id": "vwaQQsR_yAJ1",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## From dataset to Iceberg tables"
+   ],
+   "metadata": {
+    "id": "Erzyjw8SmmVH"
+   },
+   "id": "Erzyjw8SmmVH"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We use the `opendatasets` library to programmatically download the dataset from Kaggle.\n",
+    "\n",
+    "We'll first need a Kaggle account. We'll also need an API key, which is stored in the `kaggle.json` file that is automatically downloaded when you create an API token: go to your *Profile* picture -> *Settings* -> *API* -> *Create New Token*.\n",
+    "\n",
+    "The dataset download will prompt you to enter your Kaggle username and key. Copy this information from `kaggle.json`."
+   ],
+   "metadata": {
+    "id": "pHES8YV1ORhC"
+   },
+   "id": "pHES8YV1ORhC"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "dataset_url = 
'https://www.kaggle.com/datasets/kartik2112/fraud-detection'\n",
+    "od.download(dataset_url, data_dir='./dataset')"
+   ],
+   "metadata": {
+    "id": "ekTCsIj-eOJt"
+   },
+   "id": "ekTCsIj-eOJt",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Read in the dataset from the CSV file and write it to an Iceberg table.\n",
+    "\n",
+    "For Iceberg tables in this workflow, GCS is used as the storage layer.\n",
+    "In a data lakehouse with Iceberg and GCS object storage, a natural 
choice\n",
+    "for the Iceberg catalog is [BigLake metastore](https://cloud.google.com/bigquery/docs/about-blms).\n",
+    "It is a managed, serverless metastore that doesn't require any setup."
+   ],
+   "metadata": {
+    "id": "0VcEtlRkOV0o"
+   },
+   "id": "0VcEtlRkOV0o"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/iceberg_migration_template.yaml\n",
+    "pipeline:\n",
+    "  transforms:\n",
+    "    - type: PyTransform\n",
+    "      name: ReadFromCsv\n",
+    "      input: {}\n",
+    "      config:\n",
+    "        constructor: apache_beam.io.ReadFromCsv\n",
+    "        kwargs:\n",
+    "            path: \"{{ DATASET_PATH }}\"\n",
+    "            index_col: False\n",
+    "            usecols: [\n",
+    "              'trans_date_trans_time',\n",
+    "              'cc_num',\n",
+    "              'merchant',\n",
+    "              'category',\n",
+    "              'amt',\n",
+    "              'first',\n",
+    "              'last',\n",
+    "              'gender',\n",
+    "              'street',\n",
+    "              'city',\n",
+    "              'state',\n",
+    "              'zip',\n",
+    "              'lat',\n",
+    "              'long',\n",
+    "              'city_pop',\n",
+    "              'job',\n",
+    "              'dob',\n",
+    "              'trans_num',\n",
+    "              'unix_time',\n",
+    "              'merch_lat',\n",
+    "              'merch_long',\n",
+    "              'is_fraud']\n",
+    "\n",
+    "    - type: WriteToIceberg\n",
+    "      name: WriteToIceberg\n",
+    "      input: ReadFromCsv\n",
+    "      config:\n",
+    "        table: \"{{ ICEBERG_TABLE }}\"\n",
+    "        catalog_name: \"my_catalog\"\n",
+    "        catalog_properties:\n",
+    "          warehouse: \"{{ WAREHOUSE }}\"\n",
+    "          catalog-impl: 
\"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog\"\n",
+    "          io-impl: \"org.apache.iceberg.gcp.gcs.GCSFileIO\"\n",
+    "          gcp_project: \"{{ PROJECT }}\"\n",
+    "          gcp_location: \"{{ REGION }}\""
+   ],
+   "metadata": {
+    "id": "smpAH7coeYh8"
+   },
+   "id": "smpAH7coeYh8",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/iceberg_migration_train_dataset.yaml\n",
+    "{% include './yaml/iceberg_migration_template.yaml' %}"
+   ],
+   "metadata": {
+    "id": "WqdUS9vCeORE"
+   },
+   "id": "WqdUS9vCeORE",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!python -m apache_beam.yaml.main                                        
\\\n",
+    "  --yaml_pipeline_file=./yaml/iceberg_migration_train_dataset.yaml      
\\\n",
+    "  --jinja_variables='{                                                  
\\\n",
+    "    \"DATASET_PATH\": \"./dataset/fraud-detection/fraudTrain.csv\",       
  \\\n",
+    "    \"ICEBERG_TABLE\": \"fraud_detection.train_dataset_table\",           
  \\\n",
+    "    \"WAREHOUSE\": \"'$WAREHOUSE'\",                                      
  \\\n",
+    "    \"PROJECT\": \"'$PROJECT'\",                                          
  \\\n",
+    "    \"REGION\": \"'$REGION'\" }'"
+   ],
+   "metadata": {
+    "id": "8gQX3MN3eOW6"
+   },
+   "id": "8gQX3MN3eOW6",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Feature engineering\n",
+    "\n",
+    "In this dataset, there's information on users' transactions over time.\n",
+    "\n",
+    "We experiment by computing the following historical aggregate 
features:\n",
+    "- A user's average transaction amount over 1, 3, and 7 days.\n",
+    "- A user's total transaction count over 1, 3, and 7 days.\n",
+    "\n",
+    "As mentioned before, we implement our custom PTransform `ComputeHistoricalFeatures` in `my_provider.py` to generate these features, so that it can later be exposed to the YAML pipeline."
+   ],
+   "metadata": {
+    "id": "I_59petUokNn"
+   },
+   "id": "I_59petUokNn"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/my_provider.py\n",
+    "\n",
+    "import apache_beam as beam\n",
+    "from collections import deque\n",
+    "from datetime import datetime, timedelta, timezone\n",
+    "\n",
+    "class ComputeHistoricalFeatures(beam.PTransform):\n",
+    "\n",
+    "  class _ComputeFeaturesDoFn(beam.DoFn):\n",
+    "    \"\"\"Processes all transactions for one user to compute 
features.\"\"\"\n",
+    "    def process(self, element):\n",
+    "      cc_num, transactions = element\n",
+    "\n",
+    "      # 1. Sort all transactions for the user chronologically.\n",
+    "      sorted_transactions = sorted(\n",
+    "          list(transactions),\n",
+    "          key=lambda t: 
datetime.fromisoformat(t.trans_date_trans_time).timestamp())\n",
+    "\n",
+    "      # Store transactions in a sliding 7-day window.\n",
+    "      history = deque()\n",
+    "\n",
+    "      for tx in sorted_transactions:\n",
+    "        current_time = 
datetime.fromisoformat(tx.trans_date_trans_time).astimezone(timezone.utc)\n",
+    "\n",
+    "        # 2. Remove transactions older than 7 days\n",
+    "        # relative to the current transaction's timestamp.\n",
+    "        while (history and\n",
+    "          datetime\n",
+    "            .fromisoformat(history[0].trans_date_trans_time)\n",
+    "            .astimezone(timezone.utc) < current_time - 
timedelta(days=7)):\n",
+    "            history.popleft()\n",
+    "\n",
+    "        # 3. Calculate features by filtering the current history.\n",
+    "        def _avg(amounts):\n",
+    "            return sum(amounts) / len(amounts) if amounts else 0\n",
+    "\n",
+    "        avg_past_1d = _avg([h.amt for h in history\n",
+    "                            if (datetime\n",
+    "                                
.fromisoformat(h.trans_date_trans_time)\n",
+    "                                .astimezone(timezone.utc) >= current_time 
- timedelta(days=1))])\n",
+    "        avg_past_3d = _avg([h.amt for h in history\n",
+    "                            if (datetime\n",
+    "                                
.fromisoformat(h.trans_date_trans_time)\n",
+    "                                .astimezone(timezone.utc) >= current_time 
- timedelta(days=3))])\n",
+    "        avg_past_7d = _avg([h.amt for h in history])\n",
+    "\n",
+    "        count_past_1d = len([h for h in history\n",
+    "                             if (datetime\n",
+    "                                  
.fromisoformat(h.trans_date_trans_time)\n",
+    "                                  .astimezone(timezone.utc) >= 
current_time - timedelta(days=1))])\n",
+    "        count_past_3d = len([h for h in history\n",
+    "                             if (datetime\n",
+    "                                  
.fromisoformat(h.trans_date_trans_time)\n",
+    "                                  .astimezone(timezone.utc) >= 
current_time - timedelta(days=3))])\n",
+    "        count_past_7d = len(history)\n",
+    "\n",
+    "        # 4. Yield a new row with the original data and the new 
features.\n",
+    "        yield beam.Row(\n",
+    "            **tx._asdict(),\n",
+    "            avg_amount_past_1d=avg_past_1d,\n",
+    "            avg_amount_past_3d=avg_past_3d,\n",
+    "            avg_amount_past_7d=avg_past_7d,\n",
+    "            count_past_1d=count_past_1d,\n",
+    "            count_past_3d=count_past_3d,\n",
+    "            count_past_7d=count_past_7d\n",
+    "        )\n",
+    "\n",
+    "        # 5. Add the current transaction to history for the next 
iteration.\n",
+    "        history.append(tx)\n",
+    "\n",
+    "  def expand(self, pcoll):\n",
+    "    return (\n",
+    "        pcoll\n",
+    "        | beam.WithKeys(lambda row: row.cc_num)\n",
+    "        | beam.GroupByKey()\n",
+    "        | beam.ParDo(self._ComputeFeaturesDoFn())\n",
+    "        | beam.Map(lambda row: beam.Row(\n",
+    "            **row.as_dict()\n",
+    "          ))\n",
+    "    )\n"
+   ],
+   "metadata": {
+    "id": "2ODiBDG4yPpF"
+   },
+   "id": "2ODiBDG4yPpF",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Build and package the custom transform."
+   ],
+   "metadata": {
+    "id": "Fedk8LIGQ7qx"
+   },
+   "id": "Fedk8LIGQ7qx"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!poetry build -C ./yaml"
+   ],
+   "metadata": {
+    "id": "fS9PQQF6ubXk"
+   },
+   "id": "fS9PQQF6ubXk",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Read the input training data stored in the Iceberg table, use our custom transform to compute the features, and write them to another Iceberg table."
+   ],
+   "metadata": {
+    "id": "HoJHokZkREN6"
+   },
+   "id": "HoJHokZkREN6"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/historical_aggregates_featurize_template.yaml\n",
+    "pipeline:\n",
+    "  type: chain\n",
+    "  transforms:\n",
+    "    - type: ReadFromIceberg\n",
+    "      name: ReadFromIceberg\n",
+    "      config:\n",
+    "        table: \"{{ ICEBERG_TABLE_INPUT }}\"\n",
+    "        catalog_name: \"my_catalog\"\n",
+    "        catalog_properties:\n",
+    "          warehouse: \"{{ WAREHOUSE }}\"\n",
+    "          catalog-impl: 
\"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog\"\n",
+    "          io-impl: \"org.apache.iceberg.gcp.gcs.GCSFileIO\"\n",
+    "          gcp_project: \"{{ PROJECT }}\"\n",
+    "          gcp_location: \"{{ REGION }}\"\n",
+    "\n",
+    "    - type: HistoricalAggregatesTransform\n",
+    "      name: HistoricalAggregatesTransform\n",
+    "\n",
+    "    - type: MapToFields\n",
+    "      name: MapToFields\n",
+    "      config:\n",
+    "        language: python\n",
+    "        fields:\n",
+    "          cc_num:\n",
+    "            callable: \"lambda row: row.cc_num\"\n",
+    "            output_type: integer\n",
+    "          amt:\n",
+    "            callable: \"lambda row: row.amt\"\n",
+    "            output_type: number\n",
+    "          trans_date_trans_time:\n",
+    "            callable: \"lambda row: row.trans_date_trans_time\"\n",
+    "            output_type: string\n",
+    "          trans_num:\n",
+    "            callable: \"lambda row: row.trans_num\"\n",
+    "            output_type: string\n",
+    "          merchant:\n",
+    "            callable: \"lambda row: row.merchant\"\n",
+    "            output_type: string\n",
+    "          category:\n",
+    "            callable: \"lambda row: row.category\"\n",
+    "            output_type: string\n",
+    "          merch_lat:\n",
+    "            callable: \"lambda row: row.merch_lat\"\n",
+    "            output_type: number\n",
+    "          merch_long:\n",
+    "            callable: \"lambda row: row.merch_long\"\n",
+    "            output_type: number\n",
+    "          first:\n",
+    "            callable: \"lambda row: row.first\"\n",
+    "            output_type: string\n",
+    "          last:\n",
+    "            callable: \"lambda row: row.last\"\n",
+    "            output_type: string\n",
+    "          gender:\n",
+    "            callable: \"lambda row: row.gender\"\n",
+    "            output_type: string\n",
+    "          street:\n",
+    "            callable: \"lambda row: row.street\"\n",
+    "            output_type: string\n",
+    "          city:\n",
+    "            callable: \"lambda row: row.city\"\n",
+    "            output_type: string\n",
+    "          state:\n",
+    "            callable: \"lambda row: row.state\"\n",
+    "            output_type: string\n",
+    "          zip:\n",
+    "            callable: \"lambda row: row.zip\"\n",
+    "            output_type: integer\n",
+    "          lat:\n",
+    "            callable: \"lambda row: row.lat\"\n",
+    "            output_type: number\n",
+    "          long:\n",
+    "            callable: \"lambda row: row.long\"\n",
+    "            output_type: number\n",
+    "          city_pop:\n",
+    "            callable: \"lambda row: row.city_pop\"\n",
+    "            output_type: integer\n",
+    "          job:\n",
+    "            callable: \"lambda row: row.job\"\n",
+    "            output_type: string\n",
+    "          dob:\n",
+    "            callable: \"lambda row: row.dob\"\n",
+    "            output_type: string\n",
+    "          avg_amount_past_1d:\n",
+    "            callable: \"lambda row: row.avg_amount_past_1d\"\n",
+    "            output_type: number\n",
+    "          avg_amount_past_3d:\n",
+    "            callable: \"lambda row: row.avg_amount_past_3d\"\n",
+    "            output_type: number\n",
+    "          avg_amount_past_7d:\n",
+    "            callable: \"lambda row: row.avg_amount_past_7d\"\n",
+    "            output_type: number\n",
+    "          count_past_1d:\n",
+    "            callable: \"lambda row: row.count_past_1d\"\n",
+    "            output_type: integer\n",
+    "          count_past_3d:\n",
+    "            callable: \"lambda row: row.count_past_3d\"\n",
+    "            output_type: integer\n",
+    "          count_past_7d:\n",
+    "            callable: \"lambda row: row.count_past_7d\"\n",
+    "            output_type: integer\n",
+    "          is_fraud:\n",
+    "            callable: \"lambda row: row.is_fraud\"\n",
+    "            output_type: integer\n",
+    "\n",
+    "    - type: WriteToIceberg\n",
+    "      name: WriteToIceberg\n",
+    "      config:\n",
+    "        table: \"{{ ICEBERG_TABLE_OUTPUT }}\"\n",
+    "        catalog_name: \"my_catalog\"\n",
+    "        catalog_properties:\n",
+    "          warehouse: \"{{ WAREHOUSE }}\"\n",
+    "          catalog-impl: 
\"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog\"\n",
+    "          io-impl: \"org.apache.iceberg.gcp.gcs.GCSFileIO\"\n",
+    "          gcp_project: \"{{ PROJECT }}\"\n",
+    "          gcp_location: \"{{ REGION }}\"\n",
+    "\n",
+    "providers:\n",
+    "  - type: pythonPackage\n",
+    "    config:\n",
+    "      packages:\n",
+    "        - ./dist/my_provider-0.1.0.tar.gz\n",
+    "    transforms:\n",
+    "      HistoricalAggregatesTransform: 
'my_provider.ComputeHistoricalFeatures'\n"
+   ],
+   "metadata": {
+    "id": "3Yu4pNbFzE9s"
+   },
+   "id": "3Yu4pNbFzE9s",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/historical_aggregates_featurize_train_dataset.yaml\n",
+    "{% include './yaml/historical_aggregates_featurize_template.yaml' %}"
+   ],
+   "metadata": {
+    "id": "z5FGi1wbwtVM"
+   },
+   "id": "z5FGi1wbwtVM",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!python -m apache_beam.yaml.main                                          
                          \\\n",
+    "  
--yaml_pipeline_file=./yaml/historical_aggregates_featurize_train_dataset.yaml  
                  \\\n",
+    "  --jinja_variables='{                                                    
                          \\\n",
+    "    \"ICEBERG_TABLE_INPUT\": \"fraud_detection.train_dataset_table\",     
                              \\\n",
+    "    \"ICEBERG_TABLE_OUTPUT\": 
\"fraud_detection.historical_aggregates_featurized_train_dataset_table\", \\\n",
+    "    \"WAREHOUSE\": \"'$WAREHOUSE'\",                                      
                              \\\n",
+    "    \"PROJECT\": \"'$PROJECT'\",                                          
                              \\\n",
+    "    \"REGION\": \"'$REGION'\" }'"
+   ],
+   "metadata": {
+    "id": "uZNOIw5Hwxpi"
+   },
+   "id": "uZNOIw5Hwxpi",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Training"
+   ],
+   "metadata": {
+    "id": "hhsB-uhoookH"
+   },
+   "id": "hhsB-uhoookH"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Download the dataset with all the computed features from the Iceberg table and load it into a pandas DataFrame."
+   ],
+   "metadata": {
+    "id": "cPW9xNdRR-lc"
+   },
+   "id": "cPW9xNdRR-lc"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/download_featurized_dataset_template.yaml\n",
+    "pipeline:\n",
+    "  type: chain\n",
+    "  transforms:\n",
+    "    - type: ReadFromIceberg\n",
+    "      config:\n",
+    "        table: \"{{ ICEBERG_TABLE }}\"\n",
+    "        catalog_name: \"my_catalog\"\n",
+    "        catalog_properties:\n",
+    "          warehouse: \"{{ WAREHOUSE }}\"\n",
+    "          catalog-impl: 
\"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog\"\n",
+    "          io-impl: \"org.apache.iceberg.gcp.gcs.GCSFileIO\"\n",
+    "          gcp_project: \"{{ PROJECT }}\"\n",
+    "          gcp_location: \"{{ REGION }}\"\n",
+    "\n",
+    "    - type: WriteToCsv\n",
+    "      config:\n",
+    "        path: \"{{ OUTPUT_PATH }}\""
+   ],
+   "metadata": {
+    "id": "iPq6Qqs14yAH"
+   },
+   "id": "iPq6Qqs14yAH",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/download_featurized_train_dataset.yaml\n",
+    "{% include './yaml/download_featurized_dataset_template.yaml' %}"
+   ],
+   "metadata": {
+    "id": "1rEOu8996Zqr"
+   },
+   "id": "1rEOu8996Zqr",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!python -m apache_beam.yaml.main                                          
                        \\\n",
+    "  --yaml_pipeline_file=./yaml/download_featurized_train_dataset.yaml      
                        \\\n",
+    "  --jinja_variables='{                                                    
                        \\\n",
+    "  \"ICEBERG_TABLE\": 
\"fraud_detection.historical_aggregates_featurized_train_dataset_table\",       
 \\\n",
+    "  \"WAREHOUSE\": \"'$WAREHOUSE'\",                                        
                            \\\n",
+    "  \"PROJECT\": \"'$PROJECT'\",                                            
                            \\\n",
+    "  \"REGION\": \"'$REGION'\",                                              
                            \\\n",
+    "  \"OUTPUT_PATH\": 
\"./dataset/historical_aggregates_featurized_train_dataset.csv\" }'"
+   ],
+   "metadata": {
+    "id": "usgeN1Zi6JN1"
+   },
+   "id": "usgeN1Zi6JN1",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df = pd.read_csv(\n",
+    "    
'./dataset/historical_aggregates_featurized_train_dataset.csv-00000-of-00001',\n",
+    "    header=0,\n",
+    "    parse_dates=['trans_date_trans_time', 'dob']\n",
+    ")\n",
+    "\n",
+    "df.head(5)"
+   ],
+   "metadata": {
+    "id": "OBSNMd_J-kwf"
+   },
+   "id": "OBSNMd_J-kwf",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "ML models usually accept only numerical or categorical data. For data of type string (transaction date and time, date of birth, first and last name, city, state, etc.), some preprocessing is required."
+   ],
+   "metadata": {
+    "id": "vV_5gBYsSWK6"
+   },
+   "id": "vV_5gBYsSWK6"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "For datetime columns, we break each one down into multiple numerical feature columns."
+   ],
+   "metadata": {
+    "id": "rkX_sbEvTMVh"
+   },
+   "id": "rkX_sbEvTMVh"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "def add_dateparts(df, col):\n",
+    "    \"\"\"\n",
+    "    This function splits the datetime column into separate columns such as\n",
+    "    year, month, day, weekday, and hour.\n",
+    "    :param df: DataFrame table to add the columns\n",
+    "    :param col: the column with datetime values\n",
+    "    :return: None\n",
+    "    \"\"\"\n",
+    "    df[col + '_year'] = df[col].dt.year\n",
+    "    df[col + '_month'] = df[col].dt.month\n",
+    "    df[col + '_day'] = df[col].dt.day\n",
+    "    df[col + '_weekday'] = df[col].dt.weekday\n",
+    "    df[col + '_hour'] = df[col].dt.hour\n",
+    "\n",
+    "add_dateparts(df, 'trans_date_trans_time')\n",
+    "add_dateparts(df, 'dob')"
+   ],
+   "metadata": {
+    "id": "3CtdoWTZLp9I"
+   },
+   "id": "3CtdoWTZLp9I",
+   "execution_count": null,
+   "outputs": []
+  },
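The decomposition performed by `add_dateparts` can be sanity-checked on a single timestamp with the standard library; a minimal sketch (the timestamp is hypothetical, and `datetime.weekday()` follows the same Monday == 0 convention as pandas' `.dt.weekday`):

```python
from datetime import datetime

# Hypothetical transaction timestamp, decomposed the same way add_dateparts does.
ts = datetime(2020, 6, 21, 14, 30)
parts = {
    'year': ts.year,
    'month': ts.month,
    'day': ts.day,
    'weekday': ts.weekday(),  # Monday == 0, same convention as pandas .dt.weekday
    'hour': ts.hour,
}
print(parts)  # 2020-06-21 is a Sunday, so weekday == 6
```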
+  {
+   "cell_type": "markdown",
+   "source": [
+    "For other feature columns with string data, we specify them explicitly as 
`category` type data."
+   ],
+   "metadata": {
+    "id": "PywA2vaETaQV"
+   },
+   "id": "PywA2vaETaQV"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "categorical_feature_columns = [\n",
+    "    'merchant',\n",
+    "    'category',\n",
+    "    'first',\n",
+    "    'last',\n",
+    "    'gender',\n",
+    "    'city',\n",
+    "    'state',\n",
+    "    'job'\n",
+    "]\n",
+    "for col in categorical_feature_columns:\n",
+    "    df[col] = df[col].astype('category')"
+   ],
+   "metadata": {
+    "id": "i1AT9BZ6OkSX"
+   },
+   "id": "i1AT9BZ6OkSX",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "df.info()"
+   ],
+   "metadata": {
+    "id": "-4Gb4-LWPxrg"
+   },
+   "id": "-4Gb4-LWPxrg",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We specify `baseline_feature_columns`, containing the original columns of the dataset, as the inputs for our first model. Likewise, we specify `full_feature_columns`, containing both the original and the computed feature columns, as the inputs for our second model.\n",
+    "\n",
+    "The target/label column for training is the `is_fraud` column."
+   ],
+   "metadata": {
+    "id": "UfWaePqnTxA0"
+   },
+   "id": "UfWaePqnTxA0"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "baseline_feature_columns = [\n",
+    "    'cc_num',\n",
+    "    'amt',\n",
+    "    'trans_date_trans_time_year',\n",
+    "    'trans_date_trans_time_month',\n",
+    "    'trans_date_trans_time_day',\n",
+    "    'trans_date_trans_time_weekday',\n",
+    "    'trans_date_trans_time_hour',\n",
+    "    'merchant',\n",
+    "    'category',\n",
+    "    'merch_lat',\n",
+    "    'merch_long',\n",
+    "    'first',\n",
+    "    'last',\n",
+    "    'gender',\n",
+    "    'city',\n",
+    "    'state',\n",
+    "    'zip',\n",
+    "    'lat',\n",
+    "    'long',\n",
+    "    'city_pop',\n",
+    "    'job',\n",
+    "    'dob_year',\n",
+    "    'dob_month',\n",
+    "    'dob_day',\n",
+    "    'dob_weekday',\n",
+    "    'dob_hour'\n",
+    "]\n",
+    "full_feature_columns = baseline_feature_columns + [\n",
+    "    'avg_amount_past_1d',\n",
+    "    'avg_amount_past_3d',\n",
+    "    'avg_amount_past_7d',\n",
+    "    'count_past_1d',\n",
+    "    'count_past_3d',\n",
+    "    'count_past_7d',\n",
+    "]\n",
+    "target_column = 'is_fraud'\n",
+    "\n",
+    "X = df[full_feature_columns]\n",
+    "y = df[target_column]\n",
+    "\n",
+    "# Split data for validation\n",
+    "X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, 
random_state=42)"
+   ],
+   "metadata": {
+    "id": "HfLU6ESyrWly"
+   },
+   "id": "HfLU6ESyrWly",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Detecting fraud is a supervised learning, binary classification problem and our dataset is tabular. It is well-known that [gradient-boosted decision tree (GBDT) models](https://en.wikipedia.org/wiki/Gradient_boosting) perform very well for this kind of problem and dataset type.\n",
+    "\n",
+    "We use the XGBoost library, which implements the GBDT machine learning algorithm in a scalable, distributed manner.\n",
+    "\n",
+    "We train two models, one `baseline_model` with 
`baseline_feature_columns`, and one `experimental_model` with 
`full_feature_columns`."
+   ],
+   "metadata": {
+    "id": "gKT9ejG4VFbG"
+   },
+   "id": "gKT9ejG4VFbG"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "# Train an XGBoost classifier on the baseline feature columns\n",
+    "baseline_model = XGBClassifier(\n",
+    "    objective='binary:logistic',\n",
+    "    eval_metric='logloss',\n",
+    "    use_label_encoder=False,\n",
+    "    enable_categorical=True,\n",
+    "    tree_method='hist'\n",
+    ")\n",
+    "baseline_model.fit(X_train[baseline_feature_columns], y_train)\n",
+    "baseline_model.save_model(\"baseline_model.bst\")"
+   ],
+   "metadata": {
+    "id": "3LGhF0KdDqjm"
+   },
+   "id": "3LGhF0KdDqjm",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!gcloud storage cp baseline_model.bst {WAREHOUSE}"
+   ],
+   "metadata": {
+    "id": "4mwDGya8FHPV"
+   },
+   "id": "4mwDGya8FHPV",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "# Train an XGBoost classifier on the full feature columns\n",
+    "experimental_model = XGBClassifier(\n",
+    "    objective='binary:logistic',\n",
+    "    eval_metric='logloss',\n",
+    "    use_label_encoder=False,\n",
+    "    enable_categorical=True,\n",
+    "    tree_method='hist'\n",
+    ")\n",
+    "experimental_model.fit(X_train, y_train)\n",
+    "experimental_model.save_model(\"experimental_model.bst\")"
+   ],
+   "metadata": {
+    "id": "ZvihaXYSBrci"
+   },
+   "id": "ZvihaXYSBrci",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!gcloud storage cp experimental_model.bst {WAREHOUSE}"
+   ],
+   "metadata": {
+    "id": "XpRI8RJorCCu"
+   },
+   "id": "XpRI8RJorCCu",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Evaluation"
+   ],
+   "metadata": {
+    "id": "sASVErGKostr"
+   },
+   "id": "sASVErGKostr"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Save the test datasets and labels to be used later in the model evaluation YAML pipeline."
+   ],
+   "metadata": {
+    "id": "8FCLyzFiV2cC"
+   },
+   "id": "8FCLyzFiV2cC"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    
"X_test[baseline_feature_columns].to_pickle(\"X_test_baseline_feature_columns.pkl\")\n",
+    
"X_test[full_feature_columns].to_pickle(\"X_test_full_feature_columns.pkl\")\n",
+    "\n",
+    "y_test.to_pickle(\"y_test.pkl\")"
+   ],
+   "metadata": {
+    "id": "fzy5LGzIR-sH"
+   },
+   "id": "fzy5LGzIR-sH",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "We again implement custom PTransforms: `RunInferenceTransform` performs inference on the input data, and `ComputeMetricsTransform` calculates various model evaluation metrics."
+   ],
+   "metadata": {
+    "id": "vG3REPjFWPlm"
+   },
+   "id": "vG3REPjFWPlm"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile -a ./yaml/my_provider.py\n",
+    "\n",
+    "from apache_beam.ml.inference.base import RunInference\n",
+    "from apache_beam.ml.inference.xgboost_inference import 
XGBoostModelHandlerPandas\n",
+    "import xgboost\n",
+    "import pandas as pd\n",
+    "import sklearn.metrics\n",
+    "\n",
+    "class RunInferenceTransform(beam.PTransform):\n",
+    "  def __init__(self, model_path, df_dataset_path):\n",
+    "    self._model_path = model_path\n",
+    "    self._df_dataset_path = df_dataset_path\n",
+    "    self._model_handler = XGBoostModelHandlerPandas(\n",
+    "        model_class=xgboost.XGBClassifier,\n",
+    "        model_state=self._model_path,\n",
+    "    )\n",
+    "\n",
+    "  def expand(self, pcoll):\n",
+    "    return (\n",
+    "        pcoll\n",
+    "        | beam.Create([pd.read_pickle(self._df_dataset_path)])\n",
+    "        | RunInference(self._model_handler)\n",
+    "        | beam.Map(lambda row: beam.Row(inferences=row.inference))\n",
+    "    )\n",
+    "\n",
+    "class ComputeMetricsTransform(beam.PTransform):\n",
+    "  def __init__(self, df_targets_path):\n",
+    "    self.targets = pd.read_pickle(df_targets_path).to_list()\n",
+    "\n",
+    "  def expand(self, pcoll):\n",
+    "\n",
+    "    def compute_metrics(row):\n",
+    "      true_labels = self.targets\n",
+    "      predicted_labels = row.inferences\n",
+    "\n",
+    "      accuracy = sklearn.metrics.accuracy_score(true_labels, 
predicted_labels)\n",
+    "      precision = sklearn.metrics.precision_score(true_labels, 
predicted_labels, average='macro', zero_division=0)\n",
+    "      recall = sklearn.metrics.recall_score(true_labels, 
predicted_labels, average='macro', zero_division=0)\n",
+    "      f1 = sklearn.metrics.f1_score(true_labels, predicted_labels, 
average='macro', zero_division=0)\n",
+    "\n",
+    "      yield beam.Row(\n",
+    "          accuracy=float(accuracy),\n",
+    "          precision=float(precision),\n",
+    "          recall=float(recall),\n",
+    "          f1=float(f1),\n",
+    "      )\n",
+    "\n",
+    "    return pcoll | 'CalculateMetrics' >> beam.FlatMap(compute_metrics)\n"
+   ],
+   "metadata": {
+    "id": "Ih07-BIQY_Vz"
+   },
+   "id": "Ih07-BIQY_Vz",
+   "execution_count": null,
+   "outputs": []
+  },
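`ComputeMetricsTransform` uses sklearn's `average='macro'` option, which averages per-class scores with equal weight regardless of class frequency, a sensible choice here since fraudulent transactions are rare. A minimal hand-rolled sketch on hypothetical labels, showing what macro averaging computes:

```python
# Hypothetical labels: 1 marks a fraudulent transaction.
true_labels = [0, 0, 0, 0, 1]
predicted = [0, 0, 0, 1, 1]

def per_class(cls):
    # Precision and recall for a single class, from raw counts.
    tp = sum(1 for t, p in zip(true_labels, predicted) if p == cls and t == cls)
    fp = sum(1 for t, p in zip(true_labels, predicted) if p == cls and t != cls)
    fn = sum(1 for t, p in zip(true_labels, predicted) if p != cls and t == cls)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

scores = [per_class(c) for c in (0, 1)]
# Macro averaging: unweighted mean over classes, so the rare class counts equally.
macro_precision = sum(p for p, _ in scores) / len(scores)
macro_recall = sum(r for _, r in scores) / len(scores)
print(macro_precision, macro_recall)  # 0.75 0.875
```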
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Build and package the custom transforms."
+   ],
+   "metadata": {
+    "id": "k6dVfcSnW5qj"
+   },
+   "id": "k6dVfcSnW5qj"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!poetry build -C ./yaml"
+   ],
+   "metadata": {
+    "id": "RA5bOhDgxgC9"
+   },
+   "id": "RA5bOhDgxgC9",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Use our custom transforms to read in the dataset, perform inference and 
evaluate against target labels."
+   ],
+   "metadata": {
+    "id": "Os7Gc8paXZs6"
+   },
+   "id": "Os7Gc8paXZs6"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/model_evaluation_template.yaml\n",
+    "pipeline:\n",
+    "  transforms:\n",
+    "    - type: RunInferenceTransform\n",
+    "      name: RunInferenceTransform\n",
+    "      input: {}\n",
+    "      config:\n",
+    "        model_path: \"{{ MODEL_PATH }}\"\n",
+    "        df_dataset_path: \"{{ DF_DATASET_PATH }}\"\n",
+    "\n",
+    "    - type: ComputeMetricsTransform\n",
+    "      name: ComputeMetricsTransform\n",
+    "      input: RunInferenceTransform\n",
+    "      config:\n",
+    "        df_targets_path: \"{{ DF_TARGETS_PATH }}\"\n",
+    "\n",
+    "    - type: LogForTesting\n",
+    "      name: LogForTesting\n",
+    "      input: ComputeMetricsTransform\n",
+    "\n",
+    "providers:\n",
+    "  - type: pythonPackage\n",
+    "    config:\n",
+    "      packages:\n",
+    "        - ./dist/my_provider-0.1.0.tar.gz\n",
+    "    transforms:\n",
+    "      RunInferenceTransform: 'my_provider.RunInferenceTransform'\n",
+    "      ComputeMetricsTransform: 'my_provider.ComputeMetricsTransform'\n"
+   ],
+   "metadata": {
+    "id": "OHkF4U3oS1xw"
+   },
+   "id": "OHkF4U3oS1xw",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Evaluate the baseline model on the baseline feature columns."
+   ],
+   "metadata": {
+    "id": "g4Qqx7G9W92g"
+   },
+   "id": "g4Qqx7G9W92g"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/baseline_model_evaluation.yaml\n",
+    "{% include './yaml/model_evaluation_template.yaml' %}"
+   ],
+   "metadata": {
+    "id": "xZqhRXw_9iKY"
+   },
+   "id": "xZqhRXw_9iKY",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!python -m apache_beam.yaml.main                                          
    \\\n",
+    "  --yaml_pipeline_file=./yaml/baseline_model_evaluation.yaml              
    \\\n",
+    "  --jinja_variables='{                                                    
    \\\n",
+    "    \"MODEL_PATH\": \"'$WAREHOUSE'/baseline_model.bst\",                  
        \\\n",
+    "    \"DF_DATASET_PATH\": \"./X_test_baseline_feature_columns.pkl\",       
        \\\n",
+    "    \"DF_TARGETS_PATH\": \"./y_test.pkl\" }'"
+   ],
+   "metadata": {
+    "id": "z9KnAxsP9iAL"
+   },
+   "id": "z9KnAxsP9iAL",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The model trained only on the original features achieves:\n",
+    "\n",
+    "```\n",
+    "{\n",
+    "  \"accuracy\": 0.9984623532828757,\n",
+    "  \"precision\": 0.963115986169815,\n",
+    "  \"recall\": 0.8987887554423477,\n",
+    "  \"f1\": 0.9285234509618558\n",
+    "}\n",
+    "```"
+   ],
+   "metadata": {
+    "id": "TLfFUUV_9hjw"
+   },
+   "id": "TLfFUUV_9hjw"
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "Evaluate the experimental model on the full feature columns."
+   ],
+   "metadata": {
+    "id": "BnmKasKxXLvS"
+   },
+   "id": "BnmKasKxXLvS"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "%%writefile ./yaml/experimental_model_evaluation.yaml\n",
+    "{% include './yaml/model_evaluation_template.yaml' %}"
+   ],
+   "metadata": {
+    "id": "g4_u7Lak8bkL"
+   },
+   "id": "g4_u7Lak8bkL",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "!python -m apache_beam.yaml.main                                          
    \\\n",
+    "  --yaml_pipeline_file=./yaml/experimental_model_evaluation.yaml          
    \\\n",
+    "  --jinja_variables='{                                                    
    \\\n",
+    "    \"MODEL_PATH\": \"'$WAREHOUSE'/experimental_model.bst\",              
        \\\n",
+    "    \"DF_DATASET_PATH\": \"./X_test_full_feature_columns.pkl\",           
        \\\n",
+    "    \"DF_TARGETS_PATH\": \"./y_test.pkl\" }'"
+   ],
+   "metadata": {
+    "id": "L2rBVbnFnFZy"
+   },
+   "id": "L2rBVbnFnFZy",
+   "execution_count": null,
+   "outputs": []
+  },
+  {
+   "cell_type": "markdown",
+   "source": [
+    "The model trained on the full feature columns achieves a better result:\n",
+    "\n",
+    "```\n",
+    "{\n",
+    "  \"accuracy\": 0.9989620884659411,\n",
+    "  \"precision\": 0.9643454567937234,\n",
+    "  \"recall\": 0.944328457850605,\n",
+    "  \"f1\": 0.9541138032867618\n",
+    "}\n",
+    "```"
+   ],
+   "metadata": {
+    "id": "jbJfTX26YVE6"
+   },
+   "id": "jbJfTX26YVE6"
+  }
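Placing the two reported runs side by side (values copied from the outputs above) shows the aggregate features improving every metric, with recall, the share of fraudulent transactions actually caught, gaining the most:

```python
# Metric values copied from the two evaluation runs above.
baseline = {
    'accuracy': 0.9984623532828757,
    'precision': 0.963115986169815,
    'recall': 0.8987887554423477,
    'f1': 0.9285234509618558,
}
experimental = {
    'accuracy': 0.9989620884659411,
    'precision': 0.9643454567937234,
    'recall': 0.944328457850605,
    'f1': 0.9541138032867618,
}

# Per-metric improvement of the experimental model over the baseline.
deltas = {name: experimental[name] - baseline[name] for name in baseline}
for name, delta in deltas.items():
    print(f'{name}: {delta:+.4f}')
```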
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.10"
+  },
+  "colab": {
+   "provenance": [],
+   "name": "fraud_detection_mlops_beam_yaml_sdk"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}

