damccorm commented on code in PR #34251:
URL: https://github.com/apache/beam/pull/34251#discussion_r1992143758


##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
         "print(\"\\nAfter embedding generation:\")\n",
         "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
       ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Streaming Embeddings Updates from PubSub\n",
+        "\n",
+        "This section demonstrates how to build a real-time embedding pipeline 
that continuously processes product updates and maintains fresh embeddings in 
AlloyDB. This approach is ideal data that changes frequently.\n",
+        "\n",
+        "This example runs on Dataflow because streaming with DirectRunner and 
writing via JDBC is not supported.\n",
+        "\n",
+        "### Authenticate with Google Cloud\n",
+        "To use the PubSub, we authenticate with Google Cloud.\n"
+      ],
+      "metadata": {
+        "id": "yv4Rd1ZvsB_M"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+        "PROJECT_ID = '' # @param {type:'string'}\n",
+        "\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user(project_id=PROJECT_ID)"
+      ],
+      "metadata": {
+        "id": "VCqJmaznt1nS"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Setting Up PubSub Resources\n",
+        "\n",
+        "First, let's set up the necessary PubSub topics and subscriptions:"
+      ],
+      "metadata": {
+        "id": "2FsoFaugtsln"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from google.cloud import pubsub_v1\n",
+        "from google.api_core.exceptions import AlreadyExists\n",
+        "import json\n",
+        "\n",
+        "# Define topic and subscription names\n",
+        "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+        "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+        "\n",
+        "# Create publisher client and topic\n",
+        "publisher = pubsub_v1.PublisherClient()\n",
+        "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+        "try:\n",
+        "    topic = publisher.create_topic(request={\"name\": topic_path})\n",
+        "    print(f\"Created topic: {topic.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Topic {topic_path} already exists.\")\n",
+        "\n",
+        "# Create subscriber client and subscription\n",
+        "subscriber = pubsub_v1.SubscriberClient()\n",
+        "subscription_path = subscriber.subscription_path(PROJECT_ID, 
SUBSCRIPTION)\n",
+        "try:\n",
+        "    subscription = subscriber.create_subscription(\n",
+        "        request={\"name\": subscription_path, \"topic\": 
topic_path}\n",
+        "    )\n",
+        "    print(f\"Created subscription: {subscription.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Subscription {subscription_path} already exists.\")"
+      ],
+      "metadata": {
+        "id": "nqMe0Brlt7Bk"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create AlloyDB Table for Streaming Updates\n",
+        "\n",
+        "Next, create a table to store the embedded data."
+      ],
+      "metadata": {
+        "id": "07ZFeGbMuFj_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "table_name = \"streaming_product_embeddings\"\n",
+        "table_schema = \"\"\"\n",
+        "  id VARCHAR PRIMARY KEY,\n",
+        "  embedding VECTOR(384) NOT NULL,\n",
+        "  content text,\n",
+        "  metadata JSONB,\n",
+        "  last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+        "\"\"\""
+      ],
+      "metadata": {
+        "id": "3Xc70uV_uJy5"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
+        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+      ],
+      "metadata": {
+        "id": "8HPhUfAuorBP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create Publisher Subprocess\n",
+        "The publisher simulates real-time product updates by:\n",
+        "- Publishing sample product data to the PubSub topic every 5 
seconds\n",
+        "- Modifying prices and descriptions to represent changes\n",
+        "- Adding timestamps to track update times\n",
+        "- Running for 25 minutes in the background while our pipeline 
processes the data"
+      ],
+      "metadata": {
+        "id": "r7nJdc09vs98"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Define PubSub publisher function\n",
+        "import threading\n",
+        "import time\n",
+        "import json\n",
+        "import logging\n",
+        "from google.cloud import pubsub_v1\n",
+        "import datetime\n",
+        "import os\n",
+        "import sys\n",
+        "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+        "\n",
+        "print(f\"Log file will be created at: {log_file}\")\n",
+        "\n",
+        "def publisher_function(project_id, topic):\n",
+        "    \"\"\"Function that publishes sample product updates to a PubSub 
topic.\n",
+        "\n",
+        "    This function runs in a separate thread and continuously 
publishes\n",
+        "    messages to simulate real-time product updates.\n",
+        "    \"\"\"\n",
+        "    thread_id = threading.current_thread().ident\n",
+        "\n",
+        "    process_log_file = os.path.join(os.getcwd(), 
f\"publisher_{thread_id}.log\")\n",
+        "\n",
+        "    file_handler = logging.FileHandler(process_log_file)\n",
+        "    file_handler.setFormatter(logging.Formatter('%(asctime)s - 
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+        "\n",
+        "    logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+        "    logger.setLevel(logging.INFO)\n",
+        "    logger.addHandler(file_handler)\n",
+        "\n",
+        "    logger.info(f\"Publisher thread started with ID: 
{thread_id}\")\n",
+        "    file_handler.flush()\n",
+        "\n",
+        "    publisher = pubsub_v1.PublisherClient()\n",
+        "    topic_path = publisher.topic_path(project_id, topic)\n",
+        "\n",
+        "    logger.info(\"Starting to publish messages...\")\n",
+        "    file_handler.flush()\n",
+        "    for i in range(300):\n",
+        "        message_index = i % len(PRODUCTS_DATA)\n",
+        "        message = PRODUCTS_DATA[message_index].copy()\n",
+        "\n",
+        "        if message_index % 2 == 0:\n",
+        "            dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+        "            message[\"price\"] = round(message[\"price\"] * 
dynamic_factor, 2)\n",
+        "            message[\"description\"] = f\"PRICE UPDATE (factor: 
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+        "\n",
+        "        message[\"published_at\"] = 
datetime.datetime.now().isoformat()\n",
+        "\n",
+        "        data = json.dumps(message).encode('utf-8')\n",
+        "        publish_future = publisher.publish(topic_path, data)\n",
+        "\n",
+        "        try:\n",
+        "            logger.info(f\"Publishing message {message}\")\n",
+        "            file_handler.flush()\n",
+        "            message_id = publish_future.result()\n",
+        "            logger.info(f\"Published message {i+1}: {message['id']} 
(Message ID: {message_id})\")\n",
+        "            file_handler.flush()\n",
+        "        except Exception as e:\n",
+        "            logger.error(f\"Error publishing message: {e}\")\n",
+        "            file_handler.flush()\n",
+        "\n",
+        "        time.sleep(5)\n",
+        "\n",
+        "    logger.info(\"Finished publishing all messages.\")\n",
+        "    file_handler.flush()"
+      ],
+      "metadata": {
+        "id": "C9Bf0Nb0vY7r",
+        "cellView": "form"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "#### Start publishing to PusSub in background"

Review Comment:
   ```suggestion
           "#### Start publishing to PubSub in background"
   ```



##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
         "print(\"\\nAfter embedding generation:\")\n",
         "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
       ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Streaming Embeddings Updates from PubSub\n",
+        "\n",
+        "This section demonstrates how to build a real-time embedding pipeline 
that continuously processes product updates and maintains fresh embeddings in 
AlloyDB. This approach is ideal data that changes frequently.\n",
+        "\n",
+        "This example runs on Dataflow because streaming with DirectRunner and 
writing via JDBC is not supported.\n",
+        "\n",
+        "### Authenticate with Google Cloud\n",
+        "To use the PubSub, we authenticate with Google Cloud.\n"
+      ],
+      "metadata": {
+        "id": "yv4Rd1ZvsB_M"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+        "PROJECT_ID = '' # @param {type:'string'}\n",
+        "\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user(project_id=PROJECT_ID)"
+      ],
+      "metadata": {
+        "id": "VCqJmaznt1nS"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Setting Up PubSub Resources\n",
+        "\n",
+        "First, let's set up the necessary PubSub topics and subscriptions:"
+      ],
+      "metadata": {
+        "id": "2FsoFaugtsln"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from google.cloud import pubsub_v1\n",
+        "from google.api_core.exceptions import AlreadyExists\n",
+        "import json\n",
+        "\n",
+        "# Define topic and subscription names\n",
+        "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+        "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+        "\n",
+        "# Create publisher client and topic\n",
+        "publisher = pubsub_v1.PublisherClient()\n",
+        "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+        "try:\n",
+        "    topic = publisher.create_topic(request={\"name\": topic_path})\n",
+        "    print(f\"Created topic: {topic.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Topic {topic_path} already exists.\")\n",
+        "\n",
+        "# Create subscriber client and subscription\n",
+        "subscriber = pubsub_v1.SubscriberClient()\n",
+        "subscription_path = subscriber.subscription_path(PROJECT_ID, 
SUBSCRIPTION)\n",
+        "try:\n",
+        "    subscription = subscriber.create_subscription(\n",
+        "        request={\"name\": subscription_path, \"topic\": 
topic_path}\n",
+        "    )\n",
+        "    print(f\"Created subscription: {subscription.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Subscription {subscription_path} already exists.\")"
+      ],
+      "metadata": {
+        "id": "nqMe0Brlt7Bk"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create AlloyDB Table for Streaming Updates\n",
+        "\n",
+        "Next, create a table to store the embedded data."
+      ],
+      "metadata": {
+        "id": "07ZFeGbMuFj_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "table_name = \"streaming_product_embeddings\"\n",
+        "table_schema = \"\"\"\n",
+        "  id VARCHAR PRIMARY KEY,\n",
+        "  embedding VECTOR(384) NOT NULL,\n",
+        "  content text,\n",
+        "  metadata JSONB,\n",
+        "  last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+        "\"\"\""
+      ],
+      "metadata": {
+        "id": "3Xc70uV_uJy5"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
+        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+      ],
+      "metadata": {
+        "id": "8HPhUfAuorBP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create Publisher Subprocess\n",
+        "The publisher simulates real-time product updates by:\n",
+        "- Publishing sample product data to the PubSub topic every 5 
seconds\n",
+        "- Modifying prices and descriptions to represent changes\n",
+        "- Adding timestamps to track update times\n",
+        "- Running for 25 minutes in the background while our pipeline 
processes the data"
+      ],
+      "metadata": {
+        "id": "r7nJdc09vs98"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Define PubSub publisher function\n",
+        "import threading\n",
+        "import time\n",
+        "import json\n",
+        "import logging\n",
+        "from google.cloud import pubsub_v1\n",
+        "import datetime\n",
+        "import os\n",
+        "import sys\n",
+        "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+        "\n",
+        "print(f\"Log file will be created at: {log_file}\")\n",
+        "\n",
+        "def publisher_function(project_id, topic):\n",
+        "    \"\"\"Function that publishes sample product updates to a PubSub 
topic.\n",
+        "\n",
+        "    This function runs in a separate thread and continuously 
publishes\n",
+        "    messages to simulate real-time product updates.\n",
+        "    \"\"\"\n",
+        "    thread_id = threading.current_thread().ident\n",
+        "\n",
+        "    process_log_file = os.path.join(os.getcwd(), 
f\"publisher_{thread_id}.log\")\n",
+        "\n",
+        "    file_handler = logging.FileHandler(process_log_file)\n",
+        "    file_handler.setFormatter(logging.Formatter('%(asctime)s - 
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+        "\n",
+        "    logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+        "    logger.setLevel(logging.INFO)\n",
+        "    logger.addHandler(file_handler)\n",
+        "\n",
+        "    logger.info(f\"Publisher thread started with ID: 
{thread_id}\")\n",
+        "    file_handler.flush()\n",
+        "\n",
+        "    publisher = pubsub_v1.PublisherClient()\n",
+        "    topic_path = publisher.topic_path(project_id, topic)\n",
+        "\n",
+        "    logger.info(\"Starting to publish messages...\")\n",
+        "    file_handler.flush()\n",
+        "    for i in range(300):\n",
+        "        message_index = i % len(PRODUCTS_DATA)\n",
+        "        message = PRODUCTS_DATA[message_index].copy()\n",
+        "\n",
+        "        if message_index % 2 == 0:\n",
+        "            dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+        "            message[\"price\"] = round(message[\"price\"] * 
dynamic_factor, 2)\n",
+        "            message[\"description\"] = f\"PRICE UPDATE (factor: 
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+        "\n",
+        "        message[\"published_at\"] = 
datetime.datetime.now().isoformat()\n",
+        "\n",
+        "        data = json.dumps(message).encode('utf-8')\n",
+        "        publish_future = publisher.publish(topic_path, data)\n",
+        "\n",
+        "        try:\n",
+        "            logger.info(f\"Publishing message {message}\")\n",
+        "            file_handler.flush()\n",
+        "            message_id = publish_future.result()\n",
+        "            logger.info(f\"Published message {i+1}: {message['id']} 
(Message ID: {message_id})\")\n",
+        "            file_handler.flush()\n",
+        "        except Exception as e:\n",
+        "            logger.error(f\"Error publishing message: {e}\")\n",
+        "            file_handler.flush()\n",
+        "\n",
+        "        time.sleep(5)\n",
+        "\n",
+        "    logger.info(\"Finished publishing all messages.\")\n",
+        "    file_handler.flush()"
+      ],
+      "metadata": {
+        "id": "C9Bf0Nb0vY7r",
+        "cellView": "form"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "#### Start publishing to PusSub in background"
+      ],
+      "metadata": {
+        "id": "jnUSynmjEmVr"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Launch publisher in a separate thread\n",
+        "print(\"Starting publisher thread...\")\n",
+        "publisher_thread = threading.Thread(\n",
+        "    target=publisher_function,\n",
+        "    args=(PROJECT_ID, TOPIC),\n",
+        "    daemon=True\n",
+        ")\n",
+        "publisher_thread.start()\n",
+        "print(f\"Publisher thread started with ID: 
{publisher_thread.ident}\")\n",
+        "print(f\"Publisher thread logging to file: 
publisher_{publisher_thread.ident}.log\")"
+      ],
+      "metadata": {
+        "id": "ZnBBTwZHw7Ex"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Define Streaming Pipeline\n",
+        "Our pipeline contains these key components:\n",
+        "\n",
+        "1. **Source**: Continuously reads messages from PubSub\n",
+        "2. **Windowing**: Groups messages into 10-second windows for batch 
processing\n",
+        "3. **Transformation**: Converts JSON messages to Chunk objects for 
embedding\n",
+        "4. **ML Processing**: Generates embeddings using HuggingFace 
models\n",
+        "5. **Sink**: Writes results to AlloyDB with conflict resolution"
+      ],
+      "metadata": {
+        "id": "EK3pela7CLFj"
+      }
+    },
+    {
+      "cell_type": "markdown",

Review Comment:
   Given that the pipeline is defined one cell down, I don't think we get much 
out of including it here. It also renders a little oddly in GitHub and probably 
in GCP docs - 
https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb



##########
examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb:
##########
@@ -2148,6 +2160,599 @@
         "print(\"\\nAfter embedding generation:\")\n",
         "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
       ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Streaming Embeddings Updates from PubSub\n",
+        "\n",
+        "This section demonstrates how to build a real-time embedding pipeline 
that continuously processes product updates and maintains fresh embeddings in 
AlloyDB. This approach is ideal data that changes frequently.\n",
+        "\n",
+        "This example runs on Dataflow because streaming with DirectRunner and 
writing via JDBC is not supported.\n",
+        "\n",
+        "### Authenticate with Google Cloud\n",
+        "To use the PubSub, we authenticate with Google Cloud.\n"
+      ],
+      "metadata": {
+        "id": "yv4Rd1ZvsB_M"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n",
+        "PROJECT_ID = '' # @param {type:'string'}\n",
+        "\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user(project_id=PROJECT_ID)"
+      ],
+      "metadata": {
+        "id": "VCqJmaznt1nS"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Setting Up PubSub Resources\n",
+        "\n",
+        "First, let's set up the necessary PubSub topics and subscriptions:"
+      ],
+      "metadata": {
+        "id": "2FsoFaugtsln"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from google.cloud import pubsub_v1\n",
+        "from google.api_core.exceptions import AlreadyExists\n",
+        "import json\n",
+        "\n",
+        "# Define topic and subscription names\n",
+        "TOPIC = \"product-updates\" # @param {type:'string'}\n",
+        "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n",
+        "\n",
+        "# Create publisher client and topic\n",
+        "publisher = pubsub_v1.PublisherClient()\n",
+        "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n",
+        "try:\n",
+        "    topic = publisher.create_topic(request={\"name\": topic_path})\n",
+        "    print(f\"Created topic: {topic.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Topic {topic_path} already exists.\")\n",
+        "\n",
+        "# Create subscriber client and subscription\n",
+        "subscriber = pubsub_v1.SubscriberClient()\n",
+        "subscription_path = subscriber.subscription_path(PROJECT_ID, 
SUBSCRIPTION)\n",
+        "try:\n",
+        "    subscription = subscriber.create_subscription(\n",
+        "        request={\"name\": subscription_path, \"topic\": 
topic_path}\n",
+        "    )\n",
+        "    print(f\"Created subscription: {subscription.name}\")\n",
+        "except AlreadyExists:\n",
+        "    print(f\"Subscription {subscription_path} already exists.\")"
+      ],
+      "metadata": {
+        "id": "nqMe0Brlt7Bk"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create AlloyDB Table for Streaming Updates\n",
+        "\n",
+        "Next, create a table to store the embedded data."
+      ],
+      "metadata": {
+        "id": "07ZFeGbMuFj_"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "table_name = \"streaming_product_embeddings\"\n",
+        "table_schema = \"\"\"\n",
+        "  id VARCHAR PRIMARY KEY,\n",
+        "  embedding VECTOR(384) NOT NULL,\n",
+        "  content text,\n",
+        "  metadata JSONB,\n",
+        "  last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n",
+        "\"\"\""
+      ],
+      "metadata": {
+        "id": "3Xc70uV_uJy5"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)\n",
+        "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, 
DB_PASSWORD)"
+      ],
+      "metadata": {
+        "id": "8HPhUfAuorBP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create Publisher Subprocess\n",
+        "The publisher simulates real-time product updates by:\n",
+        "- Publishing sample product data to the PubSub topic every 5 
seconds\n",
+        "- Modifying prices and descriptions to represent changes\n",
+        "- Adding timestamps to track update times\n",
+        "- Running for 25 minutes in the background while our pipeline 
processes the data"
+      ],
+      "metadata": {
+        "id": "r7nJdc09vs98"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Define PubSub publisher function\n",
+        "import threading\n",
+        "import time\n",
+        "import json\n",
+        "import logging\n",
+        "from google.cloud import pubsub_v1\n",
+        "import datetime\n",
+        "import os\n",
+        "import sys\n",
+        "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n",
+        "\n",
+        "print(f\"Log file will be created at: {log_file}\")\n",
+        "\n",
+        "def publisher_function(project_id, topic):\n",
+        "    \"\"\"Function that publishes sample product updates to a PubSub 
topic.\n",
+        "\n",
+        "    This function runs in a separate thread and continuously 
publishes\n",
+        "    messages to simulate real-time product updates.\n",
+        "    \"\"\"\n",
+        "    thread_id = threading.current_thread().ident\n",
+        "\n",
+        "    process_log_file = os.path.join(os.getcwd(), 
f\"publisher_{thread_id}.log\")\n",
+        "\n",
+        "    file_handler = logging.FileHandler(process_log_file)\n",
+        "    file_handler.setFormatter(logging.Formatter('%(asctime)s - 
ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n",
+        "\n",
+        "    logger = logging.getLogger(f\"worker.{thread_id}\")\n",
+        "    logger.setLevel(logging.INFO)\n",
+        "    logger.addHandler(file_handler)\n",
+        "\n",
+        "    logger.info(f\"Publisher thread started with ID: 
{thread_id}\")\n",
+        "    file_handler.flush()\n",
+        "\n",
+        "    publisher = pubsub_v1.PublisherClient()\n",
+        "    topic_path = publisher.topic_path(project_id, topic)\n",
+        "\n",
+        "    logger.info(\"Starting to publish messages...\")\n",
+        "    file_handler.flush()\n",
+        "    for i in range(300):\n",
+        "        message_index = i % len(PRODUCTS_DATA)\n",
+        "        message = PRODUCTS_DATA[message_index].copy()\n",
+        "\n",
+        "        if message_index % 2 == 0:\n",
+        "            dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n",
+        "            message[\"price\"] = round(message[\"price\"] * 
dynamic_factor, 2)\n",
+        "            message[\"description\"] = f\"PRICE UPDATE (factor: 
{dynamic_factor:.3f}): \" + message[\"description\"]\n",
+        "\n",
+        "        message[\"published_at\"] = 
datetime.datetime.now().isoformat()\n",
+        "\n",
+        "        data = json.dumps(message).encode('utf-8')\n",
+        "        publish_future = publisher.publish(topic_path, data)\n",
+        "\n",
+        "        try:\n",
+        "            logger.info(f\"Publishing message {message}\")\n",
+        "            file_handler.flush()\n",
+        "            message_id = publish_future.result()\n",
+        "            logger.info(f\"Published message {i+1}: {message['id']} 
(Message ID: {message_id})\")\n",
+        "            file_handler.flush()\n",
+        "        except Exception as e:\n",
+        "            logger.error(f\"Error publishing message: {e}\")\n",
+        "            file_handler.flush()\n",
+        "\n",
+        "        time.sleep(5)\n",
+        "\n",
+        "    logger.info(\"Finished publishing all messages.\")\n",
+        "    file_handler.flush()"
+      ],
+      "metadata": {
+        "id": "C9Bf0Nb0vY7r",
+        "cellView": "form"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "#### Start publishing to PusSub in background"
+      ],
+      "metadata": {
+        "id": "jnUSynmjEmVr"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Launch publisher in a separate thread\n",
+        "print(\"Starting publisher thread...\")\n",
+        "publisher_thread = threading.Thread(\n",
+        "    target=publisher_function,\n",
+        "    args=(PROJECT_ID, TOPIC),\n",
+        "    daemon=True\n",
+        ")\n",
+        "publisher_thread.start()\n",
+        "print(f\"Publisher thread started with ID: 
{publisher_thread.ident}\")\n",
+        "print(f\"Publisher thread logging to file: 
publisher_{publisher_thread.ident}.log\")"
+      ],
+      "metadata": {
+        "id": "ZnBBTwZHw7Ex"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Define Streaming Pipeline\n",
+        "Our pipeline contains these key components:\n",
+        "\n",
+        "1. **Source**: Continuously reads messages from PubSub\n",
+        "2. **Windowing**: Groups messages into 10-second windows for batch 
processing\n",
+        "3. **Transformation**: Converts JSON messages to Chunk objects for 
embedding\n",
+        "4. **ML Processing**: Generates embeddings using HuggingFace 
models\n",
+        "5. **Sink**: Writes results to AlloyDB with conflict resolution"
+      ],
+      "metadata": {
+        "id": "EK3pela7CLFj"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "```python\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions, 
StandardOptions\n",
+        "from apache_beam.transforms.window import FixedWindows\n",
+        "from apache_beam.ml.transforms.base import MLTransform\n",
+        "from apache_beam.ml.rag.types import Chunk, Content\n",
+        "from apache_beam.ml.rag.ingestion.base import 
VectorDatabaseWriteTransform\n",
+        "from apache_beam.ml.rag.ingestion.alloydb import 
AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, ConflictResolution\n",
+        "from apache_beam.ml.rag.embeddings.huggingface import 
HuggingfaceTextEmbeddings\n",
+        "import json\n",
+        "import tempfile\n",
+        "\n",
+        "# Parse incoming PubSub messages\n",
+        "def parse_message(message):\n",
+        "    product_json = json.loads(message.decode('utf-8'))\n",
+        "    return Chunk(\n",
+        "        content=Content(text=f\"{product_json['name']}: 
{product_json['description']}\"),\n",
+        "        id=product_json['id'],\n",
+        "        metadata=product_json\n",
+        "    )\n",
+        "\n",
+        "# Set up pipeline options\n",
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True  # This makes it a 
streaming pipeline\n",
+        "\n",
+        "# Create and run the pipeline\n",
+        "with beam.Pipeline(options=options) as p:\n",
+        "    _ = (\n",
+        "        p\n",
+        "        # SOURCE: Read continuously from PubSub\n",
+        "        | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n",
+        "            
subscription=\"projects/my-project/subscriptions/my-subscription\"\n",
+        "        )\n",
+        "        \n",
+        "        # WINDOWING: Group messages into 10-second windows\n",
+        "        | \"Window\" >> beam.WindowInto(FixedWindows(10))\n",
+        "        \n",
+        "        # TRANSFORM: Parse JSON messages into Chunks\n",
+        "        | \"Parse Messages\" >> beam.Map(parse_message)\n",
+        "        \n",
+        "        # ML PROCESSING: Generate embeddings with Huggingface\n",
+        "        | \"Generate Embeddings\" >> 
MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
+        "            .with_transform(HuggingfaceTextEmbeddings(\n",
+        "                
model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
+        "            ))\n",
+        "        \n",
+        "        # SINK: Write to AlloyDB vector database\n",
+        "        | \"Write to AlloyDB\" >> VectorDatabaseWriteTransform(\n",
+        "            AlloyDBVectorWriterConfig(\n",
+        "                connection_config=AlloyDBConnectionConfig(\n",
+        "                    
jdbc_url=\"jdbc:postgresql://host:port/database\",\n",
+        "                    username=\"user\",\n",
+        "                    password=\"password\"\n",
+        "                ),\n",
+        "                table_name=\"product_embeddings\",\n",
+        "                conflict_resolution=ConflictResolution(\n",
+        "                    on_conflict_fields=\"id\",\n",
+        "                    action=\"UPDATE\",\n",
+        "                    update_fields=[\"embedding\", \"content\", 
\"metadata\"]\n",
+        "                )\n",
+        "            )\n",
+        "        )\n",
+        "    )\n",
+        "```"
+      ],
+      "metadata": {
+        "id": "hWEpljFpkvp3"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Save our Pipeline to a python file\n",
+        "\n",
+        "To launch our pipeline job on DataFlow we save our pipeline code to a 
local file `streaming_ingestion_pipeline.py`.\n",
+        "\n",
+        "Expand the hidden cell to see the entire pipeline content."
+      ],
+      "metadata": {
+        "id": "0PBGyr6mnSLs"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title Save pipeline to streaming_ingestion_pipeline.py\n",
+        "file_content = \"\"\"\n",

Review Comment:
   Rather than saving this to a file and running it that way, can we just run 
it directly through colab like 
https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/automatic_model_refresh.ipynb
 ?
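
   For reference, a rough sketch of what running it inline could look like, reusing `parse_message`, the HuggingFace embedder, and the subscription path from the cells above. This is only a sketch: the region, bucket paths, requirements file, and the `alloydb_writer_config` name are placeholders, not values from the PR.

   ```python
   import tempfile

   import apache_beam as beam
   from apache_beam.options.pipeline_options import (
       GoogleCloudOptions,
       PipelineOptions,
       SetupOptions,
       StandardOptions,
   )
   from apache_beam.transforms.window import FixedWindows
   from apache_beam.ml.transforms.base import MLTransform
   from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings
   from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform

   # Submit the streaming job straight to Dataflow from the notebook.
   options = PipelineOptions()
   options.view_as(StandardOptions).streaming = True
   options.view_as(StandardOptions).runner = "DataflowRunner"

   gcp = options.view_as(GoogleCloudOptions)
   gcp.project = PROJECT_ID
   gcp.region = "us-central1"                      # placeholder
   gcp.staging_location = "gs://<BUCKET>/staging"  # placeholder
   gcp.temp_location = "gs://<BUCKET>/temp"        # placeholder

   # Dataflow workers need the notebook's Python dependencies installed.
   options.view_as(SetupOptions).requirements_file = "requirements.txt"  # placeholder

   pipeline = beam.Pipeline(options=options)
   _ = (
       pipeline
       | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=subscription_path)
       | "Window" >> beam.WindowInto(FixedWindows(10))
       | "Parse Messages" >> beam.Map(parse_message)  # defined in an earlier cell
       | "Generate Embeddings" >> MLTransform(
           write_artifact_location=tempfile.mkdtemp()
       ).with_transform(HuggingfaceTextEmbeddings(
           model_name="sentence-transformers/all-MiniLM-L6-v2"
       ))
       # alloydb_writer_config: the AlloyDBVectorWriterConfig built earlier (placeholder name)
       | "Write to AlloyDB" >> VectorDatabaseWriteTransform(alloydb_writer_config)
   )

   # run() returns after submission; the streaming job keeps running on Dataflow.
   pipeline.run()
   ```

   That would mirror the automatic_model_refresh.ipynb pattern and avoid the file indirection.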



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
