damccorm commented on code in PR #34251:
URL: https://github.com/apache/beam/pull/34251#discussion_r1992143758
##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
"print(\"\\nAfter embedding generation:\")\n",
"verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Streaming Embeddings Updates from PubSub\n",
+ "\n",
+ "This section demonstrates how to build a real-time embedding pipeline
that continuously processes product updates and maintains fresh embeddings in
AlloyDB. This approach is ideal data that changes frequently.\n",
+ "\n",
+ "This example runs on Dataflow because streaming with DirectRunner and
writing via JDBC is not supported.\n",
+ "\n",
+ "### Authenticate with Google Cloud\n",
+ "To use the PubSub, we authenticate with Google Cloud.\n"
+ ],
+ "metadata": {
+ "id": "yv4Rd1ZvsB_M"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+ "PROJECT_ID = '' # @param {type:'string'}\n",
+ "\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user(project_id=PROJECT_ID)"
+ ],
+ "metadata": {
+ "id": "VCqJmaznt1nS"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Setting Up PubSub Resources\n",
+ "\n",
+ "First, let's set up the necessary PubSub topics and subscriptions:"
+ ],
+ "metadata": {
+ "id": "2FsoFaugtsln"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import pubsub_v1\n",
+ "from google.api_core.exceptions import AlreadyExists\n",
+ "import json\n",
+ "\n",
+ "# Define topic and subscription names\n",
+ "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+ "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+ "\n",
+ "# Create publisher client and topic\n",
+ "publisher = pubsub_v1.PublisherClient()\n",
+ "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+ "try:\n",
+ " topic = publisher.create_topic(request={\"name\": topic_path})\n",
+ " print(f\"Created topic: {topic.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Topic {topic_path} already exists.\")\n",
+ "\n",
+ "# Create subscriber client and subscription\n",
+ "subscriber = pubsub_v1.SubscriberClient()\n",
+ "subscription_path = subscriber.subscription_path(PROJECT_ID,
SUBSCRIPTION)\n",
+ "try:\n",
+ " subscription = subscriber.create_subscription(\n",
+ " request={\"name\": subscription_path, \"topic\":
topic_path}\n",
+ " )\n",
+ " print(f\"Created subscription: {subscription.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Subscription {subscription_path} already exists.\")"
+ ],
+ "metadata": {
+ "id": "nqMe0Brlt7Bk"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create AlloyDB Table for Streaming Updates\n",
+ "\n",
+ "Next, create a table to store the embedded data."
+ ],
+ "metadata": {
+ "id": "07ZFeGbMuFj_"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_name = \"streaming_product_embeddings\"\n",
+ "table_schema = \"\"\"\n",
+ " id VARCHAR PRIMARY KEY,\n",
+ " embedding VECTOR(384) NOT NULL,\n",
+ " content text,\n",
+ " metadata JSONB,\n",
+ " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+ "\"\"\""
+ ],
+ "metadata": {
+ "id": "3Xc70uV_uJy5"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
+ "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ ],
+ "metadata": {
+ "id": "8HPhUfAuorBP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create Publisher Subprocess\n",
+ "The publisher simulates real-time product updates by:\n",
+ "- Publishing sample product data to the PubSub topic every 5
seconds\n",
+ "- Modifying prices and descriptions to represent changes\n",
+ "- Adding timestamps to track update times\n",
+ "- Running for 25 minutes in the background while our pipeline
processes the data"
+ ],
+ "metadata": {
+ "id": "r7nJdc09vs98"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Define PubSub publisher function\n",
+ "import threading\n",
+ "import time\n",
+ "import json\n",
+ "import logging\n",
+ "from google.cloud import pubsub_v1\n",
+ "import datetime\n",
+ "import os\n",
+ "import sys\n",
+ "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+ "\n",
+ "print(f\"Log file will be created at: {log_file}\")\n",
+ "\n",
+ "def publisher_function(project_id, topic):\n",
+ " \"\"\"Function that publishes sample product updates to a PubSub
topic.\n",
+ "\n",
+ " This function runs in a separate thread and continuously
publishes\n",
+ " messages to simulate real-time product updates.\n",
+ " \"\"\"\n",
+ " thread_id = threading.current_thread().ident\n",
+ "\n",
+ " process_log_file = os.path.join(os.getcwd(),
f\"publisher_{thread_id}.log\")\n",
+ "\n",
+ " file_handler = logging.FileHandler(process_log_file)\n",
+ " file_handler.setFormatter(logging.Formatter('%(asctime)s -
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+ "\n",
+ " logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+ " logger.setLevel(logging.INFO)\n",
+ " logger.addHandler(file_handler)\n",
+ "\n",
+ " logger.info(f\"Publisher thread started with ID:
{thread_id}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " publisher = pubsub_v1.PublisherClient()\n",
+ " topic_path = publisher.topic_path(project_id, topic)\n",
+ "\n",
+ " logger.info(\"Starting to publish messages...\")\n",
+ " file_handler.flush()\n",
+ " for i in range(300):\n",
+ " message_index = i % len(PRODUCTS_DATA)\n",
+ " message = PRODUCTS_DATA[message_index].copy()\n",
+ "\n",
+ " if message_index % 2 == 0:\n",
+ " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+ " message[\"price\"] = round(message[\"price\"] *
dynamic_factor, 2)\n",
+ " message[\"description\"] = f\"PRICE UPDATE (factor:
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+ "\n",
+ " message[\"published_at\"] =
datetime.datetime.now().isoformat()\n",
+ "\n",
+ " data = json.dumps(message).encode('utf-8')\n",
+ " publish_future = publisher.publish(topic_path, data)\n",
+ "\n",
+ " try:\n",
+ " logger.info(f\"Publishing message {message}\")\n",
+ " file_handler.flush()\n",
+ " message_id = publish_future.result()\n",
+ " logger.info(f\"Published message {i+1}: {message['id']}
(Message ID: {message_id})\")\n",
+ " file_handler.flush()\n",
+ " except Exception as e:\n",
+ " logger.error(f\"Error publishing message: {e}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " time.sleep(5)\n",
+ "\n",
+ " logger.info(\"Finished publishing all messages.\")\n",
+ " file_handler.flush()"
+ ],
+ "metadata": {
+ "id": "C9Bf0Nb0vY7r",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "#### Start publishing to PusSub in background"
Review Comment:
```suggestion
"#### Start publishing to PubSub in background"
```
##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
"print(\"\\nAfter embedding generation:\")\n",
"verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Streaming Embeddings Updates from PubSub\n",
+ "\n",
+ "This section demonstrates how to build a real-time embedding pipeline
that continuously processes product updates and maintains fresh embeddings in
AlloyDB. This approach is ideal data that changes frequently.\n",
+ "\n",
+ "This example runs on Dataflow because streaming with DirectRunner and
writing via JDBC is not supported.\n",
+ "\n",
+ "### Authenticate with Google Cloud\n",
+ "To use the PubSub, we authenticate with Google Cloud.\n"
+ ],
+ "metadata": {
+ "id": "yv4Rd1ZvsB_M"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+ "PROJECT_ID = '' # @param {type:'string'}\n",
+ "\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user(project_id=PROJECT_ID)"
+ ],
+ "metadata": {
+ "id": "VCqJmaznt1nS"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Setting Up PubSub Resources\n",
+ "\n",
+ "First, let's set up the necessary PubSub topics and subscriptions:"
+ ],
+ "metadata": {
+ "id": "2FsoFaugtsln"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import pubsub_v1\n",
+ "from google.api_core.exceptions import AlreadyExists\n",
+ "import json\n",
+ "\n",
+ "# Define topic and subscription names\n",
+ "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+ "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+ "\n",
+ "# Create publisher client and topic\n",
+ "publisher = pubsub_v1.PublisherClient()\n",
+ "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+ "try:\n",
+ " topic = publisher.create_topic(request={\"name\": topic_path})\n",
+ " print(f\"Created topic: {topic.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Topic {topic_path} already exists.\")\n",
+ "\n",
+ "# Create subscriber client and subscription\n",
+ "subscriber = pubsub_v1.SubscriberClient()\n",
+ "subscription_path = subscriber.subscription_path(PROJECT_ID,
SUBSCRIPTION)\n",
+ "try:\n",
+ " subscription = subscriber.create_subscription(\n",
+ " request={\"name\": subscription_path, \"topic\":
topic_path}\n",
+ " )\n",
+ " print(f\"Created subscription: {subscription.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Subscription {subscription_path} already exists.\")"
+ ],
+ "metadata": {
+ "id": "nqMe0Brlt7Bk"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create AlloyDB Table for Streaming Updates\n",
+ "\n",
+ "Next, create a table to store the embedded data."
+ ],
+ "metadata": {
+ "id": "07ZFeGbMuFj_"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_name = \"streaming_product_embeddings\"\n",
+ "table_schema = \"\"\"\n",
+ " id VARCHAR PRIMARY KEY,\n",
+ " embedding VECTOR(384) NOT NULL,\n",
+ " content text,\n",
+ " metadata JSONB,\n",
+ " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+ "\"\"\""
+ ],
+ "metadata": {
+ "id": "3Xc70uV_uJy5"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
+ "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ ],
+ "metadata": {
+ "id": "8HPhUfAuorBP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create Publisher Subprocess\n",
+ "The publisher simulates real-time product updates by:\n",
+ "- Publishing sample product data to the PubSub topic every 5
seconds\n",
+ "- Modifying prices and descriptions to represent changes\n",
+ "- Adding timestamps to track update times\n",
+ "- Running for 25 minutes in the background while our pipeline
processes the data"
+ ],
+ "metadata": {
+ "id": "r7nJdc09vs98"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Define PubSub publisher function\n",
+ "import threading\n",
+ "import time\n",
+ "import json\n",
+ "import logging\n",
+ "from google.cloud import pubsub_v1\n",
+ "import datetime\n",
+ "import os\n",
+ "import sys\n",
+ "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+ "\n",
+ "print(f\"Log file will be created at: {log_file}\")\n",
+ "\n",
+ "def publisher_function(project_id, topic):\n",
+ " \"\"\"Function that publishes sample product updates to a PubSub
topic.\n",
+ "\n",
+ " This function runs in a separate thread and continuously
publishes\n",
+ " messages to simulate real-time product updates.\n",
+ " \"\"\"\n",
+ " thread_id = threading.current_thread().ident\n",
+ "\n",
+ " process_log_file = os.path.join(os.getcwd(),
f\"publisher_{thread_id}.log\")\n",
+ "\n",
+ " file_handler = logging.FileHandler(process_log_file)\n",
+ " file_handler.setFormatter(logging.Formatter('%(asctime)s -
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+ "\n",
+ " logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+ " logger.setLevel(logging.INFO)\n",
+ " logger.addHandler(file_handler)\n",
+ "\n",
+ " logger.info(f\"Publisher thread started with ID:
{thread_id}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " publisher = pubsub_v1.PublisherClient()\n",
+ " topic_path = publisher.topic_path(project_id, topic)\n",
+ "\n",
+ " logger.info(\"Starting to publish messages...\")\n",
+ " file_handler.flush()\n",
+ " for i in range(300):\n",
+ " message_index = i % len(PRODUCTS_DATA)\n",
+ " message = PRODUCTS_DATA[message_index].copy()\n",
+ "\n",
+ " if message_index % 2 == 0:\n",
+ " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+ " message[\"price\"] = round(message[\"price\"] *
dynamic_factor, 2)\n",
+ " message[\"description\"] = f\"PRICE UPDATE (factor:
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+ "\n",
+ " message[\"published_at\"] =
datetime.datetime.now().isoformat()\n",
+ "\n",
+ " data = json.dumps(message).encode('utf-8')\n",
+ " publish_future = publisher.publish(topic_path, data)\n",
+ "\n",
+ " try:\n",
+ " logger.info(f\"Publishing message {message}\")\n",
+ " file_handler.flush()\n",
+ " message_id = publish_future.result()\n",
+ " logger.info(f\"Published message {i+1}: {message['id']}
(Message ID: {message_id})\")\n",
+ " file_handler.flush()\n",
+ " except Exception as e:\n",
+ " logger.error(f\"Error publishing message: {e}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " time.sleep(5)\n",
+ "\n",
+ " logger.info(\"Finished publishing all messages.\")\n",
+ " file_handler.flush()"
+ ],
+ "metadata": {
+ "id": "C9Bf0Nb0vY7r",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "#### Start publishing to PusSub in background"
+ ],
+ "metadata": {
+ "id": "jnUSynmjEmVr"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Launch publisher in a separate thread\n",
+ "print(\"Starting publisher thread...\")\n",
+ "publisher_thread = threading.Thread(\n",
+ " target=publisher_function,\n",
+ " args=(PROJECT_ID, TOPIC),\n",
+ " daemon=True\n",
+ ")\n",
+ "publisher_thread.start()\n",
+ "print(f\"Publisher thread started with ID:
{publisher_thread.ident}\")\n",
+ "print(f\"Publisher thread logging to file:
publisher_{publisher_thread.ident}.log\")"
+ ],
+ "metadata": {
+ "id": "ZnBBTwZHw7Ex"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Define Streaming Pipeline\n",
+ "Our pipeline contains these key components:\n",
+ "\n",
+ "1. **Source**: Continuously reads messages from PubSub\n",
+ "2. **Windowing**: Groups messages into 10-second windows for batch
processing\n",
+ "3. **Transformation**: Converts JSON messages to Chunk objects for
embedding\n",
+ "4. **ML Processing**: Generates embeddings using HuggingFace
models\n",
+ "5. **Sink**: Writes results to AlloyDB with conflict resolution"
+ ],
+ "metadata": {
+ "id": "EK3pela7CLFj"
+ }
+ },
+ {
+ "cell_type": "markdown",
Review Comment:
Given that the pipeline is defined one cell down, I don't think we get much
out of including it here. It also renders a little oddly in GitHub and probably
in GCP docs -
https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
"print(\"\\nAfter embedding generation:\")\n",
"verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
]
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Streaming Embeddings Updates from PubSub\n",
+ "\n",
+ "This section demonstrates how to build a real-time embedding pipeline
that continuously processes product updates and maintains fresh embeddings in
AlloyDB. This approach is ideal data that changes frequently.\n",
+ "\n",
+ "This example runs on Dataflow because streaming with DirectRunner and
writing via JDBC is not supported.\n",
+ "\n",
+ "### Authenticate with Google Cloud\n",
+ "To use the PubSub, we authenticate with Google Cloud.\n"
+ ],
+ "metadata": {
+ "id": "yv4Rd1ZvsB_M"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+ "PROJECT_ID = '' # @param {type:'string'}\n",
+ "\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user(project_id=PROJECT_ID)"
+ ],
+ "metadata": {
+ "id": "VCqJmaznt1nS"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Setting Up PubSub Resources\n",
+ "\n",
+ "First, let's set up the necessary PubSub topics and subscriptions:"
+ ],
+ "metadata": {
+ "id": "2FsoFaugtsln"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "from google.cloud import pubsub_v1\n",
+ "from google.api_core.exceptions import AlreadyExists\n",
+ "import json\n",
+ "\n",
+ "# Define topic and subscription names\n",
+ "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+ "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+ "\n",
+ "# Create publisher client and topic\n",
+ "publisher = pubsub_v1.PublisherClient()\n",
+ "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+ "try:\n",
+ " topic = publisher.create_topic(request={\"name\": topic_path})\n",
+ " print(f\"Created topic: {topic.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Topic {topic_path} already exists.\")\n",
+ "\n",
+ "# Create subscriber client and subscription\n",
+ "subscriber = pubsub_v1.SubscriberClient()\n",
+ "subscription_path = subscriber.subscription_path(PROJECT_ID,
SUBSCRIPTION)\n",
+ "try:\n",
+ " subscription = subscriber.create_subscription(\n",
+ " request={\"name\": subscription_path, \"topic\":
topic_path}\n",
+ " )\n",
+ " print(f\"Created subscription: {subscription.name}\")\n",
+ "except AlreadyExists:\n",
+ " print(f\"Subscription {subscription_path} already exists.\")"
+ ],
+ "metadata": {
+ "id": "nqMe0Brlt7Bk"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create AlloyDB Table for Streaming Updates\n",
+ "\n",
+ "Next, create a table to store the embedded data."
+ ],
+ "metadata": {
+ "id": "07ZFeGbMuFj_"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "table_name = \"streaming_product_embeddings\"\n",
+ "table_schema = \"\"\"\n",
+ " id VARCHAR PRIMARY KEY,\n",
+ " embedding VECTOR(384) NOT NULL,\n",
+ " content text,\n",
+ " metadata JSONB,\n",
+ " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+ "\"\"\""
+ ],
+ "metadata": {
+ "id": "3Xc70uV_uJy5"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)\n",
+ "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER,
DB_PASSWORD)"
+ ],
+ "metadata": {
+ "id": "8HPhUfAuorBP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Create Publisher Subprocess\n",
+ "The publisher simulates real-time product updates by:\n",
+ "- Publishing sample product data to the PubSub topic every 5
seconds\n",
+ "- Modifying prices and descriptions to represent changes\n",
+ "- Adding timestamps to track update times\n",
+ "- Running for 25 minutes in the background while our pipeline
processes the data"
+ ],
+ "metadata": {
+ "id": "r7nJdc09vs98"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Define PubSub publisher function\n",
+ "import threading\n",
+ "import time\n",
+ "import json\n",
+ "import logging\n",
+ "from google.cloud import pubsub_v1\n",
+ "import datetime\n",
+ "import os\n",
+ "import sys\n",
+ "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+ "\n",
+ "print(f\"Log file will be created at: {log_file}\")\n",
+ "\n",
+ "def publisher_function(project_id, topic):\n",
+ " \"\"\"Function that publishes sample product updates to a PubSub
topic.\n",
+ "\n",
+ " This function runs in a separate thread and continuously
publishes\n",
+ " messages to simulate real-time product updates.\n",
+ " \"\"\"\n",
+ " thread_id = threading.current_thread().ident\n",
+ "\n",
+ " process_log_file = os.path.join(os.getcwd(),
f\"publisher_{thread_id}.log\")\n",
+ "\n",
+ " file_handler = logging.FileHandler(process_log_file)\n",
+ " file_handler.setFormatter(logging.Formatter('%(asctime)s -
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+ "\n",
+ " logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+ " logger.setLevel(logging.INFO)\n",
+ " logger.addHandler(file_handler)\n",
+ "\n",
+ " logger.info(f\"Publisher thread started with ID:
{thread_id}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " publisher = pubsub_v1.PublisherClient()\n",
+ " topic_path = publisher.topic_path(project_id, topic)\n",
+ "\n",
+ " logger.info(\"Starting to publish messages...\")\n",
+ " file_handler.flush()\n",
+ " for i in range(300):\n",
+ " message_index = i % len(PRODUCTS_DATA)\n",
+ " message = PRODUCTS_DATA[message_index].copy()\n",
+ "\n",
+ " if message_index % 2 == 0:\n",
+ " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+ " message[\"price\"] = round(message[\"price\"] *
dynamic_factor, 2)\n",
+ " message[\"description\"] = f\"PRICE UPDATE (factor:
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+ "\n",
+ " message[\"published_at\"] =
datetime.datetime.now().isoformat()\n",
+ "\n",
+ " data = json.dumps(message).encode('utf-8')\n",
+ " publish_future = publisher.publish(topic_path, data)\n",
+ "\n",
+ " try:\n",
+ " logger.info(f\"Publishing message {message}\")\n",
+ " file_handler.flush()\n",
+ " message_id = publish_future.result()\n",
+ " logger.info(f\"Published message {i+1}: {message['id']}
(Message ID: {message_id})\")\n",
+ " file_handler.flush()\n",
+ " except Exception as e:\n",
+ " logger.error(f\"Error publishing message: {e}\")\n",
+ " file_handler.flush()\n",
+ "\n",
+ " time.sleep(5)\n",
+ "\n",
+ " logger.info(\"Finished publishing all messages.\")\n",
+ " file_handler.flush()"
+ ],
+ "metadata": {
+ "id": "C9Bf0Nb0vY7r",
+ "cellView": "form"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "#### Start publishing to PusSub in background"
+ ],
+ "metadata": {
+ "id": "jnUSynmjEmVr"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Launch publisher in a separate thread\n",
+ "print(\"Starting publisher thread...\")\n",
+ "publisher_thread = threading.Thread(\n",
+ " target=publisher_function,\n",
+ " args=(PROJECT_ID, TOPIC),\n",
+ " daemon=True\n",
+ ")\n",
+ "publisher_thread.start()\n",
+ "print(f\"Publisher thread started with ID:
{publisher_thread.ident}\")\n",
+ "print(f\"Publisher thread logging to file:
publisher_{publisher_thread.ident}.log\")"
+ ],
+ "metadata": {
+ "id": "ZnBBTwZHw7Ex"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Define Streaming Pipeline\n",
+ "Our pipeline contains these key components:\n",
+ "\n",
+ "1. **Source**: Continuously reads messages from PubSub\n",
+ "2. **Windowing**: Groups messages into 10-second windows for batch
processing\n",
+ "3. **Transformation**: Converts JSON messages to Chunk objects for
embedding\n",
+ "4. **ML Processing**: Generates embeddings using HuggingFace
models\n",
+ "5. **Sink**: Writes results to AlloyDB with conflict resolution"
+ ],
+ "metadata": {
+ "id": "EK3pela7CLFj"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "```python\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions,
StandardOptions\n",
+ "from apache_beam.transforms.window import FixedWindows\n",
+ "from apache_beam.ml.transforms.base import MLTransform\n",
+ "from apache_beam.ml.rag.types import Chunk, Content\n",
+ "from apache_beam.ml.rag.ingestion.base import
VectorDatabaseWriteTransform\n",
+ "from apache_beam.ml.rag.ingestion.alloydb import
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, ConflictResolution\n",
+ "from apache_beam.ml.rag.embeddings.huggingface import
HuggingfaceTextEmbeddings\n",
+ "import json\n",
+ "import tempfile\n",
+ "\n",
+ "# Parse incoming PubSub messages\n",
+ "def parse_message(message):\n",
+ " product_json = json.loads(message.decode('utf-8'))\n",
+ " return Chunk(\n",
+ " content=Content(text=f\"{product_json['name']}:
{product_json['description']}\"),\n",
+ " id=product_json['id'],\n",
+ " metadata=product_json\n",
+ " )\n",
+ "\n",
+ "# Set up pipeline options\n",
+ "options = PipelineOptions()\n",
+ "options.view_as(StandardOptions).streaming = True # This makes it a
streaming pipeline\n",
+ "\n",
+ "# Create and run the pipeline\n",
+ "with beam.Pipeline(options=options) as p:\n",
+ " _ = (\n",
+ " p\n",
+ " # SOURCE: Read continuously from PubSub\n",
+ " | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n",
+ "
subscription=\"projects/my-project/subscriptions/my-subscription\"\n",
+ " )\n",
+ " \n",
+ " # WINDOWING: Group messages into 10-second windows\n",
+ " | \"Window\" >> beam.WindowInto(FixedWindows(10))\n",
+ " \n",
+ " # TRANSFORM: Parse JSON messages into Chunks\n",
+ " | \"Parse Messages\" >> beam.Map(parse_message)\n",
+ " \n",
+ " # ML PROCESSING: Generate embeddings with Huggingface\n",
+ " | \"Generate Embeddings\" >>
MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
+ " .with_transform(HuggingfaceTextEmbeddings(\n",
+ "
model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
+ " ))\n",
+ " \n",
+ " # SINK: Write to AlloyDB vector database\n",
+ " | \"Write to AlloyDB\" >> VectorDatabaseWriteTransform(\n",
+ " AlloyDBVectorWriterConfig(\n",
+ " connection_config=AlloyDBConnectionConfig(\n",
+ "
jdbc_url=\"jdbc:postgresql://host:port/database\",\n",
+ " username=\"user\",\n",
+ " password=\"password\"\n",
+ " ),\n",
+ " table_name=\"product_embeddings\",\n",
+ " conflict_resolution=ConflictResolution(\n",
+ " on_conflict_fields=\"id\",\n",
+ " action=\"UPDATE\",\n",
+ " update_fields=[\"embedding\", \"content\",
\"metadata\"]\n",
+ " )\n",
+ " )\n",
+ " )\n",
+ " )\n",
+ "```"
+ ],
+ "metadata": {
+ "id": "hWEpljFpkvp3"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Save our Pipeline to a python file\n",
+ "\n",
+ "To launch our pipeline job on DataFlow we save our pipeline code to a
local file `streaming_ingestion_pipeline.py`.\n",
+ "\n",
+ "Expand the hidden cell to see the entire pipeline content."
+ ],
+ "metadata": {
+ "id": "0PBGyr6mnSLs"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#@title Save pipeline to streaming_ingestion_pipeline.py\n",
+ "file_content = \"\"\"\n",
Review Comment:
Rather than saving this to a file and running it that way, can we just run
it directly through colab like
https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/automatic_model_refresh.ipynb
?
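For reference, a rough and untested sketch of what running it directly could
look like, reusing `parse_message` and `subscription_path` from the earlier
cells (the region and bucket below are placeholders, not values taken from
this notebook):
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# Hypothetical Dataflow options; region and temp_location are placeholders.
options = PipelineOptions(
    streaming=True,
    runner="DataflowRunner",
    project=PROJECT_ID,
    region="us-central1",
    temp_location="gs://<BUCKET>/tmp",
)

p = beam.Pipeline(options=options)
_ = (
    p
    | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=subscription_path)
    | "Window" >> beam.WindowInto(FixedWindows(10))
    | "Parse Messages" >> beam.Map(parse_message)
    # ...followed by the MLTransform embedding step and the AlloyDB write
    # from the pipeline cell above.
)
p.run()  # Submits the streaming job and returns without blocking the notebook.
```
That way the job is submitted from the cell itself, as in the linked notebook.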
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]