damccorm commented on code in PR #34251: URL: https://github.com/apache/beam/pull/34251#discussion_r1992143758
########## examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb: ########## @@ -2148,6 +2160,599 @@ "print(\"\\nAfter embedding generation:\")\n", "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" ] + }, + { + "cell_type": "markdown", + "source": [ + "## Streaming Embeddings Updates from PubSub\n", + "\n", + "This section demonstrates how to build a real-time embedding pipeline that continuously processes product updates and maintains fresh embeddings in AlloyDB. This approach is ideal for data that changes frequently.\n", + "\n", + "This example runs on Dataflow because the DirectRunner does not support streaming writes via JDBC.\n", + "\n", + "### Authenticate with Google Cloud\n", + "To use PubSub, we authenticate with Google Cloud.\n" + ], + "metadata": { + "id": "yv4Rd1ZvsB_M" + } + }, + { + "cell_type": "code", + "source": [ + "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n", + "PROJECT_ID = '' # @param {type:'string'}\n", + "\n", + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "VCqJmaznt1nS" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Setting Up PubSub Resources\n", + "\n", + "First, let's set up the necessary PubSub topics and subscriptions:" + ], + "metadata": { + "id": "2FsoFaugtsln" + } + }, + { + "cell_type": "code", + "source": [ + "from google.cloud import pubsub_v1\n", + "from google.api_core.exceptions import AlreadyExists\n", + "import json\n", + "\n", + "# Define topic and subscription names\n", + "TOPIC = \"product-updates\" # @param {type:'string'}\n", + "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n", + "\n", + "# Create publisher client and topic\n", + "publisher = pubsub_v1.PublisherClient()\n", + "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n", + "try:\n", + " topic = publisher.create_topic(request={\"name\": topic_path})\n", + " print(f\"Created topic: {topic.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Topic {topic_path} already exists.\")\n", + "\n", + "# Create subscriber client and subscription\n", + "subscriber = pubsub_v1.SubscriberClient()\n", + "subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)\n", + "try:\n", + " subscription = subscriber.create_subscription(\n", + " request={\"name\": subscription_path, \"topic\": topic_path}\n", + " )\n", + " print(f\"Created subscription: {subscription.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Subscription {subscription_path} already exists.\")" + ], + "metadata": { + "id": "nqMe0Brlt7Bk" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create AlloyDB Table for Streaming Updates\n", + "\n", + "Next, create a table to store the embedded data."
+ ], + "metadata": { + "id": "07ZFeGbMuFj_" + } + }, + { + "cell_type": "code", + "source": [ + "table_name = \"streaming_product_embeddings\"\n", + "table_schema = \"\"\"\n", + " id VARCHAR PRIMARY KEY,\n", + " embedding VECTOR(384) NOT NULL,\n", + " content text,\n", + " metadata JSONB,\n", + " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n", + "\"\"\"" + ], + "metadata": { + "id": "3Xc70uV_uJy5" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)\n", + "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" + ], + "metadata": { + "id": "8HPhUfAuorBP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create Publisher Subprocess\n", + "The publisher simulates real-time product updates by:\n", + "- Publishing sample product data to the PubSub topic every 5 seconds\n", + "- Modifying prices and descriptions to represent changes\n", + "- Adding timestamps to track update times\n", + "- Running for 25 minutes in the background while our pipeline processes the data" + ], + "metadata": { + "id": "r7nJdc09vs98" + } + }, + { + "cell_type": "code", + "source": [ + "#@title Define PubSub publisher function\n", + "import threading\n", + "import time\n", + "import json\n", + "import logging\n", + "from google.cloud import pubsub_v1\n", + "import datetime\n", + "import os\n", + "import sys\n", + "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n", + "\n", + "print(f\"Log file will be created at: {log_file}\")\n", + "\n", + "def publisher_function(project_id, topic):\n", + " \"\"\"Function that publishes sample product updates to a PubSub topic.\n", + "\n", + " This function runs in a separate thread and continuously publishes\n", + " messages to simulate real-time product updates.\n", + " \"\"\"\n", + " thread_id = threading.current_thread().ident\n", + "\n", + " process_log_file = os.path.join(os.getcwd(), f\"publisher_{thread_id}.log\")\n", + "\n", + " file_handler = logging.FileHandler(process_log_file)\n", + " file_handler.setFormatter(logging.Formatter('%(asctime)s - ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n", + "\n", + " logger = logging.getLogger(f\"worker.{thread_id}\")\n", + " logger.setLevel(logging.INFO)\n", + " logger.addHandler(file_handler)\n", + "\n", + " logger.info(f\"Publisher thread started with ID: {thread_id}\")\n", + " file_handler.flush()\n", + "\n", + " publisher = pubsub_v1.PublisherClient()\n", + " topic_path = publisher.topic_path(project_id, topic)\n", + "\n", + " logger.info(\"Starting to publish messages...\")\n", + " file_handler.flush()\n", + " for i in range(300):\n", + " message_index = i % len(PRODUCTS_DATA)\n", + " message = PRODUCTS_DATA[message_index].copy()\n", + "\n", + " if message_index % 2 == 0:\n", + " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n", + " message[\"price\"] = round(message[\"price\"] * dynamic_factor, 2)\n", + " message[\"description\"] = f\"PRICE UPDATE (factor: {dynamic_factor:.3f}): \" + message[\"description\"]\n", + "\n", + " message[\"published_at\"] = datetime.datetime.now().isoformat()\n", + "\n", + " data = json.dumps(message).encode('utf-8')\n", + " publish_future = publisher.publish(topic_path, data)\n", + "\n", + " try:\n", + " logger.info(f\"Publishing message {message}\")\n", + " file_handler.flush()\n", + " message_id = publish_future.result()\n", + " logger.info(f\"Published message {i+1}: 
{message['id']} (Message ID: {message_id})\")\n", + " file_handler.flush()\n", + " except Exception as e:\n", + " logger.error(f\"Error publishing message: {e}\")\n", + " file_handler.flush()\n", + "\n", + " time.sleep(5)\n", + "\n", + " logger.info(\"Finished publishing all messages.\")\n", + " file_handler.flush()" + ], + "metadata": { + "id": "C9Bf0Nb0vY7r", + "cellView": "form" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "#### Start publishing to PusSub in background" Review Comment: ```suggestion "#### Start publishing to PubSub in background" ``` ########## examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb: ########## @@ -2148,6 +2160,599 @@ "print(\"\\nAfter embedding generation:\")\n", "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" ] + }, + { + "cell_type": "markdown", + "source": [ + "## Streaming Embeddings Updates from PubSub\n", + "\n", + "This section demonstrates how to build a real-time embedding pipeline that continuously processes product updates and maintains fresh embeddings in AlloyDB. This approach is ideal for data that changes frequently.\n", + "\n", + "This example runs on Dataflow because the DirectRunner does not support streaming writes via JDBC.\n", + "\n", + "### Authenticate with Google Cloud\n", + "To use PubSub, we authenticate with Google Cloud.\n" + ], + "metadata": { + "id": "yv4Rd1ZvsB_M" + } + }, + { + "cell_type": "code", + "source": [ + "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n", + "PROJECT_ID = '' # @param {type:'string'}\n", + "\n", + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "VCqJmaznt1nS" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Setting Up PubSub Resources\n", + "\n", + "First, let's set up the necessary PubSub topics and subscriptions:" + ], + "metadata": { + "id": "2FsoFaugtsln" + } + }, + { + "cell_type": "code", + "source": [ + "from google.cloud import pubsub_v1\n", + "from google.api_core.exceptions import AlreadyExists\n", + "import json\n", + "\n", + "# Define topic and subscription names\n", + "TOPIC = \"product-updates\" # @param {type:'string'}\n", + "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n", + "\n", + "# Create publisher client and topic\n", + "publisher = pubsub_v1.PublisherClient()\n", + "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n", + "try:\n", + " topic = publisher.create_topic(request={\"name\": topic_path})\n", + " print(f\"Created topic: {topic.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Topic {topic_path} already exists.\")\n", + "\n", + "# Create subscriber client and subscription\n", + "subscriber = pubsub_v1.SubscriberClient()\n", + "subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)\n", + "try:\n", + " subscription = subscriber.create_subscription(\n", + " request={\"name\": subscription_path, \"topic\": topic_path}\n", + " )\n", + " print(f\"Created subscription: {subscription.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Subscription {subscription_path} already exists.\")" + ], + "metadata": { + "id": "nqMe0Brlt7Bk" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create AlloyDB Table for Streaming Updates\n", + "\n", + "Next, create a table to store the embedded data."
+ ], + "metadata": { + "id": "07ZFeGbMuFj_" + } + }, + { + "cell_type": "code", + "source": [ + "table_name = \"streaming_product_embeddings\"\n", + "table_schema = \"\"\"\n", + " id VARCHAR PRIMARY KEY,\n", + " embedding VECTOR(384) NOT NULL,\n", + " content text,\n", + " metadata JSONB,\n", + " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n", + "\"\"\"" + ], + "metadata": { + "id": "3Xc70uV_uJy5" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)\n", + "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" + ], + "metadata": { + "id": "8HPhUfAuorBP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create Publisher Subprocess\n", + "The publisher simulates real-time product updates by:\n", + "- Publishing sample product data to the PubSub topic every 5 seconds\n", + "- Modifying prices and descriptions to represent changes\n", + "- Adding timestamps to track update times\n", + "- Running for 25 minutes in the background while our pipeline processes the data" + ], + "metadata": { + "id": "r7nJdc09vs98" + } + }, + { + "cell_type": "code", + "source": [ + "#@title Define PubSub publisher function\n", + "import threading\n", + "import time\n", + "import json\n", + "import logging\n", + "from google.cloud import pubsub_v1\n", + "import datetime\n", + "import os\n", + "import sys\n", + "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n", + "\n", + "print(f\"Log file will be created at: {log_file}\")\n", + "\n", + "def publisher_function(project_id, topic):\n", + " \"\"\"Function that publishes sample product updates to a PubSub topic.\n", + "\n", + " This function runs in a separate thread and continuously publishes\n", + " messages to simulate real-time product updates.\n", + " \"\"\"\n", + " thread_id = threading.current_thread().ident\n", + "\n", + " process_log_file = os.path.join(os.getcwd(), f\"publisher_{thread_id}.log\")\n", + "\n", + " file_handler = logging.FileHandler(process_log_file)\n", + " file_handler.setFormatter(logging.Formatter('%(asctime)s - ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n", + "\n", + " logger = logging.getLogger(f\"worker.{thread_id}\")\n", + " logger.setLevel(logging.INFO)\n", + " logger.addHandler(file_handler)\n", + "\n", + " logger.info(f\"Publisher thread started with ID: {thread_id}\")\n", + " file_handler.flush()\n", + "\n", + " publisher = pubsub_v1.PublisherClient()\n", + " topic_path = publisher.topic_path(project_id, topic)\n", + "\n", + " logger.info(\"Starting to publish messages...\")\n", + " file_handler.flush()\n", + " for i in range(300):\n", + " message_index = i % len(PRODUCTS_DATA)\n", + " message = PRODUCTS_DATA[message_index].copy()\n", + "\n", + " if message_index % 2 == 0:\n", + " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n", + " message[\"price\"] = round(message[\"price\"] * dynamic_factor, 2)\n", + " message[\"description\"] = f\"PRICE UPDATE (factor: {dynamic_factor:.3f}): \" + message[\"description\"]\n", + "\n", + " message[\"published_at\"] = datetime.datetime.now().isoformat()\n", + "\n", + " data = json.dumps(message).encode('utf-8')\n", + " publish_future = publisher.publish(topic_path, data)\n", + "\n", + " try:\n", + " logger.info(f\"Publishing message {message}\")\n", + " file_handler.flush()\n", + " message_id = publish_future.result()\n", + " logger.info(f\"Published message {i+1}: 
{message['id']} (Message ID: {message_id})\")\n", + " file_handler.flush()\n", + " except Exception as e:\n", + " logger.error(f\"Error publishing message: {e}\")\n", + " file_handler.flush()\n", + "\n", + " time.sleep(5)\n", + "\n", + " logger.info(\"Finished publishing all messages.\")\n", + " file_handler.flush()" + ], + "metadata": { + "id": "C9Bf0Nb0vY7r", + "cellView": "form" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "#### Start publishing to PusSub in background" + ], + "metadata": { + "id": "jnUSynmjEmVr" + } + }, + { + "cell_type": "code", + "source": [ + "# Launch publisher in a separate thread\n", + "print(\"Starting publisher thread...\")\n", + "publisher_thread = threading.Thread(\n", + " target=publisher_function,\n", + " args=(PROJECT_ID, TOPIC),\n", + " daemon=True\n", + ")\n", + "publisher_thread.start()\n", + "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n", + "print(f\"Publisher thread logging to file: publisher_{publisher_thread.ident}.log\")" + ], + "metadata": { + "id": "ZnBBTwZHw7Ex" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Define Streaming Pipeline\n", + "Our pipeline contains these key components:\n", + "\n", + "1. **Source**: Continuously reads messages from PubSub\n", + "2. **Windowing**: Groups messages into 10-second windows for batch processing\n", + "3. **Transformation**: Converts JSON messages to Chunk objects for embedding\n", + "4. **ML Processing**: Generates embeddings using HuggingFace models\n", + "5. **Sink**: Writes results to AlloyDB with conflict resolution" + ], + "metadata": { + "id": "EK3pela7CLFj" + } + }, + { + "cell_type": "markdown", Review Comment: Given that the pipeline is defined one cell down, I don't think we get much out of including it here. It also renders a little oddly in GitHub and probably in GCP docs - https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb ########## examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb: ########## @@ -2148,6 +2160,599 @@ "print(\"\\nAfter embedding generation:\")\n", "verify_embeddings(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" ] + }, + { + "cell_type": "markdown", + "source": [ + "## Streaming Embeddings Updates from PubSub\n", + "\n", + "This section demonstrates how to build a real-time embedding pipeline that continuously processes product updates and maintains fresh embeddings in AlloyDB. 
This approach is ideal for data that changes frequently.\n", + "\n", + "This example runs on Dataflow because the DirectRunner does not support streaming writes via JDBC.\n", + "\n", + "### Authenticate with Google Cloud\n", + "To use PubSub, we authenticate with Google Cloud.\n" + ], + "metadata": { + "id": "yv4Rd1ZvsB_M" + } + }, + { + "cell_type": "code", + "source": [ + "# Replace <PROJECT_ID> with a valid Google Cloud project ID.\n", + "PROJECT_ID = '' # @param {type:'string'}\n", + "\n", + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "VCqJmaznt1nS" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Setting Up PubSub Resources\n", + "\n", + "First, let's set up the necessary PubSub topics and subscriptions:" + ], + "metadata": { + "id": "2FsoFaugtsln" + } + }, + { + "cell_type": "code", + "source": [ + "from google.cloud import pubsub_v1\n", + "from google.api_core.exceptions import AlreadyExists\n", + "import json\n", + "\n", + "# Define topic and subscription names\n", + "TOPIC = \"product-updates\" # @param {type:'string'}\n", + "SUBSCRIPTION = \"product-updates-sub\" # @param {type:'string'}\n", + "\n", + "# Create publisher client and topic\n", + "publisher = pubsub_v1.PublisherClient()\n", + "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n", + "try:\n", + " topic = publisher.create_topic(request={\"name\": topic_path})\n", + " print(f\"Created topic: {topic.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Topic {topic_path} already exists.\")\n", + "\n", + "# Create subscriber client and subscription\n", + "subscriber = pubsub_v1.SubscriberClient()\n", + "subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)\n", + "try:\n", + " subscription = subscriber.create_subscription(\n", + " request={\"name\": subscription_path, \"topic\": topic_path}\n", + " )\n", + " print(f\"Created subscription: {subscription.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Subscription {subscription_path} already exists.\")" + ], + "metadata": { + "id": "nqMe0Brlt7Bk" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create AlloyDB Table for Streaming Updates\n", + "\n", + "Next, create a table to store the embedded data."
+ ], + "metadata": { + "id": "07ZFeGbMuFj_" + } + }, + { + "cell_type": "code", + "source": [ + "table_name = \"streaming_product_embeddings\"\n", + "table_schema = \"\"\"\n", + " id VARCHAR PRIMARY KEY,\n", + " embedding VECTOR(384) NOT NULL,\n", + " content text,\n", + " metadata JSONB,\n", + " last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n", + "\"\"\"" + ], + "metadata": { + "id": "3Xc70uV_uJy5" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "setup_alloydb_table(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)\n", + "test_alloydb_connection(ALLOYDB_HOST, DB_NAME, table_name, DB_USER, DB_PASSWORD)" + ], + "metadata": { + "id": "8HPhUfAuorBP" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Create Publisher Subprocess\n", + "The publisher simulates real-time product updates by:\n", + "- Publishing sample product data to the PubSub topic every 5 seconds\n", + "- Modifying prices and descriptions to represent changes\n", + "- Adding timestamps to track update times\n", + "- Running for 25 minutes in the background while our pipeline processes the data" + ], + "metadata": { + "id": "r7nJdc09vs98" + } + }, + { + "cell_type": "code", + "source": [ + "#@title Define PubSub publisher function\n", + "import threading\n", + "import time\n", + "import json\n", + "import logging\n", + "from google.cloud import pubsub_v1\n", + "import datetime\n", + "import os\n", + "import sys\n", + "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n", + "\n", + "print(f\"Log file will be created at: {log_file}\")\n", + "\n", + "def publisher_function(project_id, topic):\n", + " \"\"\"Function that publishes sample product updates to a PubSub topic.\n", + "\n", + " This function runs in a separate thread and continuously publishes\n", + " messages to simulate real-time product updates.\n", + " \"\"\"\n", + " thread_id = threading.current_thread().ident\n", + "\n", + " process_log_file = os.path.join(os.getcwd(), f\"publisher_{thread_id}.log\")\n", + "\n", + " file_handler = logging.FileHandler(process_log_file)\n", + " file_handler.setFormatter(logging.Formatter('%(asctime)s - ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n", + "\n", + " logger = logging.getLogger(f\"worker.{thread_id}\")\n", + " logger.setLevel(logging.INFO)\n", + " logger.addHandler(file_handler)\n", + "\n", + " logger.info(f\"Publisher thread started with ID: {thread_id}\")\n", + " file_handler.flush()\n", + "\n", + " publisher = pubsub_v1.PublisherClient()\n", + " topic_path = publisher.topic_path(project_id, topic)\n", + "\n", + " logger.info(\"Starting to publish messages...\")\n", + " file_handler.flush()\n", + " for i in range(300):\n", + " message_index = i % len(PRODUCTS_DATA)\n", + " message = PRODUCTS_DATA[message_index].copy()\n", + "\n", + " if message_index % 2 == 0:\n", + " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n", + " message[\"price\"] = round(message[\"price\"] * dynamic_factor, 2)\n", + " message[\"description\"] = f\"PRICE UPDATE (factor: {dynamic_factor:.3f}): \" + message[\"description\"]\n", + "\n", + " message[\"published_at\"] = datetime.datetime.now().isoformat()\n", + "\n", + " data = json.dumps(message).encode('utf-8')\n", + " publish_future = publisher.publish(topic_path, data)\n", + "\n", + " try:\n", + " logger.info(f\"Publishing message {message}\")\n", + " file_handler.flush()\n", + " message_id = publish_future.result()\n", + " logger.info(f\"Published message {i+1}: 
{message['id']} (Message ID: {message_id})\")\n", + " file_handler.flush()\n", + " except Exception as e:\n", + " logger.error(f\"Error publishing message: {e}\")\n", + " file_handler.flush()\n", + "\n", + " time.sleep(5)\n", + "\n", + " logger.info(\"Finished publishing all messages.\")\n", + " file_handler.flush()" + ], + "metadata": { + "id": "C9Bf0Nb0vY7r", + "cellView": "form" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "#### Start publishing to PusSub in background" + ], + "metadata": { + "id": "jnUSynmjEmVr" + } + }, + { + "cell_type": "code", + "source": [ + "# Launch publisher in a separate thread\n", + "print(\"Starting publisher thread...\")\n", + "publisher_thread = threading.Thread(\n", + " target=publisher_function,\n", + " args=(PROJECT_ID, TOPIC),\n", + " daemon=True\n", + ")\n", + "publisher_thread.start()\n", + "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n", + "print(f\"Publisher thread logging to file: publisher_{publisher_thread.ident}.log\")" + ], + "metadata": { + "id": "ZnBBTwZHw7Ex" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "### Define Streaming Pipeline\n", + "Our pipeline contains these key components:\n", + "\n", + "1. **Source**: Continuously reads messages from PubSub\n", + "2. **Windowing**: Groups messages into 10-second windows for batch processing\n", + "3. **Transformation**: Converts JSON messages to Chunk objects for embedding\n", + "4. **ML Processing**: Generates embeddings using HuggingFace models\n", + "5. **Sink**: Writes results to AlloyDB with conflict resolution" + ], + "metadata": { + "id": "EK3pela7CLFj" + } + }, + { + "cell_type": "markdown", + "source": [ + "```python\n", + "import apache_beam as beam\n", + "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n", + "from apache_beam.transforms.window import FixedWindows\n", + "from apache_beam.ml.transforms.base import MLTransform\n", + "from apache_beam.ml.rag.types import Chunk, Content\n", + "from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform\n", + "from apache_beam.ml.rag.ingestion.alloydb import AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, ConflictResolution\n", + "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n", + "import json\n", + "import tempfile\n", + "\n", + "# Parse incoming PubSub messages\n", + "def parse_message(message):\n", + " product_json = json.loads(message.decode('utf-8'))\n", + " return Chunk(\n", + " content=Content(text=f\"{product_json['name']}: {product_json['description']}\"),\n", + " id=product_json['id'],\n", + " metadata=product_json\n", + " )\n", + "\n", + "# Set up pipeline options\n", + "options = PipelineOptions()\n", + "options.view_as(StandardOptions).streaming = True # This makes it a streaming pipeline\n", + "\n", + "# Create and run the pipeline\n", + "with beam.Pipeline(options=options) as p:\n", + " _ = (\n", + " p\n", + " # SOURCE: Read continuously from PubSub\n", + " | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n", + " subscription=\"projects/my-project/subscriptions/my-subscription\"\n", + " )\n", + " \n", + " # WINDOWING: Group messages into 10-second windows\n", + " | \"Window\" >> beam.WindowInto(FixedWindows(10))\n", + " \n", + " # TRANSFORM: Parse JSON messages into Chunks\n", + " | \"Parse Messages\" >> beam.Map(parse_message)\n", + " \n", + " # ML PROCESSING: Generate embeddings with Huggingface\n", + 
" | \"Generate Embeddings\" >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(\n", + " model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n", + " ))\n", + " \n", + " # SINK: Write to AlloyDB vector database\n", + " | \"Write to AlloyDB\" >> VectorDatabaseWriteTransform(\n", + " AlloyDBVectorWriterConfig(\n", + " connection_config=AlloyDBConnectionConfig(\n", + " jdbc_url=\"jdbc:postgresql://host:port/database\",\n", + " username=\"user\",\n", + " password=\"password\"\n", + " ),\n", + " table_name=\"product_embeddings\",\n", + " conflict_resolution=ConflictResolution(\n", + " on_conflict_fields=\"id\",\n", + " action=\"UPDATE\",\n", + " update_fields=[\"embedding\", \"content\", \"metadata\"]\n", + " )\n", + " )\n", + " )\n", + " )\n", + "```" + ], + "metadata": { + "id": "hWEpljFpkvp3" + } + }, + { + "cell_type": "markdown", + "source": [ + "### Save our Pipeline to a python file\n", + "\n", + "To launch our pipeline job on DataFlow we save our pipeline code to a local file `streaming_ingestion_pipeline.py`.\n", + "\n", + "Expand the hidden cell to see the entire pipeline content." + ], + "metadata": { + "id": "0PBGyr6mnSLs" + } + }, + { + "cell_type": "code", + "source": [ + "#@title Save pipeline to streaming_ingestion_pipeline.py\n", + "file_content = \"\"\"\n", Review Comment: Rather than saving this to a file and running it that way, can we just run it directly through colab like https://github.com/apache/beam/blob/8c588f2d5c53f637eb745478beead92e4c5e19ce/examples/notebooks/beam-ml/automatic_model_refresh.ipynb ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org