damccorm commented on code in PR #34184:
URL: https://github.com/apache/beam/pull/34184#discussion_r1981931106
##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -0,0 +1,2563 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "collapsed_sections": [
+ "mcZATJbaOec0",
+ "-2hTEi-jzYN6"
+ ]
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "8ZekaWhZH2SX"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Vector Embedding Ingestion with Apache Beam and AlloyDB\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>\n"
+ ],
+ "metadata": {
+ "id": "K6-p-DVrIFTY"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Introduction\n",
+ "\n",
+ "This Colab demonstrates how to generate embeddings from data and ingest them into [AlloyDB](https://cloud.google.com/alloydb), Google Cloud's fully managed, PostgreSQL-compatible database service. We'll use Apache Beam and Dataflow for scalable data processing.\n",
+ "\n",
+ "The goal of this notebook is to make it easy for users to get started with generating embeddings at scale using Apache Beam and storing them in AlloyDB. We focus on building efficient ingestion pipelines that can handle various data sources and embedding models.\n",
+ "\n",
+ "## Example: Furniture Product Catalog\n",
+ "\n",
+ "We'll work with a sample e-commerce dataset representing a furniture product catalog. Each product has:\n",
+ "\n",
+ "* **Structured fields:** `id`, `name`, `category`, `price`\n",
+ "* **Detailed text descriptions:** Longer text describing the product's features.\n",
+ "* **Additional metadata:** `material`, `dimensions`\n",
+ "\n",
+ "## Pipeline Overview\n",
+ "We will build a pipeline to:\n",
+ "1. Read product data\n",
+ "2. Convert unstructured product data to `Chunk`<sup>[1]</sup> type\n",
+ "3. Generate Embeddings: Use a pre-trained Hugging Face model (via MLTransform) to create vector embeddings\n",
+ "4. Write to AlloyDB: Store the embeddings in an AlloyDB vector database\n",
+ "\n",
+ "Here's a visualization of the data flow:\n",
+ "\n",
+ "| Stage | Data Representation | Notes |\n",
+ "| :--- | :--- | :--- |\n",
+ "| **1. Ingest Data** | `{`<br> ` \"id\": \"desk-001\",`<br> ` \"name\": \"Modern Desk\",`<br> ` \"description\": \"Sleek...\",`<br> ` \"category\": \"Desks\",`<br> ` ...`<br> `}` | Supports:<br>- Batch sources (e.g., files, databases)<br>- Streaming sources (e.g., Pub/Sub) |\n",
+ "| **2. Convert to Chunks** | `Chunk(` <br> `id=\"desk-001\",` <br> `content=Content(` <br> `text=\"Modern Desk\"` <br> `),` <br> `metadata={...}` <br> `)` | - `Chunk` is the structured input for generating and ingesting embeddings.<br>- `chunk.content.text` is the field that is embedded.<br>- Converting to `Chunk` does not mean breaking data into smaller pieces; it simply organizes your data in a standard format for the embedding pipeline.<br>- `Chunk` lets data flow through every stage of the embedding pipeline. |\n",
+ "| **3. Generate Embeddings** | `Chunk(` <br> `id=\"desk-001\",`<br> `embedding=[-0.1, 0.6, ...],`<br> `...)` | Supports:<br>- Local Hugging Face models<br>- Remote Vertex AI models<br>- Custom embedding implementations |\n",
+ "| **4. Write to AlloyDB** | **AlloyDB Table (Example Row):**<br>`id: desk-001`<br>`embedding: [-0.1, 0.6, ...]`<br>`name: \"Modern Desk\"`<br>`Other fields ...` | Supports:<br>- Custom schemas<br>- Conflict resolution strategies for handling updates |\n",
+ "\n",
+ "\n",
+ "[1]: Chunk represents an embeddable unit of input. It specifies which fields should be embedded and which fields should be treated as metadata. Converting to Chunk does not necessarily mean breaking your text into smaller pieces; it's primarily about structuring your data for the embedding pipeline. For very long texts that exceed the embedding model's maximum input size, you can optionally [use LangChain TextSplitters](https://beam.apache.org/releases/pydoc/2.63.0/apache_beam.ml.rag.chunking.langchain.html) to break the text into smaller `Chunk`s.\n",
+ "\n",
+ "## Execution Environments\n",
+ "\n",
+ "This notebook demonstrates two execution environments:\n",
+ "\n",
+ "1. **DirectRunner (Local Execution)**: All examples in this notebook run on DirectRunner by default, which executes the pipeline locally. This is ideal for development, testing, and processing small datasets.\n",
+ "\n",
+ "2. **DataflowRunner (Distributed Execution)**: The [Run on Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow) section demonstrates how to execute the same pipeline on Google Cloud Dataflow for scalable, distributed processing. This is recommended for production workloads and large datasets.\n",
+ "\n",
+ "All examples in this notebook can be adapted to run on Dataflow by following the pattern shown in the \"Run on Dataflow\" section."
+ ],
+ "metadata": {
+ "id": "WWwFCLRHZPm4"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Setup and Prerequisites\n",
+ "\n",
+ "This example requires:\n",
+ "1. An AlloyDB instance with the pgvector extension enabled\n",
+ "2. Apache Beam 2.63.0 or later"
+ ],
+ "metadata": {
+ "id": "z2eAyRECIP3z"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Install Packages and Dependencies\n",
+ "\n",
+ "First, let's install the Python packages required for the embedding and ingestion pipeline:"
+ ],
+ "metadata": {
+ "id": "WhOOPUBa6PyW"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Apache Beam with GCP support\n",
+ "!pip install apache_beam[gcp]==2.63.0 --quiet\n",
+ "# Hugging Face sentence-transformers for embedding models\n",
+ "!pip install sentence-transformers --quiet"
+ ],
+ "metadata": {
+ "id": "gCWRw2YE11wN"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Next, let's install psycopg2-binary to help set up our test database."
+ ],
+ "metadata": {
+ "id": "4aqYZ_pG1oYb"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!pip install psycopg2-binary --quiet"
+ ],
+ "metadata": {
+ "id": "eOYjnVDR87IE",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "b358dd5d-f9bd-49c6-bedd-8b72c8ca7391"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Database Setup\n",
+ "\n",
+ "To connect to AlloyDB, you'll need:\n",
+ "1. The JDBC connection URL\n",
+ "2. Database credentials\n",
+ "3. The pgvector extension enabled in your database\n",
+ "\n",
+ "Replace these placeholder values with your actual AlloyDB connection details:"
+ ],
+ "metadata": {
+ "id": "VhgbpTKzI-zI"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "ALLOYDB_HOST = \"\" # @param {type:'string'}\n",
+ "DB_NAME = \"postgres\" # @param {type:'string'}\n",
+ "JDBC_URL = f\"jdbc:postgresql://{ALLOYDB_HOST}:5432/{DB_NAME}\"\n",
+ "DB_USER = \"postgres\" # @param {type:'string'}\n",
+ "DB_PASSWORD = \"\" # @param {type:'string'}"
+ ],
+ "metadata": {
+ "id": "oqKQT0c_JB5f"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### To connect from Colab to AlloyDB, you'll need one of the following:\n",
+ "1. Public IP: Enable public IP on your AlloyDB instance and add your Colab IP to the authorized networks (the Colab runtime's IP can change when the runtime restarts)\n",
+ "2. Auth Proxy: Use the AlloyDB Auth Proxy to establish a secure connection\n",
+ "3. VPC: Run this notebook on a Compute Engine VM in the same VPC as your AlloyDB instance\n",
+ "\n",
+ "Your current IP address (for configuring AlloyDB authorized networks if using public IP):\n"
+ ],
+ "metadata": {
+ "id": "VoURfADof7mO"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!curl ifconfig.me"
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "qIdPWsOOsn9b",
+ "outputId": "281121d9-53f5-446f-a018-d27e3193546b"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "34.168.233.188"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Postgres helpers for creating tables and verifying data\n",
+ "import psycopg2\n",
+ "from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\n",
+ "\n",
+ "def setup_alloydb_table(host: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " port: int = 5432):\n",
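+ " \"\"\"Drop and recreate an AlloyDB table, enabling the pgvector extension first.\n",
+ "\n",
+ " Warning: any existing table named `table_name` is dropped.\n",
+ "\n",
+ " Args:\n",
+ " host: AlloyDB instance host\n",
+ " database: Database name\n",
+ " table_name: Name of the table to create\n",
+ " user: Database user\n",
+ " password: Database password\n",
+ " port: Database port (default: 5432)\n",
+ "\n",
+ " The column definitions are read from the global `table_schema`\n",
+ " variable, which must be defined before this function is called.\n",
+ " \"\"\"\n",
+ " # Connection string\n",
+ " conn_string = f\"host={host} dbname={database} user={user} password={password} port={port}\"\n",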
+ "\n",
+ " try:\n",
+ " # Connect to the database\n",
+ " conn = psycopg2.connect(conn_string)\n",
+ " conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
+ " cur = conn.cursor()\n",
+ "\n",
+ " print(\"Connected to AlloyDB successfully!\")\n",
+ "\n",
+ " # Create pgvector extension if it doesn't exist\n",
+ " print(\"Creating pgvector extension...\")\n",
+ " cur.execute(\"CREATE EXTENSION IF NOT EXISTS vector;\")\n",
+ "\n",
+ " # Drop any existing table with this name, then create it\n",
+ " print(\"Creating table...\")\n",
+ " cur.execute(f\"\"\"\n",
+ " DROP TABLE IF EXISTS {table_name};\n",
+ " \"\"\")\n",
+ " cur.execute(f\"\"\"\n",
+ " CREATE TABLE IF NOT EXISTS {table_name} (\n",
+ " {table_schema}\n",
+ " );\n",
+ " \"\"\")\n",
+ "\n",
+ " print(\"Setup completed successfully!\")\n",
+ "\n",
+ " except Exception as e:\n",
+ " print(f\"An error occurred: {e}\")\n",
+ " finally:\n",
+ " if 'cur' in locals():\n",
+ " cur.close()\n",
+ " if 'conn' in locals():\n",
+ " conn.close()\n",
+ "\n",
+ "# Example usage (replace the placeholders with your connection details).\n",
+ "# Define `table_schema` (the column definitions) before calling it:\n",
+ "#\n",
+ "# setup_alloydb_table(\n",
+ "#   host=\"your-alloydb-host\",\n",
+ "#   database=\"your-database\",\n",
+ "#   table_name=\"your-table\",\n",
+ "#   user=\"your-username\",\n",
+ "#   password=\"your-password\"\n",
+ "# )\n",
+ "\n",
+ "# Verify the connection and table setup with a simple query.\n",
+ "\n",
+ "def test_alloydb_connection(host: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " port: int = 5432):\n",
+ " \"\"\"Test the AlloyDB connection and verify table creation.\n",
+ "\n",
+ " Args:\n",
+ " host: AlloyDB instance host\n",
+ " database: Database name\n",
+ " table_name: Name of the table to check\n",
+ " user: Database user\n",
+ " password: Database password\n",
+ " port: Database port (default: 5432)\n",
+ " \"\"\"\n",
+ " conn_string = f\"host={host} dbname={database} user={user} password={password} port={port}\"\n",
+ "\n",
+ " try:\n",
+ " conn = psycopg2.connect(conn_string)\n",
+ " cur = conn.cursor()\n",
+ "\n",
+ " # Check if table exists\n",
+ " cur.execute(\n",
+ " \"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = %s);\",\n",
+ " (table_name,),\n",
+ " )\n",
+ " table_exists = cur.fetchone()[0]\n",
+ "\n",
+ " if table_exists:\n",
+ " print(f\"✓ {table_name} table exists\")\n",
+ "\n",
+ " # Check if vector extension is installed\n",
+ " cur.execute(\"SELECT * FROM pg_extension WHERE extname = 'vector';\")\n",
+ " if cur.fetchone():\n",
+ " print(\"✓ pgvector extension is installed\")\n",
+ " else:\n",
+ " print(\"✗ pgvector extension is not installed\")\n",
+ " else:\n",
+ " print(f\"✗ {table_name} table does not exist\")\n",
+ "\n",
+ " except Exception as e:\n",
+ " print(f\"Connection test failed: {e}\")\n",
+ " finally:\n",
+ " if 'cur' in locals():\n",
+ " cur.close()\n",
+ " if 'conn' in locals():\n",
+ " conn.close()\n",
+ "\n",
+ "def verify_embeddings(host: str,\n",
+ " database: str,\n",
+ " table_name: str,\n",
+ " user: str,\n",
+ " password: str,\n",
+ " port: int = 5432):\n",
+ " \"\"\"Connect to AlloyDB and print all written products.\"\"\"\n",
+ " conn = psycopg2.connect(\n",
+ " host=host,\n",
+ " database=database,\n",
+ " user=user,\n",
+ " password=password,\n",
+ " port=port\n",
+ " )\n",
+ "\n",
+ " try:\n",
+ " with conn.cursor() as cur:\n",
+ " # Simple SELECT * query\n",
+ " cur.execute(f\"SELECT * FROM {table_name};\")\n",
+ " rows = cur.fetchall()\n",
+ "\n",
+ " # Get column names from cursor description\n",
+ " columns = [desc[0] for desc in cur.description]\n",
+ "\n",
+ " print(f\"\\nFound {len(rows)} products:\")\n",
+ " print(\"-\" * 80)\n",
+ "\n",
+ " # Print each row with column names\n",
+ " for row in rows:\n",
+ " for col, val in zip(columns, row):\n",
+ " print(f\"{col}: {val}\")\n",
+ " print(\"-\" * 80)\n",
+ "\n",
+ " finally:\n",
+ " conn.close()"
+ ],
+ "metadata": {
+ "id": "l_BBCKl7KKcb",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Create Sample Product Catalog Data\n",
+ "\n",
+ "We'll create a typical e-commerce catalog where you might want to:\n",
+ "- Generate embeddings for product text\n",
+ "- Store vectors alongside product data\n",
+ "- Enable vector similarity features\n",
+ "\n",
+ "Example product:\n",
+ "```python\n",
+ "{\n",
+ " \"id\": \"desk-001\",\n",
+ " \"name\": \"Modern Minimalist Desk\",\n",
+ " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n",
+ " \"Features cable management system and sturdy steel frame. \"\n",
+ " \"Perfect for contemporary home offices and workspaces.\",\n",
+ " \"category\": \"Desks\",\n",
+ " \"price\": 399.99,\n",
+ " \"material\": \"Engineered Wood, Steel\",\n",
+ " \"dimensions\": \"60W x 30D x 29H inches\"\n",
+ "}\n",
+ "```"
+ ],
+ "metadata": {
+ "id": "70z2O4nbOuaM"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Create sample data\n",
+ "PRODUCTS_DATA = [\n",
+ " {\n",
+ " \"id\": \"desk-001\",\n",
+ " \"name\": \"Modern Minimalist Desk\",\n",
+ " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n",
+ " \"Features cable management system and sturdy steel frame. \"\n",
+ " \"Perfect for contemporary home offices and workspaces.\",\n",
+ " \"category\": \"Desks\",\n",
+ " \"price\": 399.99,\n",
+ " \"material\": \"Engineered Wood, Steel\",\n",
+ " \"dimensions\": \"60W x 30D x 29H inches\"\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"chair-001\",\n",
+ " \"name\": \"Ergonomic Mesh Office Chair\",\n",
+ " \"description\": \"Premium ergonomic office chair with breathable mesh back, \"\n",
+ " \"adjustable lumbar support, and 4D armrests. Features synchronized \"\n",
+ " \"tilt mechanism and memory foam seat cushion. Ideal for long work hours.\",\n",
+ " \"category\": \"Office Chairs\",\n",
+ " \"price\": 299.99,\n",
+ " \"material\": \"Mesh, Metal, Premium Foam\",\n",
+ " \"dimensions\": \"26W x 26D x 48H inches\"\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"sofa-001\",\n",
+ " \"name\": \"Contemporary Sectional Sofa\",\n",
+ " \"description\": \"Modern L-shaped sectional with chaise lounge. Upholstered in premium \"\n",
+ " \"performance fabric. Features deep seats, plush cushions, and solid \"\n",
+ " \"wood legs. Perfect for modern living rooms.\",\n",
+ " \"category\": \"Sofas\",\n",
+ " \"price\": 1299.99,\n",
+ " \"material\": \"Performance Fabric, Solid Wood\",\n",
+ " \"dimensions\": \"112W x 65D x 34H inches\"\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"table-001\",\n",
+ " \"name\": \"Rustic Dining Table\",\n",
+ " \"description\": \"Farmhouse-style dining table with solid wood construction. \"\n",
+ " \"Features distressed finish and trestle base. Seats 6-8 people \"\n",
+ " \"comfortably. Perfect for family gatherings.\",\n",
+ " \"category\": \"Dining Tables\",\n",
+ " \"price\": 899.99,\n",
+ " \"material\": \"Solid Pine Wood\",\n",
+ " \"dimensions\": \"72W x 42D x 30H inches\"\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"bed-001\",\n",
+ " \"name\": \"Platform Storage Bed\",\n",
+ " \"description\": \"Modern queen platform bed with integrated storage drawers. \"\n",
+ " \"Features upholstered headboard and durable wood slat support. \"\n",
+ " \"No box spring needed. Perfect for maximizing bedroom space.\",\n",
+ " \"category\": \"Beds\",\n",
+ " \"price\": 799.99,\n",
+ " \"material\": \"Engineered Wood, Linen Fabric\",\n",
+ " \"dimensions\": \"65W x 86D x 48H inches\"\n",
+ " }\n",
+ "]\n",
+ "print(f\"✓ Created PRODUCTS_DATA with {len(PRODUCTS_DATA)} records\")"
+ ],
+ "metadata": {
+ "id": "7_J__S8JOwJ_",
+ "cellView": "form",
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "outputId": "2b877676-25ad-45d2-ca7f-892f9a685075"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "✓ Created PRODUCTS_DATA with 5 records\n"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Importing Pipeline Components\n",
+ "\n",
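+ "We import the following to configure our embedding ingestion pipeline:\n",
+ "- `Chunk`, the structured input for generating and ingesting embeddings\n",
+ "- `AlloyDBConnectionConfig` for configuring database connection information\n",
+ "- `AlloyDBVectorWriterConfig` for configuring write behavior like schema mapping and conflict resolution\n",
+ "- `VectorDatabaseWriteTransform`, which writes embedded `Chunk`s using a writer config\n",
+ "- `HuggingfaceTextEmbeddings` for generating embeddings with a Hugging Face model, applied through `MLTransform`"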
+ ],
+ "metadata": {
+ "id": "KUHPsWzQFKpL"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Embedding-specific imports\n",
+ "from apache_beam.ml.rag.ingestion.alloydb import (\n",
+ " AlloyDBVectorWriterConfig,\n",
+ " AlloyDBConnectionConfig\n",
+ ")\n",
+ "from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform\n",
+ "from apache_beam.ml.rag.types import Chunk, Content\n",
+ "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n",
+ "\n",
+ "# Apache Beam core\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.ml.transforms.base import MLTransform"
+ ],
+ "metadata": {
+ "id": "fFMjPZaelTi2"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# What's next?\n",
+ "\n",
+ "This Colab covers several use cases that you can explore based on your needs after completing the Setup and Prerequisites:\n",
+ "\n",
+ "🔰 **New to vector embeddings?**\n",
+ "- [Start with Quick Start](#scrollTo=Quick_Start_Basic_Vector_Ingestion)\n",
+ "- Uses a simple out-of-the-box schema\n",
+ "- Perfect for initial testing\n",
+ "\n",
+ "🚀 **Need to scale to large datasets?**\n",
+ "- [Go to Run on Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow)\n",
+ "- Learn how to execute the same pipeline at scale\n",
+ "- Fully managed\n",
+ "- Process large datasets efficiently\n",
+ "\n",
+ "🎯 **Have a specific schema?**\n",
+ "- [Go to Custom Schema](#scrollTo=Custom_Schema_with_Column_Mapping)\n",
+ "- Learn to use different column names\n",
+ "- Map metadata to individual columns\n",
+ "\n",
+ "🔄 **Need to update embeddings?**\n",
+ "- [Check out Updating Embeddings](#scrollTo=Update_Embeddings_and_Metadata_with_Conflict_Resolution)\n",
+ "- Handle conflicts\n",
+ "- Selective field updates\n",
+ "\n",
+ "🔗 **Need to generate and store embeddings for existing AlloyDB data?**\n",
+ "- [See Database Integration](#scrollTo=Adding_Embeddings_to_Existing_Database_Records)\n",
+ "- Read data from your AlloyDB table.\n",
+ "- Generate embeddings for the relevant fields.\n",
+ "- Update your table (or a related table) with the generated embeddings.\n",
+ "\n",
+ "🤖 **Want to use Google's AI models?**\n",
+ "- [Try Vertex AI Embeddings](#scrollTo=Generate_Embeddings_with_VertexAI_Text_Embeddings)\n",
+ "- Use Google's powerful embedding models\n",
+ "- Seamlessly integrate with other Google Cloud services"
+ ],
+ "metadata": {
+ "id": "FjUzsUtXzFof"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "<a name=\"quickstart\"></a>\n",
+ "# Quick Start: Basic Vector Ingestion\n",
+ "\n",
+ "This section shows the simplest way to generate embeddings and store them in AlloyDB."
+ ],
+ "metadata": {
+ "id": "pLEi3Z4wKMOX"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Create table with default schema\n",
+ "\n",
+ "Before running the pipeline, we need a table to store our embeddings:"
+ ],
+ "metadata": {
+ "id": "LWqEgqjQOcbA"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_name = \"default_product_embeddings\"\n",
+ "table_schema = \"\"\"\n",
+ " id VARCHAR PRIMARY KEY,\n",
+ " embedding VECTOR(384) NOT NULL,\n",
+ " content text,\n",
+ " metadata JSONB\n",
+ "\"\"\"\n",
+ "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)\n",
+ "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)"
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "93YnjdJkFWOi",
+ "outputId": "f843e7db-d62a-468c-a211-4bace73d10c6"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "Connected to AlloyDB successfully!\n",
+ "Creating pgvector extension...\n",
+ "Creating table...\n",
+ "Setup completed successfully!\n",
+ "✓ default_product_embeddings table exists\n",
+ "✓ pgvector extension is installed\n"
+ ]
+ }
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Configure Pipeline Components\n",
+ "\n",
+ "Now define the components that control the pipeline behavior:"
+ ],
+ "metadata": {
+ "id": "DikTnoGbOioG"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Map products to Chunks\n",
+ "- Our data is ingested as product dictionaries\n",
+ "- Embedding generation and ingestion operate on `Chunk` objects\n",
+ "- We convert each product dictionary to a `Chunk` to configure what text to embed and what to treat as metadata"
+ ],
+ "metadata": {
+ "id": "M8rVyZ6o-Nep"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from typing import Dict, Any\n",
+ "\n",
+ "# The create_chunk function converts our product dictionaries to Chunks.\n",
+ "# This doesn't split the text; it simply structures it in the format\n",
+ "# expected by the embedding pipeline components.\n",
+ "def create_chunk(product: Dict[str, Any]) -> Chunk:\n",
+ " \"\"\"Convert a product dictionary into a Chunk object.\n",
+ "\n",
+ " The pipeline components (MLTransform, VectorDatabaseWriteTransform)\n",
+ " work with Chunk objects. This function:\n",
+ " 1. Extracts text we want to embed\n",
+ " 2. Preserves product data as metadata\n",
+ " 3. Creates a Chunk in the expected format\n",
+ "\n",
+ " Args:\n",
+ " product: Dictionary containing product information\n",
+ "\n",
+ " Returns:\n",
+ " Chunk: A Chunk object ready for embedding\n",
+ " \"\"\"\n",
+ " return Chunk(\n",
+ " content=Content(\n",
+ " text=f\"{product['name']}: {product['description']}\"\n",
+ " ), # The text that will be embedded\n",
+ " id=product['id'], # Use product ID as chunk ID\n",
+ " metadata=product, # Store all product info in metadata\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Rm_IX5U6mP_r"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Generate embeddings with HuggingFace"
+ ],
+ "metadata": {
+ "id": "xJaI9m3D7Vw-"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
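+ "We use a local pre-trained Hugging Face model to create vector embeddings from each product's name and description. The `sentence-transformers/all-MiniLM-L6-v2` model produces 384-dimensional vectors, which is why the table's `embedding` column is declared as `VECTOR(384)`."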
+ ],
+ "metadata": {
+ "id": "0dlm1fjQh2dX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
+ " model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
+ ")"
+ ],
+ "metadata": {
+ "id": "E5LkHmjV7l2S"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Write to AlloyDB\n",
+ "\n",
+ "The default AlloyDBVectorWriterConfig maps Chunk fields to database columns as follows:\n",
+ "\n",
+ "| Database Column | Chunk Field | Description |\n",
+ "|----------------|-------------|-------------|\n",
+ "| id | chunk.id | Unique identifier |\n",
+ "| embedding | chunk.embedding.dense_embedding | Vector representation |\n",
+ "| content | chunk.content.text | Text that was embedded |\n",
+ "| metadata | chunk.metadata | Additional data as JSONB |"
+ ],
+ "metadata": {
+ "id": "vVv8hD5wQo3w"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "alloydb_writer_config = AlloyDBVectorWriterConfig(\n",
+ " connection_config=AlloyDBConnectionConfig(JDBC_URL, DB_USER, DB_PASSWORD),\n",
+ " table_name=table_name\n",
+ ")"
+ ],
+ "metadata": {
+ "id": "moKsz_6xQt-E"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Assemble and Run Pipeline\n",
+ "\n",
+ "Now we can create our pipeline that:\n",
+ "1. Takes our product data\n",
+ "2. Converts each product to a Chunk\n",
+ "3. Generates embeddings for each Chunk\n",
+ "4. Stores everything in AlloyDB"
+ ],
+ "metadata": {
+ "id": "Ww2BPxTNKmL2"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "import tempfile\n",
+ "\n",
+ "# Executing on DirectRunner (local execution)\n",
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n",
+ " | 'Convert to Chunks' >> beam.Map(create_chunk)\n",
+ " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
+ " .with_transform(huggingface_embedder)\n",
+ " | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(\n",
+ " alloydb_writer_config\n",
+ " )\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "lyS3IpNBDgYw"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Verify Embeddings\n",
+ "Let's check what was written to our AlloyDB table:"
+ ],
+ "metadata": {
+ "id": "Qm97EAww6RvW"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)"
+ ],
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "-H3t2cIN6lO_",
+ "outputId": "895540d4-d312-4245-b7e0-ae15b7677f6b"
+ },
+ "execution_count": null,
+ "outputs": [
+ {
+ "output_type": "stream",
+ "name": "stdout",
+ "text": [
+ "\n",
+ "Found 5 products:\n",
+ "--------------------------------------------------------------------------------\n",
+ "id: desk-001\n",
+ "embedding:
[-0.001057815,0.060585838,0.06550077,-0.020135332,0.0063834586,-0.016612189,-0.0031698928,0.10957789,-0.0820077,0.087466136,0.05336146,0.018569656,0.057456993,0.029180786,-0.0739839,-0.010892165,0.07001466,-0.03374081,0.051436793,0.07567188,-0.13465184,-0.032239668,-0.04638205,-0.029914198,-0.009278446,0.06718533,-0.051145084,0.042603016,0.007903699,-0.041032363,-0.04231419,-0.01325918,0.071415,0.040334247,-0.01094771,0.003317118,0.08076268,-0.017956244,-0.010060241,-0.050612617,-0.06333365,-0.015355688,-0.016382031,0.04253677,-0.0060182833,0.033938967,-0.022733817,-0.13714638,-0.046747543,-0.053437427,0.03156534,0.0036609878,0.008148901,0.0004498263,-0.0139052775,-0.0072347587,0.003329244,0.02151876,-0.002358143,-0.04545856,0.06712152,-0.06171683,0.0034830095,0.03768045,0.029724924,0.046725973,-0.041152626,0.022954624,0.06727486,-0.047817953,-0.022311574,-0.006367141,-0.034347665,0.0454951,7.043634e-05,-0.113772176,0.0037808658,-0.016453175,0.007873811,0.082
94731,-0.048830684,0.043934047,-0.072832346,0.075656876,-0.06601299,0.00082295056,-0.055017143,-0.046629313,0.003992826,-0.11849778,0.06739192,0.04190433,-0.042112317,0.012006758,-0.06356277,-0.052011997,0.04062942,0.0008779314,-0.10618619,0.058837418,-0.035058603,0.024972772,0.14561659,0.014725338,0.029324891,-0.0053626252,0.02814062,-0.066259705,-0.07870076,-0.03763159,-0.05551403,-0.06366927,-0.17272565,-0.07403342,0.028536445,-0.019524984,-0.006076428,-0.018281966,0.08323952,0.043096125,0.044669934,0.003794127,0.003360419,0.0038145634,0.017628977,0.053377304,0.04699343,1.8948987e-33,0.027950196,0.066461906,-0.028161732,-0.006389328,0.06030824,0.020444188,0.018832795,0.10140829,0.026494414,0.036146514,-0.026098972,-0.05159774,0.0011205175,0.023721311,0.05779271,-0.04863308,0.025785096,-0.010925589,-0.007652651,0.041972544,-0.05286998,0.09319047,0.041135963,-0.05854327,0.030171309,-0.04105334,0.07038842,-0.020940648,-0.02166687,-0.020070298,-0.06315385,-0.009049079,0.00078501547,-
0.015395543,-0.050217472,0.027864248,-0.01698884,-0.01361225,0.083218426,-0.00680146,-0.090368144,0.065700725,0.047122493,-0.018271266,0.038392376,0.067258775,0.15831783,0.0318156,0.05744108,-0.030053735,-0.017398562,0.05620426,0.03845883,0.07659209,-0.005228263,-0.07242891,-0.017286185,-0.06076563,0.010254259,0.04249031,0.0054570134,0.024156824,-0.03405574,0.03224425,0.043352388,0.0066218483,0.009701542,-0.0061702006,0.03439675,0.014364707,-0.07169268,-0.030080838,0.057064775,0.021052632,0.10315803,0.08609661,0.04735578,-0.09807956,-0.019662429,-0.06984223,-0.052169047,0.009997571,0.016189488,0.057329655,0.031931113,0.096302606,0.10293316,-0.016209126,-0.024686016,0.058873244,-0.03085521,-0.0737745,-0.042329386,0.060528547,0.004806,-2.3104881e-33,0.017774018,-0.033051778,-0.09566817,0.024296569,0.028017169,0.023638517,0.0076059336,-0.04351966,-0.06524873,0.07048606,0.07029284,-0.0062136357,0.018824892,0.0043494813,0.029355347,0.116440654,-0.049167324,-0.06571007,0.041200016,0.00702
7206,0.11581624,0.05254668,-0.11179672,-0.07191128,0.010691769,-0.02362584,-0.07400684,-0.035415992,0.040427044,-0.017533168,-0.10204969,-0.047531646,0.011986546,0.011579142,0.08209371,0.061332937,-0.12630121,0.06260612,0.021209786,0.0024153567,0.004879835,0.03823057,0.021687144,0.017282587,-0.0058235675,-0.041638855,-0.120774835,-0.05274331,-0.0901771,-0.062420562,-0.04692266,-0.040547855,-0.017517554,-0.033914257,-0.024906259,-0.044837877,-0.004414193,-0.022200402,-0.020926967,0.01944887,0.0038933426,-0.07519052,-0.075344004,-0.00925266,0.038882483,0.029127559,0.014658826,0.02615534,-0.06710353,-0.005388302,0.035875384,0.006635634,0.01845464,-0.029214669,0.030962888,0.03324723,0.0805287,-0.0140210055,0.034357224,0.0061449767,0.06392108,0.022795836,0.028148135,-0.08538441,0.030423703,0.076761656,-0.09278483,-0.06254251,-0.007314473,0.010542858,-0.014869807,-0.014268346,0.0038216454,0.043618064,-0.055879563,-2.8660342e-08,-0.026919082,-0.05864714,-0.00941353,-0.0044262623,-0.0176770
33,-0.06411421,0.029808018,-0.044010296,-0.057445858,-0.002930679,0.03394864,-0.07481234,-0.043237135,0.06960515,0.062109653,0.08589196,-0.016729712,0.040591072,-0.044696674,-0.013019682,0.042662516,-0.01285427,-0.004786111,0.015622201,-0.02249433,-0.011467345,0.08628773,0.028841617,0.011362021,0.022700336,0.036347754,0.023580225,0.006108897,0.022842467,0.03529631,0.0621403,-0.013211548,0.02976385,-0.112288035,-0.023612317,-0.030943196,-0.11450085,-0.07902988,0.00446465,0.028691083,-0.0420317,-0.040882185,-0.047819637,0.011485365,-0.010957185,0.020327702,0.011172781,0.037923362,0.009317626,-0.038125765,0.025017489,0.11998403,0.022702718,-0.07397983,-0.029796196,0.04002992,0.09435436,0.0037625604,0.05090331]\n",
Review Comment:
There are a couple of very long output blocks like this. Could you remove them
from the checked-in Colab? That will make the rendering on GitHub much cleaner
and works better with import tooling. Basically, anywhere you see `"outputs":`,
you should be able to delete the output and just make it `"outputs": []`.
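A minimal sketch of clearing the outputs programmatically instead of by hand, assuming the notebook is standard nbformat v4 JSON like the diff above. The helpers `clear_notebook_outputs` and `clear_notebook_file` are hypothetical and not part of this PR or of Beam's tooling. The sketch sets `outputs` to `[]` in every code cell, resets `execution_count`, and, as an additional assumption, drops Colab's `outputId` metadata key because it only points at the removed output.

```python
import json
from pathlib import Path
from typing import Any, Dict


def clear_notebook_outputs(nb: Dict[str, Any]) -> Dict[str, Any]:
    """Empty the outputs of every code cell in an nbformat v4 notebook dict.

    Markdown and raw cells have no "outputs" key and are left untouched.
    The dict is modified in place and also returned for convenience.
    """
    for cell in nb.get("cells", []):
        if cell.get("cell_type") != "code":
            continue
        cell["outputs"] = []            # the change the review asks for
        cell["execution_count"] = None  # run counter from the saved session
        # Assumption: Colab's "outputId" metadata only refers to the removed
        # output, so it is dropped as well. Other metadata (e.g. "id") stays.
        cell.get("metadata", {}).pop("outputId", None)
    return nb


def clear_notebook_file(path: str) -> None:
    """Rewrite a .ipynb file in place with all code-cell outputs cleared."""
    nb_path = Path(path)
    nb = json.loads(nb_path.read_text(encoding="utf-8"))
    clear_notebook_outputs(nb)
    # indent=1 is an assumption; match whatever indentation the checked-in
    # file already uses so the resulting diff stays small. ensure_ascii=False
    # keeps characters such as "✓" readable instead of escaping them.
    nb_path.write_text(
        json.dumps(nb, indent=1, ensure_ascii=False) + "\n", encoding="utf-8"
    )
```

For example, `clear_notebook_file("examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb")` would rewrite the notebook from this PR in place; re-running it is harmless because already-empty outputs stay empty.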
--
This is an automated message from the Apache Git Service.