damccorm commented on code in PR #34184:
URL: https://github.com/apache/beam/pull/34184#discussion_r1981931106


##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -0,0 +1,2563 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "collapsed_sections": [
+        "mcZATJbaOec0",
+        "-2hTEi-jzYN6"
+      ]
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "8ZekaWhZH2SX"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Vector Embedding Ingestion with Apache Beam and AlloyDB\n",
+        "\n",
+        "<table align=\"left\">\n",
+        "  <td>\n",
+        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
+        "  </td>\n",
+        "  <td>\n",
+        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
+        "  </td>\n",
+        "</table>\n"
+      ],
+      "metadata": {
+        "id": "K6-p-DVrIFTY"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Introduction\n",
+        "\n",
+        "This Colab demonstrates how to generate embeddings from data and 
ingest them into [AlloyDB](https://cloud.google.com/alloydb), Google Cloud's 
fully managed, PostgreSQL-compatible database service. We'll use Apache Beam 
and Dataflow for scalable data processing.\n",
+        "\n",
+        "The goal of this notebook is to make it easy for users to get started 
with generating embeddings at scale using Apache Beam and storing them in 
AlloyDB. We focus on building efficient ingestion pipelines that can handle 
various data sources and embedding models.\n",
+        "\n",
+        "## Example: Furniture Product Catalog\n",
+        "\n",
+        "We'll work with a sample e-commerce dataset representing a furniture 
product catalog. Each product has:\n",
+        "\n",
+        "*   **Structured fields:** `id`, `name`, `category`, `price`\n",
+        "*   **Detailed text descriptions:** Longer text describing the 
product's features.\n",
+        "*   **Additional metadata:** `material`, `dimensions`\n",
+        "\n",
+        "## Pipeline Overview\n",
+        "We will build a pipeline to:\n",
+        "1. Read product data\n",
+        "2. Convert unstructured product data to the `Chunk`<sup>[1]</sup> type\n",
+        "3. Generate Embeddings: Use a pre-trained Hugging Face model (via MLTransform) to create vector embeddings\n",
+        "4. Write to AlloyDB: Store the embeddings in an AlloyDB vector database\n",
+        "\n",
+        "Here's a visualization of the data flow:\n",
+        "\n",
+        "| Stage                     | Data Representation                     
                 | Notes                                                        
                                                           |\n",
+        "| :------------------------ | 
:------------------------------------------------------- | 
:----------------------------------------------------------------------------------------------------------------------
 |\n",
+        "| **1. Ingest Data**      | `{`<br> `  \"id\": \"desk-001\",`<br> `  \"name\": \"Modern Desk\",`<br> `  \"description\": \"Sleek...\",`<br> `  \"category\": \"Desks\",`<br> `  ...`<br> `}` | Supports:<br>- Batch sources (e.g., files, databases)<br>- Streaming sources (e.g., Pub/Sub) |\n",
+        "| **2. Convert to Chunks** | `Chunk(` <br>  
&nbsp;&nbsp;`id=\"desk-001\",` <br>  &nbsp;&nbsp;`content=Content(` <br>   
&nbsp;&nbsp;&nbsp;&nbsp;`text=\"Modern Desk\"` <br> &nbsp;&nbsp; `),` <br>  
&nbsp;&nbsp;`metadata={...}` <br> `)`       | - `Chunk` is the structured input 
for generating and ingesting embeddings.<br>- `chunk.content.text` is the field 
that is embedded.<br> - Converting to `Chunk` does not mean breaking data into 
smaller pieces,<br>&nbsp;&nbsp; it's simply organizing your data in a standard 
format for the embedding pipeline.<br> - `Chunk` allows data to flow seamlessly 
throughout embedding pipelines. |\n",
+        "| **3. Generate Embeddings**| `Chunk(` <br>  
&nbsp;&nbsp;`id=\"desk-001\",`<br>  &nbsp;&nbsp;`embedding=[-0.1, 0.6, 
...],`<br>  `...)`  | Supports:<br>- Local Hugging Face models<br>- Remote 
Vertex AI models<br>- Custom embedding implementations.          |\n",
+        "| **4. Write to AlloyDB** | **AlloyDB Table (Example Row):**<br>`id: desk-001`<br>`embedding: [-0.1, 0.6, ...]`<br>`name: \"Modern Desk\"`<br>`Other fields ...` | Supports:<br>- Custom schemas<br>- Conflict resolution strategies for handling updates |\n",
+        "\n",
+        "\n",
+        "[1]: Chunk represents an embeddable unit of input. It specifies which fields should be embedded and which fields should be treated as metadata. Converting to Chunk does not necessarily mean breaking your text into smaller pieces - it's primarily about structuring your data for the embedding pipeline. For very long texts that exceed the embedding model's maximum input size, you can optionally [use LangChain TextSplitters](https://beam.apache.org/releases/pydoc/2.63.0/apache_beam.ml.rag.chunking.langchain.html) to break the text into smaller `Chunk`s.\n",
+        "\n",
+        "## Execution Environments\n",
+        "\n",
+        "This notebook demonstrates two execution environments:\n",
+        "\n",
+        "1. **DirectRunner (Local Execution)**: All examples in this notebook 
run on DirectRunner by default, which executes the pipeline locally. This is 
ideal for development, testing, and processing small datasets.\n",
+        "\n",
+        "2. **DataflowRunner (Distributed Execution)**: The [Run on 
Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow) section demonstrates how to 
execute the same pipeline on Google Cloud Dataflow for scalable, distributed 
processing. This is recommended for production workloads and large datasets.\n",
+        "\n",
+        "All examples in this notebook can be adapted to run on Dataflow by 
following the pattern shown in the \"Run on Dataflow\" section."
+      ],
+      "metadata": {
+        "id": "WWwFCLRHZPm4"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Setup and Prerequisites\n",
+        "\n",
+        "This example requires:\n",
+        "1. An AlloyDB instance with pgvector extension enabled\n",
+        "2. Apache Beam 2.63.0 or later"
+      ],
+      "metadata": {
+        "id": "z2eAyRECIP3z"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Install Packages and Dependencies\n",
+        "\n",
+        "First, let's install the Python packages required for the embedding 
and ingestion pipeline:"
+      ],
+      "metadata": {
+        "id": "WhOOPUBa6PyW"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Apache Beam with GCP support\n",
+        "!pip install apache_beam[gcp]==2.63.0 --quiet\n",
+        "# Huggingface sentence-transformers for embedding models\n",
+        "!pip install sentence-transformers --quiet"
+      ],
+      "metadata": {
+        "id": "gCWRw2YE11wN"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, let's install psycopg2-binary to help set up our test database."
+      ],
+      "metadata": {
+        "id": "4aqYZ_pG1oYb"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!pip install psycopg2-binary --quiet"
+      ],
+      "metadata": {
+        "id": "eOYjnVDR87IE",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "b358dd5d-f9bd-49c6-bedd-8b72c8ca7391"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Database Setup\n",
+        "\n",
+        "To connect to AlloyDB, you'll need:\n",
+        "1. The JDBC connection URL\n",
+        "2. Database credentials\n",
+        "3. The pgvector extension enabled in your database\n",
+        "\n",
+        "Replace these placeholder values with your actual AlloyDB connection 
details:"
+      ],
+      "metadata": {
+        "id": "VhgbpTKzI-zI"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "ALLOYDB_HOST = \"\" # @param {type:'string'}\n",
+        "DB_NAME = \"postgres\" #  @param {type:'string'}\n",
+        "JDBC_URL = f\"jdbc:postgresql://{ALLOYDB_HOST}:5432/{DB_NAME}\"\n",
+        "DB_USER = \"postgres\" # @param {type:'string'}\n",
+        "DB_PASSWORD = \"\" # @param {type:'string'}"
+      ],
+      "metadata": {
+        "id": "oqKQT0c_JB5f"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### To connect from Colab to AlloyDB, you'll need one of the 
following:\n",
+        "1. Public IP: Enable public IP on your AlloyDB instance and add your 
Colab IP to the authorized networks\n",
+        "2. Auth Proxy: Use the AlloyDB Auth Proxy to establish a secure connection\n",
+        "3. VPC: Run this notebook on a Compute Engine VM in the same VPC as 
your AlloyDB instance\n",
+        "\n",
+        "Your current IP address (for configuring AlloyDB authorized networks 
if using public IP):\n"
+      ],
+      "metadata": {
+        "id": "VoURfADof7mO"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!curl ifconfig.me"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "qIdPWsOOsn9b",
+        "outputId": "281121d9-53f5-446f-a018-d27e3193546b"
+      },
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "34.168.233.188"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Postgres helpers for creating tables and verifying data\n",
+        "import psycopg2\n",
+        "from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\n",
+        "\n",
+        "def setup_alloydb_table(host: str,\n",
+        "                        database: str,\n",
+        "                        table_name: str,\n",
+        "                        user: str,\n",
+        "                        password: str,\n",
+        "                        port: int = 5432):\n",
+        "    \"\"\"Set up an AlloyDB table with the pgvector extension, using the globally defined table_schema.\n",
+        "\n",
+        "    Args:\n",
+        "        host: AlloyDB instance host\n",
+        "        database: Database name\n",
+        "        table_name: Name of the table to (re)create\n",
+        "        user: Database user\n",
+        "        password: Database password\n",
+        "        port: Database port (default: 5432)\n",
+        "    \"\"\"\n",
+        "    # Connection string\n",
+        "    conn_string = f\"host={host} dbname={database} user={user} 
password={password} port={port}\"\n",
+        "\n",
+        "    try:\n",
+        "        # Connect to the database\n",
+        "        conn = psycopg2.connect(conn_string)\n",
+        "        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
+        "        cur = conn.cursor()\n",
+        "\n",
+        "        print(\"Connected to AlloyDB successfully!\")\n",
+        "\n",
+        "        # Create pgvector extension if it doesn't exist\n",
+        "        print(\"Creating pgvector extension...\")\n",
+        "        cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
+        "\n",
+        "        # Create the product_embeddings table\n",
+        "        print(\"Creating table...\")\n",
+        "        cur.execute(f\"\"\"\n",
+        "          DROP TABLE IF EXISTS {table_name};\n",
+        "        \"\"\")\n",
+        "        cur.execute(f\"\"\"\n",
+        "          CREATE TABLE IF NOT EXISTS {table_name} (\n",
+        "              {table_schema}\n",
+        "          );\n",
+        "        \"\"\")\n",
+        "\n",
+        "        print(\"Setup completed successfully!\")\n",
+        "\n",
+        "    except Exception as e:\n",
+        "        print(f\"An error occurred: {e}\")\n",
+        "    finally:\n",
+        "        if 'cur' in locals():\n",
+        "            cur.close()\n",
+        "        if 'conn' in locals():\n",
+        "            conn.close()\n",
+        "\n",
+        "# Example usage (replace with your actual connection details):\n",
+        "#\n",
+        "# setup_alloydb_table(\n",
+        "#     host=\"your-alloydb-host\",\n",
+        "#     database=\"your-database\",\n",
+        "#     table_name=\"your-table-name\",\n",
+        "#     user=\"your-username\",\n",
+        "#     password=\"your-password\"\n",
+        "# )\n",
+        "\n",
+        "# Test the Connection and Table Setup:\n",
+        "# You can verify the setup with a simple query using the helper below.\n",
+        "\n",
+        "def test_alloydb_connection(host: str,\n",
+        "                          database: str,\n",
+        "                          table_name: str,\n",
+        "                          user: str,\n",
+        "                          password: str,\n",
+        "                          port: int = 5432):\n",
+        "    \"\"\"Test the AlloyDB connection and verify table creation.\n",
+        "\n",
+        "    Args:\n",
+        "        host: AlloyDB instance host\n",
+        "        database: Database name\n",
+        "        table_name: Name of the table to check\n",
+        "        user: Database user\n",
+        "        password: Database password\n",
+        "        port: Database port (default: 5432)\n",
+        "    \"\"\"\n",
+        "    conn_string = f\"host={host} dbname={database} user={user} 
password={password} port={port}\"\n",
+        "\n",
+        "    try:\n",
+        "        conn = psycopg2.connect(conn_string)\n",
+        "        cur = conn.cursor()\n",
+        "\n",
+        "        # Check if table exists\n",
+        "        cur.execute(f\"\"\"\n",
+        "            SELECT EXISTS (\n",
+        "                SELECT FROM information_schema.tables\n",
+        "                WHERE table_name = '{table_name}'\n",
+        "            );\n",
+        "        \"\"\")\n",
+        "        table_exists = cur.fetchone()[0]\n",
+        "\n",
+        "        if table_exists:\n",
+        "            print(f\"✓ {table_name} table exists\")\n",
+        "\n",
+        "            # Check if vector extension is installed\n",
+        "            cur.execute(\"SELECT * FROM pg_extension WHERE extname = 
'vector';\")\n",
+        "            if cur.fetchone():\n",
+        "                print(\"✓ pgvector extension is installed\")\n",
+        "            else:\n",
+        "                print(\"✗ pgvector extension is not installed\")\n",
+        "        else:\n",
+        "            print(f\"✗ {table_name} table does not exist\")\n",
+        "\n",
+        "    except Exception as e:\n",
+        "        print(f\"Connection test failed: {e}\")\n",
+        "    finally:\n",
+        "        if 'cur' in locals():\n",
+        "            cur.close()\n",
+        "        if 'conn' in locals():\n",
+        "            conn.close()\n",
+        "\n",
+        "def verify_embeddings(host: str,\n",
+        "                     database: str,\n",
+        "                     table_name: str,\n",
+        "                     user: str,\n",
+        "                     password: str,\n",
+        "                     port: int = 5432):\n",
+        "    \"\"\"Connect to AlloyDB and print all written products.\"\"\"\n",
+        "    conn = psycopg2.connect(\n",
+        "        host=host,\n",
+        "        database=database,\n",
+        "        user=user,\n",
+        "        password=password,\n",
+        "        port=port\n",
+        "    )\n",
+        "\n",
+        "    try:\n",
+        "        with conn.cursor() as cur:\n",
+        "            # Simple SELECT * query\n",
+        "            cur.execute(f\"SELECT * FROM {table_name};\")\n",
+        "            rows = cur.fetchall()\n",
+        "\n",
+        "            # Get column names from cursor description\n",
+        "            columns = [desc[0] for desc in cur.description]\n",
+        "\n",
+        "            print(f\"\\nFound {len(rows)} products:\")\n",
+        "            print(\"-\" * 80)\n",
+        "\n",
+        "            # Print each row with column names\n",
+        "            for row in rows:\n",
+        "                for col, val in zip(columns, row):\n",
+        "                    print(f\"{col}: {val}\")\n",
+        "                print(\"-\" * 80)\n",
+        "\n",
+        "    finally:\n",
+        "        conn.close()"
+      ],
+      "metadata": {
+        "id": "l_BBCKl7KKcb",
+        "cellView": "form"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Create Sample Product Catalog Data\n",
+        "\n",
+        "We'll create a typical e-commerce catalog where you might want to:\n",
+        "- Generate embeddings for product text\n",
+        "- Store vectors alongside product data\n",
+        "- Enable vector similarity features\n",
+        "\n",
+        "Example product:\n",
+        "```python\n",
+        "{\n",
+        "    \"id\": \"desk-001\",\n",
+        "    \"name\": \"Modern Minimalist Desk\",\n",
+        "    \"description\": \"Sleek minimalist desk with clean lines and a 
spacious work surface. \"\n",
+        "                  \"Features cable management system and sturdy steel 
frame. \"\n",
+        "                  \"Perfect for contemporary home offices and 
workspaces.\",\n",
+        "    \"category\": \"Desks\",\n",
+        "    \"price\": 399.99,\n",
+        "    \"material\": \"Engineered Wood, Steel\",\n",
+        "    \"dimensions\": \"60W x 30D x 29H inches\"\n",
+        "}\n",
+        "```"
+      ],
+      "metadata": {
+        "id": "70z2O4nbOuaM"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Create sample data\n",
+        "PRODUCTS_DATA = [\n",
+        "    {\n",
+        "        \"id\": \"desk-001\",\n",
+        "        \"name\": \"Modern Minimalist Desk\",\n",
+        "        \"description\": \"Sleek minimalist desk with clean lines and 
a spacious work surface. \"\n",
+        "                      \"Features cable management system and sturdy 
steel frame. \"\n",
+        "                      \"Perfect for contemporary home offices and 
workspaces.\",\n",
+        "        \"category\": \"Desks\",\n",
+        "        \"price\": 399.99,\n",
+        "        \"material\": \"Engineered Wood, Steel\",\n",
+        "        \"dimensions\": \"60W x 30D x 29H inches\"\n",
+        "    },\n",
+        "    {\n",
+        "        \"id\": \"chair-001\",\n",
+        "        \"name\": \"Ergonomic Mesh Office Chair\",\n",
+        "        \"description\": \"Premium ergonomic office chair with 
breathable mesh back, \"\n",
+        "                      \"adjustable lumbar support, and 4D armrests. 
Features synchronized \"\n",
+        "                      \"tilt mechanism and memory foam seat cushion. 
Ideal for long work hours.\",\n",
+        "        \"category\": \"Office Chairs\",\n",
+        "        \"price\": 299.99,\n",
+        "        \"material\": \"Mesh, Metal, Premium Foam\",\n",
+        "        \"dimensions\": \"26W x 26D x 48H inches\"\n",
+        "    },\n",
+        "    {\n",
+        "        \"id\": \"sofa-001\",\n",
+        "        \"name\": \"Contemporary Sectional Sofa\",\n",
+        "        \"description\": \"Modern L-shaped sectional with chaise 
lounge. Upholstered in premium \"\n",
+        "                      \"performance fabric. Features deep seats, 
plush cushions, and solid \"\n",
+        "                      \"wood legs. Perfect for modern living 
rooms.\",\n",
+        "        \"category\": \"Sofas\",\n",
+        "        \"price\": 1299.99,\n",
+        "        \"material\": \"Performance Fabric, Solid Wood\",\n",
+        "        \"dimensions\": \"112W x 65D x 34H inches\"\n",
+        "    },\n",
+        "    {\n",
+        "        \"id\": \"table-001\",\n",
+        "        \"name\": \"Rustic Dining Table\",\n",
+        "        \"description\": \"Farmhouse-style dining table with solid 
wood construction. \"\n",
+        "                      \"Features distressed finish and trestle base. 
Seats 6-8 people \"\n",
+        "                      \"comfortably. Perfect for family 
gatherings.\",\n",
+        "        \"category\": \"Dining Tables\",\n",
+        "        \"price\": 899.99,\n",
+        "        \"material\": \"Solid Pine Wood\",\n",
+        "        \"dimensions\": \"72W x 42D x 30H inches\"\n",
+        "    },\n",
+        "    {\n",
+        "        \"id\": \"bed-001\",\n",
+        "        \"name\": \"Platform Storage Bed\",\n",
+        "        \"description\": \"Modern queen platform bed with integrated 
storage drawers. \"\n",
+        "                      \"Features upholstered headboard and durable 
wood slat support. \"\n",
+        "                      \"No box spring needed. Perfect for maximizing 
bedroom space.\",\n",
+        "        \"category\": \"Beds\",\n",
+        "        \"price\": 799.99,\n",
+        "        \"material\": \"Engineered Wood, Linen Fabric\",\n",
+        "        \"dimensions\": \"65W x 86D x 48H inches\"\n",
+        "    }\n",
+        "]\n",
+        "print(f\"\"\"✓ Created PRODUCTS_DATA with {len(PRODUCTS_DATA)} 
records\"\"\")"
+      ],
+      "metadata": {
+        "id": "7_J__S8JOwJ_",
+        "cellView": "form",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "2b877676-25ad-45d2-ca7f-892f9a685075"
+      },
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "✓ Created PRODUCTS_DATA with 5 records\n"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Importing Pipeline Components\n",
+        "\n",
+        "We import the following for configuring our embedding ingestion 
pipeline:\n",
+        "- `Chunk`, the structured input for generating and ingesting 
embeddings\n",
+        "- `AlloyDBConnectionConfig` for configuring database connection 
information\n",
+        "- `AlloyDBVectorWriterConfig` for configuring write behavior like 
schema mapping and conflict resolution"
+      ],
+      "metadata": {
+        "id": "KUHPsWzQFKpL"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Embedding-specific imports\n",
+        "from apache_beam.ml.rag.ingestion.alloydb import (\n",
+        "    AlloyDBVectorWriterConfig,\n",
+        "    AlloyDBConnectionConfig\n",
+        ")\n",
+        "from apache_beam.ml.rag.ingestion.base import 
VectorDatabaseWriteTransform\n",
+        "from apache_beam.ml.rag.types import Chunk, Content\n",
+        "from apache_beam.ml.rag.embeddings.huggingface import 
HuggingfaceTextEmbeddings\n",
+        "\n",
+        "# Apache Beam core\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.ml.transforms.base import MLTransform"
+      ],
+      "metadata": {
+        "id": "fFMjPZaelTi2"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# What's next?\n",
+        "\n",
+        "This Colab covers several use cases that you can explore based on your needs after completing the Setup and Prerequisites:\n",
+        "\n",
+        "🔰 **New to vector embeddings?**\n",
+        "- [Start with Quick 
Start](#scrollTo=Quick_Start_Basic_Vector_Ingestion)\n",
+        "- Uses simple out-of-box schema\n",
+        "- Perfect for initial testing\n",
+        "\n",
+        "🚀 **Need to scale to large datasets?**\n",
+        "- [Go to Run on Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow)\n",
+        "- Learn how to execute the same pipeline at scale\n",
+        "- Fully managed\n",
+        "- Process large datasets efficiently\n",
+        "\n",
+        "🎯 **Have a specific schema?**\n",
+        "- [Go to Custom 
Schema](#scrollTo=Custom_Schema_with_Column_Mapping)\n",
+        "- Learn to use different column names\n",
+        "- Map metadata to individual columns\n",
+        "\n",
+        "🔄 **Need to update embeddings?**\n",
+        "- [Check out Updating 
Embeddings](#scrollTo=Update_Embeddings_and_Metadata_with_Conflict_Resolution)\n",
+        "- Handle conflicts\n",
+        "- Selective field updates\n",
+        "\n",
+        "🔗 **Need to generate and store embeddings for existing AlloyDB data?**\n",
+        "- [See Database 
Integration](#scrollTo=Adding_Embeddings_to_Existing_Database_Records)\n",
+        "- Read data from your AlloyDB table.\n",
+        "- Generate embeddings for the relevant fields.\n",
+        "- Update your table (or a related table) with the generated 
embeddings.\n",
+        "\n",
+        "🤖 **Want to use Google's AI models?**\n",
+        "- [Try Vertex AI 
Embeddings](#scrollTo=Generate_Embeddings_with_VertexAI_Text_Embeddings)\n",
+        "- Use Google's powerful embedding models\n",
+        "- Seamlessly integrate with other Google Cloud services"
+      ],
+      "metadata": {
+        "id": "FjUzsUtXzFof"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "<a name=\"quickstart\"></a>\n",
+        "# Quick Start: Basic Vector Ingestion\n",
+        "\n",
+        "This section shows the simplest way to generate embeddings and store 
them in AlloyDB."
+      ],
+      "metadata": {
+        "id": "pLEi3Z4wKMOX"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Create table with default schema\n",
+        "\n",
+        "Before running the pipeline, we need a table to store our embeddings:"
+      ],
+      "metadata": {
+        "id": "LWqEgqjQOcbA"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "table_name = \"default_product_embeddings\"\n",
+        "table_schema = f\"\"\"\n",
+        "  id VARCHAR PRIMARY KEY,\n",
+        "  embedding VECTOR(384) NOT NULL,\n",
+        "  content text,\n",
+        "  metadata JSONB\n",
+        "\"\"\"\n",
+        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
+        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "93YnjdJkFWOi",
+        "outputId": "f843e7db-d62a-468c-a211-4bace73d10c6"
+      },
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Connected to AlloyDB successfully!\n",
+            "Creating pgvector extension...\n",
+            "Creating table...\n",
+            "Setup completed successfully!\n",
+            "✓ default_product_embeddings table exists\n",
+            "✓ pgvector extension is installed\n"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Configure Pipeline Components\n",
+        "\n",
+        "Now define the components that control the pipeline behavior:"
+      ],
+      "metadata": {
+        "id": "DikTnoGbOioG"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Map products to Chunks\n",
+        "- Our data is ingested as product dictionaries\n",
+        "- Embedding generation and ingestion operate on `Chunk`s\n",
+        "- We convert each product dictionary to a `Chunk` to configure what 
text to embed and what to treat as metadata"
+      ],
+      "metadata": {
+        "id": "M8rVyZ6o-Nep"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from typing import Dict, Any\n",
+        "\n",
+        "# The create_chunk function converts our product dictionaries to 
Chunks.\n",
+        "# This doesn't split the text - it simply structures it in the 
format\n",
+        "# expected by the embedding pipeline components.\n",
+        "def create_chunk(product: Dict[str, Any]) -> Chunk:\n",
+        "    \"\"\"Convert a product dictionary into a Chunk object.\n",
+        "\n",
+        "       The pipeline components (MLTransform, 
VectorDatabaseWriteTransform)\n",
+        "       work with Chunk objects. This function:\n",
+        "       1. Extracts text we want to embed\n",
+        "       2. Preserves product data as metadata\n",
+        "       3. Creates a Chunk in the expected format\n",
+        "\n",
+        "    Args:\n",
+        "        product: Dictionary containing product information\n",
+        "\n",
+        "    Returns:\n",
+        "        Chunk: A Chunk object ready for embedding\n",
+        "    \"\"\"\n",
+        "    return Chunk(\n",
+        "        content=Content(\n",
+        "            text=f\"{product['name']}: {product['description']}\"\n",
+        "        ), # The text that will be embedded\n",
+        "        id=product['id'],  # Use product ID as chunk ID\n",
+        "        metadata=product,  # Store all product info in metadata\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "Rm_IX5U6mP_r"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Generate embeddings with HuggingFace"
+      ],
+      "metadata": {
+        "id": "xJaI9m3D7Vw-"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "We use a local pre-trained Hugging Face model to create vector 
embeddings from the product descriptions."
+      ],
+      "metadata": {
+        "id": "0dlm1fjQh2dX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
+        "    model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
+        ")"
+      ],
+      "metadata": {
+        "id": "E5LkHmjV7l2S"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Write to AlloyDB\n",
+        "\n",
+        "The default AlloyDBVectorWriterConfig maps Chunk fields to database 
columns as:\n",
+        "\n",
+        "| Database Column | Chunk Field | Description |\n",
+        "|----------------|-------------|-------------|\n",
+        "| id             | chunk.id    | Unique identifier |\n",
+        "| embedding      | chunk.embedding.dense_embedding | Vector 
representation |\n",
+        "| content        | chunk.content.text | Text that was embedded |\n",
+        "| metadata       | chunk.metadata | Additional data as JSONB |"
+      ],
+      "metadata": {
+        "id": "vVv8hD5wQo3w"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
+        "    connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER, DB_PASSWORD),\n",
+        "    table_name=table_name\n",
+        ")"
+      ],
+      "metadata": {
+        "id": "moKsz_6xQt-E"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Assemble and Run Pipeline\n",
+        "\n",
+        "Now we can create our pipeline that:\n",
+        "1. Takes our product data\n",
+        "2. Converts each product to a Chunk\n",
+        "3. Generates embeddings for each Chunk\n",
+        "4. Stores everything in AlloyDB"
+      ],
+      "metadata": {
+        "id": "Ww2BPxTNKmL2"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import tempfile\n",
+        "\n",
+        "# Executing on DirectRunner (local execution)\n",
+        "with beam.Pipeline() as p:\n",
+        "    _ = (\n",
+        "            p\n",
+        "            | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n",
+        "            | 'Convert to Chunks' >> beam.Map(create_chunk)\n",
+        "            | 'Generate Embeddings' >> 
MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
+        "              .with_transform(huggingface_embedder)\n",
+        "            | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(\n",
+        "                alloydb_writer_config\n",
+        "            )\n",
+        "        )"
+      ],
+      "metadata": {
+        "id": "lyS3IpNBDgYw"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Verify Embeddings\n",
+        "Let's check what was written to our AlloyDB table:"
+      ],
+      "metadata": {
+        "id": "Qm97EAww6RvW"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+      ],
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/";
+        },
+        "id": "-H3t2cIN6lO_",
+        "outputId": "895540d4-d312-4245-b7e0-ae15b7677f6b"
+      },
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "\n",
+            "Found 5 products:\n",
+            
"--------------------------------------------------------------------------------\n",
+            "id: desk-001\n",
+            "embedding: [-0.001057815, 0.060585838, 0.06550077, ... (long embedding vector truncated)]\n",

Review Comment:
   There are a couple of very long output blocks like this - could you remove them 
from the checked-in colab? That will make the rendering on GitHub much cleaner 
and play well with import tooling. Basically, anywhere you see `"outputs":`, 
you should be able to get rid of the output and just make it `"outputs": []` 
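   If it helps, this is easy to do mechanically. A minimal sketch using only the stdlib (the function name and indentation are my own choices; `jupyter nbconvert --clear-output --inplace <notebook>` should accomplish the same thing):

```python
import json

def strip_outputs(path: str) -> None:
    """Clear every code cell's outputs and execution_count in place."""
    with open(path) as f:
        nb = json.load(f)
    for cell in nb.get("cells", []):
        if cell.get("cell_type") == "code":
            # Reset exactly the fields that bloat the checked-in file
            cell["outputs"] = []
            cell["execution_count"] = None
    with open(path, "w") as f:
        json.dump(nb, f, indent=2)
        f.write("\n")
```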



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

