mohamedawnallah commented on code in PR #35467:
URL: https://github.com/apache/beam/pull/35467#discussion_r2443411962


##########
examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb:
##########
@@ -0,0 +1,2373 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 70,
+   "id": "47053bac",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 
2.0 (the \"License\")\n",
+    "\n",
+    "# Licensed to the Apache Software Foundation (ASF) under one\n",
+    "# or more contributor license agreements. See the NOTICE file\n",
+    "# distributed with this work for additional information\n",
+    "# regarding copyright ownership. The ASF licenses this file\n",
+    "# to you under the Apache License, Version 2.0 (the\n",
+    "# \"License\"); you may not use this file except in compliance\n",
+    "# with the License. You may obtain a copy of the License at\n",
+    "#\n",
+    "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+    "#\n",
+    "# Unless required by applicable law or agreed to in writing,\n",
+    "# software distributed under the License is distributed on an\n",
+    "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "# KIND, either express or implied. See the License for the\n",
+    "# specific language governing permissions and limitations\n",
+    "# under the License"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "aa881240-2f38-4335-9d4d-444776d77c92",
+   "metadata": {},
+   "source": [
+    "# Use Apache Beam and Milvus to enrich data\n",
+    "\n",
+    "<table align=\"left\">\n",
+    "  <td>\n",
+    "    <a target=\"_blank\" 
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\";><img
 
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\";
 />Run in Google Colab</a>\n",
+    "  </td>\n",
+    "  <td>\n",
+    "    <a target=\"_blank\" 
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\";><img
 
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\";
 />View source on GitHub</a>\n",
+    "  </td>\n",
+    "</table>"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0611da21-d031-4b16-8301-9b76bda731e7",
+   "metadata": {},
+   "source": [
+    "This notebook shows how to enrich data by using the Apache Beam 
[enrichment 
transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/)
 with [Milvus](https://milvus.io/). The enrichment transform is an Apache Beam 
turnkey transform that lets you enrich data by using a key-value lookup. This 
transform has the following features:\n",
+    "\n",
+    "- The transform has a built-in Apache Beam handler that interacts with 
Milvus data during enrichment.\n",
+    "- The enrichment transform uses client-side throttling to rate limit the 
requests. The default retry strategy uses exponential backoff. You can 
configure rate limiting to suit your use case.\n",
+    "\n",
+    "This notebook demonstrates the following search engine optimization use 
case:\n",
+    "\n",
+    "A specialized technical search engine company wants to improve its query 
result relevance by dynamically enriching search results with semantically 
related content. The example uses a vector database of technical articles and 
documentation stored in Milvus to enrich incoming user queries. The enriched 
data is then used to provide users with more comprehensive and contextually 
relevant search results, especially for complex technical topics.\n",
+    "\n",
+    "## Before you begin\n",
+    "Set up your environment and download dependencies.\n",
+    "\n",
+    "### Install Apache Beam\n",
+    "To use the enrichment transform with the built-in Milvus handler, install 
the Apache Beam SDK version 2.67.0 or later."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 71,
+   "id": "e550cd55-e91e-4d43-b1bd-b0e89bb8cbd9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Disable tokenizers parallelism to prevent deadlocks when forking 
processes\n",
+    "# This avoids the \"huggingface/tokenizers: The current process just got 
forked\" warning.\n",
+    "import os\n",
+    "os.environ[\"TOKENIZERS_PARALLELISM\"] = \"false\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 72,
+   "id": "31747c45-107a-49be-8885-5a6cc9dc1236",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n",
+      
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
 A new release of pip is available: 
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m -> 
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+      
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
 To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
+      "\n",
+      
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
 A new release of pip is available: 
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m -> 
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+      
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
 To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
+     ]
+    }
+   ],
+   "source": [
+    "# The Apache Beam test dependencies are included here for the 
TestContainers\n",
+    "# Milvus standalone DB container that will be used later in the demo.\n",
+    "!pip install rich sentence_transformers llama_index --quiet\n",
+    "!pip install apache_beam[milvus,gcp,test,interactive]>=2.67.0 --quiet"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 73,
+   "id": "666e0c2b-0341-4b0e-8d73-561abc39bb10",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Standard library imports.\n",
+    "from collections import defaultdict\n",
+    "from math import ceil\n",
+    "from typing import List\n",
+    "\n",
+    "# Third-party imports.\n",
+    "import numpy as np\n",
+    "import pandas as pd\n",
+    "from pymilvus import DataType, CollectionSchema, FieldSchema, Function, 
FunctionType, MilvusClient, RRFRanker\n",
+    "from pymilvus.milvus_client import IndexParams\n",
+    "from rich import print_json\n",
+    "from sentence_transformers import SentenceTransformer\n",
+    "from torch import cuda\n",
+    "from llama_index.core.text_splitter import SentenceSplitter\n",
+    "\n",
+    "# Local application imports.\n",
+    "import apache_beam as beam\n",
+    "from apache_beam.ml.rag.types import Chunk, Content, Embedding\n",
+    "from apache_beam.transforms.enrichment import Enrichment\n",
+    "from apache_beam.ml.rag.enrichment.milvus_search_it_test import 
MilvusEnrichmentTestHelper\n",
+    "from apache_beam.ml.rag.enrichment.milvus_search import (\n",
+    "    HybridSearchParameters, \n",
+    "    KeywordSearchMetrics, \n",
+    "    KeywordSearchParameters,\n",
+    "    MilvusCollectionLoadParameters, \n",
+    "    MilvusConnectionParameters, \n",
+    "    MilvusSearchEnrichmentHandler,\n",
+    "    MilvusSearchParameters, \n",
+    "    SearchStrategy, \n",
+    "    VectorSearchMetrics, \n",
+    "    VectorSearchParameters)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "338808ff-3f80-48e5-9c76-b8d19f8769b7",
+   "metadata": {},
+   "source": [
+    "## Collect Data"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d83ad549-5ee1-4a4c-ae5a-e638c3d0279f",
+   "metadata": {},
+   "source": [
+    "This content has been paraphrased from publicly available information on 
the internet using a large language model (OpenAI’s GPT-4) and is provided for 
informational purposes only."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d39a070a-206d-41f6-9033-fff0d5ea2128",
+   "metadata": {},
+   "source": [
+    "The third data point, related to Google Beam, was intentionally included 
to illustrate the importance of metadata filtering (filtered search) in 
Milvus—such as when a user searches for the term “Beam.” without it the vector 
database retrieval engine may confuse between Apache Beam and Google Beam."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 74,
+   "id": "38781cf5-e18f-40f5-827e-2d441ae7d2fa",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "corpus = [\n",
+    "  {\n",
+    "    \"id\": \"1\",\n",
+    "    \"title\": \"Apache Beam: Unified Model for Batch and Streaming 
Data\",\n",
+    "    \"keywords\": [\"Apache Beam\", \"stream processing\", \"batch 
processing\", \"data pipelines\", \"SDK\"],\n",
+    "    \"tags\": [\"Data Engineering\", \"Open Source\", \"Streaming\", 
\"Batch\", \"Big Data\"],\n",
+    "    \"content\": (\n",
+    "      \"Apache Beam is an open-source framework that provides a 
consistent programming model for both batch and streaming data processing. 
\"\n",
+    "      \"Developed originally by Google, it allows developers to write 
pipelines that can run on multiple engines, such as Apache Flink, Spark, and 
Google Cloud Dataflow. \"\n",
+    "      \"Beam uses abstractions like PCollections (data containers) and 
PTransforms (operations) to define the flow of data. \"\n",
+    "      \"The framework promotes portability through its runner 
architecture, letting the same pipeline execute on different backends. \"\n",
+    "      \"Support for multiple SDKs, including Java and Python, makes it 
accessible for a broad audience. \"\n",
+    "      \"Key features include support for event time, windowing, triggers, 
and stateful processing, which are essential for handling real-time data 
effectively. \"\n",
+    "      \"Beam is ideal for building ETL jobs, real-time analytics, and 
machine learning data pipelines. \"\n",
+    "      \"It helps teams focus on logic rather than infrastructure, 
offering flexibility and scalability in handling unbounded and bounded data 
sources. \"\n",
+    "      \"Apache Beam also supports a wide range of connectors for both 
input and output, including Kafka, BigQuery, and JDBC-based systems. \"\n",
+    "      \"This makes it easy to integrate Beam into existing data 
ecosystems. Developers can build reusable transforms and modularize pipeline 
logic, improving maintainability and testing. \"\n",
+    "      \"The concept of runners enables developers to write once and run 
anywhere, which is particularly appealing for organizations that want to avoid 
vendor lock-in. \"\n",
+    "      \"The Beam model is based on a unified programming model that 
decouples pipeline logic from execution. \"\n",
+    "      \"This makes it easier to reason about time and state in both batch 
and streaming pipelines. \"\n",
+    "      \"Advanced features like late data handling, watermarks, and 
session windowing allow for more accurate and meaningful processing of 
real-world data. \"\n",
+    "      \"Beam also integrates with orchestration tools and monitoring 
systems, allowing for production-grade deployments. \"\n",
+    "      \"Community support and contributions have grown significantly, 
making Beam a stable and evolving ecosystem. \"\n",
+    "      \"Many cloud providers offer native support for Beam pipelines, and 
it's increasingly a core component in modern data platform architectures.\"\n",
+    "    )\n",
+    "  },\n",
+    "  {\n",
+    "    \"id\": \"2\",\n",
+    "    \"title\": \"Google Cloud Dataflow: Run Apache Beam in the 
Cloud\",\n",
+    "    \"keywords\": [\"Google Cloud\", \"Dataflow\", \"Apache Beam\", 
\"serverless\", \"stream and batch\"],\n",
+    "    \"tags\": [\"Cloud Computing\", \"Data Pipelines\", \"Google Cloud\", 
\"Serverless\", \"Enterprise\"],\n",
+    "    \"content\": (\n",
+    "      \"Google Cloud Dataflow is a fully managed service that runs Apache 
Beam pipelines in the cloud. \"\n",
+    "      \"It abstracts away infrastructure management and handles dynamic 
scaling, load balancing, and fault tolerance. \"\n",
+    "      \"Developers can focus on writing data logic using the Beam SDK and 
deploy it easily to Google Cloud. \"\n",
+    "      \"Dataflow supports both batch and stream processing and integrates 
seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud 
Storage. \"\n",
+    "      \"Its autoscaling capabilities allow it to adapt to changing data 
volumes, optimizing for cost and performance. \"\n",
+    "      \"Features like monitoring dashboards, job templates, and built-in 
logging make it suitable for both development and production use. \"\n",
+    "      \"With support for event time processing, stateful functions, and 
windowing, Dataflow is well-suited for real-time analytics and data 
transformation tasks. \"\n",
+    "      \"It’s a key component for architects building scalable, 
cloud-native data platforms. \"\n",
+    "      \"Dataflow also offers templates for common ETL tasks, helping 
teams get started quickly with minimal setup. \"\n",
+    "      \"Its integration with Cloud Functions and Cloud Composer enables 
event-driven and orchestrated workflows. \"\n",
+    "      \"Security and compliance are built-in with IAM roles, encryption 
at rest and in transit, and audit logging, making it suitable for enterprise 
environments. \"\n",
+    "      \"For developers, Dataflow provides local testing capabilities and 
a unified logging system through Cloud Logging. \"\n",
+    "      \"It also supports SQL-based pipeline definitions using BigQuery, 
which lowers the barrier to entry for analysts and data engineers. \"\n",
+    "      \"Dataflow’s streaming engine significantly improves performance 
and reduces costs by decoupling compute and state management. \"\n",
+    "      \"In summary, Google Cloud Dataflow not only simplifies the 
deployment of Apache Beam pipelines but also enhances them with cloud-native 
features. \"\n",
+    "      \"Its managed runtime, high availability, and integration with the 
broader Google Cloud ecosystem make it a powerful tool for modern data 
processing.\"\n",
+    "    )\n",
+    "  },\n",
+    "  {\n",
+    "    \"id\": \"3\",\n",
+    "    \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+    "    \"keywords\": [\"Google Beam\", \"Project Starline\", \"3D video\", 
\"AI communication\", \"real-time meetings\"],\n",
+    "    \"tags\": [\"AI\", \"Communication\", \"3D Technology\", \"Remote 
Work\", \"Enterprise Tech\"],\n",
+    "    \"content\": (\n",
+    "      \"Google Beam is an innovative video communication platform that 
builds on the research of Project Starline. It uses AI, 3D imaging, and light 
field rendering to create immersive, lifelike video calls. \"\n",
+    "      \"Designed to replicate in-person interaction, Beam allows users to 
see life-sized, three-dimensional representations of each other without the 
need for headsets. \"\n",
+    "      \"This breakthrough makes remote conversations feel 
natural—capturing facial expressions, eye contact, and subtle gestures that 
traditional video conferencing often misses. \"\n",
+    "      \"Beam reduces meeting fatigue and enhances engagement, making it 
ideal for enterprise collaboration, interviews, and virtual presence scenarios. 
\"\n",
+    "      \"Powered by Google AI, Beam represents a significant leap in 
communication technology. \"\n",
+    "      \"Major companies like Salesforce, Deloitte, and NEC are already 
exploring its impact on digital collaboration. \"\n",
+    "      \"Google is partnering with HP to build and distribute Beam 
hardware, designed to work with existing productivity and video tools. \"\n",
+    "      \"Currently in limited early access for enterprise partners, Google 
Beam aims to redefine virtual meetings by bridging the gap between digital and 
physical presence. \"\n",
+    "      \"It’s a promising step toward more human and effective remote 
interactions.\"\n",
+    "    )\n",
+    "  }\n",
+    "]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "758c2af7-12c7-477b-9257-3c88712960e7",
+   "metadata": {},
+   "source": [
+    "## Exploratory Data Analysis (EDA)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5e751905-7217-4571-bc07-991ef850a6b2",
+   "metadata": {},
+   "source": [
+    "### Average Words/Tokens per Doc"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 75,
+   "id": "489e93b6-de41-4ec3-be33-a15c3cba12e8",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<div>\n",
+       "<style scoped>\n",
+       "    .dataframe tbody tr th:only-of-type {\n",
+       "        vertical-align: middle;\n",
+       "    }\n",
+       "\n",
+       "    .dataframe tbody tr th {\n",
+       "        vertical-align: top;\n",
+       "    }\n",
+       "\n",
+       "    .dataframe thead th {\n",
+       "        text-align: right;\n",
+       "    }\n",
+       "</style>\n",
+       "<table border=\"1\" class=\"dataframe\">\n",
+       "  <thead>\n",
+       "    <tr style=\"text-align: right;\">\n",
+       "      <th></th>\n",
+       "      <th># Words</th>\n",
+       "    </tr>\n",
+       "  </thead>\n",
+       "  <tbody>\n",
+       "    <tr>\n",
+       "      <th>count</th>\n",
+       "      <td>3.000000</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>mean</th>\n",
+       "      <td>253.666667</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>std</th>\n",
+       "      <td>72.858310</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>min</th>\n",
+       "      <td>172.000000</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>25%</th>\n",
+       "      <td>224.500000</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>50%</th>\n",
+       "      <td>277.000000</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>75%</th>\n",
+       "      <td>294.500000</td>\n",
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>max</th>\n",
+       "      <td>312.000000</td>\n",
+       "    </tr>\n",
+       "  </tbody>\n",
+       "</table>\n",
+       "</div>"
+      ],
+      "text/plain": [
+       "          # Words\n",
+       "count    3.000000\n",
+       "mean   253.666667\n",
+       "std     72.858310\n",
+       "min    172.000000\n",
+       "25%    224.500000\n",
+       "50%    277.000000\n",
+       "75%    294.500000\n",
+       "max    312.000000"
+      ]
+     },
+     "execution_count": 75,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# The second video may skew the average tokens results since it is a 
youtube short video.\n",
+    "contents = [c['content'] for c in corpus]\n",
+    "content_lengths = [len(content.split(\" \")) for content in contents]\n",
+    "df = pd.DataFrame(content_lengths, columns=['# Words'])\n",
+    "df.describe()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 76,
+   "id": "eb32aad0-febd-45af-b4bd-e2176b07e2dc",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "The mean word count for each video is about 254 words, which 
corresponds to a rough token count of 331 tokens.\n"
+     ]
+    }
+   ],
+   "source": [
+    "mean_word_count = ceil(np.mean(content_lengths))\n",
+    "token_to_word_ratio = 1.3\n",
+    "approx_token_count = ceil(mean_word_count * token_to_word_ratio)\n",
+    "print(f'The mean word count for each video is about {mean_word_count} 
words, which corresponds to a rough token count of {approx_token_count} 
tokens.')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "42c1c159-875d-411b-a009-4361301b39f6",
+   "metadata": {},
+   "source": [
+    "## Preprocess Data"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d545355e-41da-4c53-ba9a-4d33b1fe376c",
+   "metadata": {},
+   "source": [
+    "### Chunking"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "a034c5d0-0906-4193-80ac-736a32d7b47e",
+   "metadata": {},
+   "source": [
+    "We'll use sentence splitting as the chunking strategy for 
simplicity.<br>\n",
+    "Ideally, we would pass a tokenizer here — preferably the same one used by 
the retriever — to ensure consistency.<br>\n",
+    "However, in this example, we are not using a tokenizer."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 77,
+   "id": "e7e45d70-0c23-409d-b435-b9479245c1ff",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The `chunk_size` parameter is constrained by the embedding model we’re 
using.\n",
+    "# Since we’re using `sentence-transformers/all-MiniLM-L6-v2`, which has a 
maximum token limit of ~384 tokens,\n",
+    "# we need to ensure chunk sizes stay well within that limit.\n",
+    "# Given that each document in our dataset contains approximately 331 
tokens,\n",
+    "# using a chunk size of 256 allows us to preserve nearly the most 
semantic meaning of each entry\n",
+    "# while staying safely under the model’s token limit.\n",
+    "chunk_size = 256\n",
+    "llama_txt_splitter = SentenceSplitter(chunk_size=chunk_size, 
chunk_overlap=20)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 78,
+   "id": "5a013b08-d7e7-4367-ad49-43ad1320158f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def split_contents(corpus: list[dict], text_splitter: SentenceSplitter, 
content_field: str='content') -> list[list[str]]:\n",
+    "    result = []\n",
+    "    for video in corpus:\n",
+    "        split = llama_txt_splitter.split_text(video[content_field])\n",
+    "        result.append(split)\n",
+    "    return result"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 79,
+   "id": "2d5ea747-40b3-474e-ac36-ccb81256a36c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "content_splits = split_contents(corpus, llama_txt_splitter, \"content\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 80,
+   "id": "9917cefb-6271-4285-a75d-a6d1bfcbfd06",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<pre 
style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu
 Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-weight: 
bold\">[</span>\n",
+       "  <span style=\"font-weight: bold\">[</span>\n",
+       "    <span style=\"color: #008000; text-decoration-color: 
#008000\">\"Apache Beam is an open-source framework that provides a consistent 
programming model for both batch and streaming data processing. Developed 
originally by Google, it allows developers to write pipelines that can run on 
multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam 
uses abstractions like PCollections (data containers) and PTransforms 
(operations) to define the flow of data. The framework promotes portability 
through its runner architecture, letting the same pipeline execute on different 
backends. Support for multiple SDKs, including Java and Python, makes it 
accessible for a broad audience. Key features include support for event time, 
windowing, triggers, and stateful processing, which are essential for handling 
real-time data effectively. Beam is ideal for building ETL jobs, real-time 
analytics, and machine learning data pipelines. It helps teams focus on logic 
rather than i
 nfrastructure, offering flexibility and scalability in handling unbounded and 
bounded data sources. Apache Beam also supports a wide range of connectors for 
both input and output, including Kafka, BigQuery, and JDBC-based systems. This 
makes it easy to integrate Beam into existing data ecosystems. Developers can 
build reusable transforms and modularize pipeline logic, improving 
maintainability and testing.\"</span>,\n",
+       "    <span style=\"color: #008000; text-decoration-color: 
#008000\">\"Developers can build reusable transforms and modularize pipeline 
logic, improving maintainability and testing. The concept of runners enables 
developers to write once and run anywhere, which is particularly appealing for 
organizations that want to avoid vendor lock-in. The Beam model is based on a 
unified programming model that decouples pipeline logic from execution. This 
makes it easier to reason about time and state in both batch and streaming 
pipelines. Advanced features like late data handling, watermarks, and session 
windowing allow for more accurate and meaningful processing of real-world data. 
Beam also integrates with orchestration tools and monitoring systems, allowing 
for production-grade deployments. Community support and contributions have 
grown significantly, making Beam a stable and evolving ecosystem. Many cloud 
providers offer native support for Beam pipelines, and it's increasingly a core 
 component in modern data platform architectures.\"</span>\n",
+       "  <span style=\"font-weight: bold\">]</span>,\n",
+       "  <span style=\"font-weight: bold\">[</span>\n",
+       "    <span style=\"color: #008000; text-decoration-color: 
#008000\">\"Google Cloud Dataflow is a fully managed service that runs Apache 
Beam pipelines in the cloud. It abstracts away infrastructure management and 
handles dynamic scaling, load balancing, and fault tolerance. Developers can 
focus on writing data logic using the Beam SDK and deploy it easily to Google 
Cloud. Dataflow supports both batch and stream processing and integrates 
seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud 
Storage. Its autoscaling capabilities allow it to adapt to changing data 
volumes, optimizing for cost and performance. Features like monitoring 
dashboards, job templates, and built-in logging make it suitable for both 
development and production use. With support for event time processing, 
stateful functions, and windowing, Dataflow is well-suited for real-time 
analytics and data transformation tasks. It’s a key component for architects 
building scalable, cloud-native dat
 a platforms. Dataflow also offers templates for common ETL tasks, helping 
teams get started quickly with minimal setup. Its integration with Cloud 
Functions and Cloud Composer enables event-driven and orchestrated workflows. 
Security and compliance are built-in with IAM roles, encryption at rest and in 
transit, and audit logging, making it suitable for enterprise 
environments.\"</span>,\n",
+       "    <span style=\"color: #008000; text-decoration-color: 
#008000\">\"For developers, Dataflow provides local testing capabilities and a 
unified logging system through Cloud Logging. It also supports SQL-based 
pipeline definitions using BigQuery, which lowers the barrier to entry for 
analysts and data engineers. Dataflow’s streaming engine significantly improves 
performance and reduces costs by decoupling compute and state management. In 
summary, Google Cloud Dataflow not only simplifies the deployment of Apache 
Beam pipelines but also enhances them with cloud-native features. Its managed 
runtime, high availability, and integration with the broader Google Cloud 
ecosystem make it a powerful tool for modern data processing.\"</span>\n",
+       "  <span style=\"font-weight: bold\">]</span>,\n",
+       "  <span style=\"font-weight: bold\">[</span>\n",
+       "    <span style=\"color: #008000; text-decoration-color: 
#008000\">\"Google Beam is an innovative video communication platform that 
builds on the research of Project Starline. It uses AI, 3D imaging, and light 
field rendering to create immersive, lifelike video calls. Designed to 
replicate in-person interaction, Beam allows users to see life-sized, 
three-dimensional representations of each other without the need for headsets. 
This breakthrough makes remote conversations feel natural—capturing facial 
expressions, eye contact, and subtle gestures that traditional video 
conferencing often misses. Beam reduces meeting fatigue and enhances 
engagement, making it ideal for enterprise collaboration, interviews, and 
virtual presence scenarios. Powered by Google AI, Beam represents a significant 
leap in communication technology. Major companies like Salesforce, Deloitte, 
and NEC are already exploring its impact on digital collaboration. Google is 
partnering with HP to build and dist
 ribute Beam hardware, designed to work with existing productivity and video 
tools. Currently in limited early access for enterprise partners, Google Beam 
aims to redefine virtual meetings by bridging the gap between digital and 
physical presence. It’s a promising step toward more human and effective remote 
interactions.\"</span>\n",
+       "  <span style=\"font-weight: bold\">]</span>\n",
+       "<span style=\"font-weight: bold\">]</span>\n",
+       "</pre>\n"
+      ],
+      "text/plain": [
+       "\u001b[1m[\u001b[0m\n",
+       "  \u001b[1m[\u001b[0m\n",
+       "    \u001b[32m\"Apache Beam is an open-source framework that provides 
a consistent programming model for both batch and streaming data processing. 
Developed originally by Google, it allows developers to write pipelines that 
can run on multiple engines, such as Apache Flink, Spark, and Google Cloud 
Dataflow. Beam uses abstractions like PCollections (data containers) and 
PTransforms (operations) to define the flow of data. The framework promotes 
portability through its runner architecture, letting the same pipeline execute 
on different backends. Support for multiple SDKs, including Java and Python, 
makes it accessible for a broad audience. Key features include support for 
event time, windowing, triggers, and stateful processing, which are essential 
for handling real-time data effectively. Beam is ideal for building ETL jobs, 
real-time analytics, and machine learning data pipelines. It helps teams focus 
on logic rather than infrastructure, offering flexibility and scalability i
 n handling unbounded and bounded data sources. Apache Beam also supports a 
wide range of connectors for both input and output, including Kafka, BigQuery, 
and JDBC-based systems. This makes it easy to integrate Beam into existing data 
ecosystems. Developers can build reusable transforms and modularize pipeline 
logic, improving maintainability and testing.\"\u001b[0m,\n",
+       "    \u001b[32m\"Developers can build reusable transforms and 
modularize pipeline logic, improving maintainability and testing. The concept 
of runners enables developers to write once and run anywhere, which is 
particularly appealing for organizations that want to avoid vendor lock-in. The 
Beam model is based on a unified programming model that decouples pipeline 
logic from execution. This makes it easier to reason about time and state in 
both batch and streaming pipelines. Advanced features like late data handling, 
watermarks, and session windowing allow for more accurate and meaningful 
processing of real-world data. Beam also integrates with orchestration tools 
and monitoring systems, allowing for production-grade deployments. Community 
support and contributions have grown significantly, making Beam a stable and 
evolving ecosystem. Many cloud providers offer native support for Beam 
pipelines, and it's increasingly a core component in modern data platform 
architectures.\"\u0
 01b[0m\n",
+       "  \u001b[1m]\u001b[0m,\n",
+       "  \u001b[1m[\u001b[0m\n",
+       "    \u001b[32m\"Google Cloud Dataflow is a fully managed service that 
runs Apache Beam pipelines in the cloud. It abstracts away infrastructure 
management and handles dynamic scaling, load balancing, and fault tolerance. 
Developers can focus on writing data logic using the Beam SDK and deploy it 
easily to Google Cloud. Dataflow supports both batch and stream processing and 
integrates seamlessly with other Google services like BigQuery, Pub/Sub, and 
Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data 
volumes, optimizing for cost and performance. Features like monitoring 
dashboards, job templates, and built-in logging make it suitable for both 
development and production use. With support for event time processing, 
stateful functions, and windowing, Dataflow is well-suited for real-time 
analytics and data transformation tasks. It’s a key component for architects 
building scalable, cloud-native data platforms. Dataflow also offers templates 
for commo
 n ETL tasks, helping teams get started quickly with minimal setup. Its 
integration with Cloud Functions and Cloud Composer enables event-driven and 
orchestrated workflows. Security and compliance are built-in with IAM roles, 
encryption at rest and in transit, and audit logging, making it suitable for 
enterprise environments.\"\u001b[0m,\n",
+       "    \u001b[32m\"For developers, Dataflow provides local testing 
capabilities and a unified logging system through Cloud Logging. It also 
supports SQL-based pipeline definitions using BigQuery, which lowers the 
barrier to entry for analysts and data engineers. Dataflow’s streaming engine 
significantly improves performance and reduces costs by decoupling compute and 
state management. In summary, Google Cloud Dataflow not only simplifies the 
deployment of Apache Beam pipelines but also enhances them with cloud-native 
features. Its managed runtime, high availability, and integration with the 
broader Google Cloud ecosystem make it a powerful tool for modern data 
processing.\"\u001b[0m\n",
+       "  \u001b[1m]\u001b[0m,\n",
+       "  \u001b[1m[\u001b[0m\n",
+       "    \u001b[32m\"Google Beam is an innovative video communication 
platform that builds on the research of Project Starline. It uses AI, 3D 
imaging, and light field rendering to create immersive, lifelike video calls. 
Designed to replicate in-person interaction, Beam allows users to see 
life-sized, three-dimensional representations of each other without the need 
for headsets. This breakthrough makes remote conversations feel 
natural—capturing facial expressions, eye contact, and subtle gestures that 
traditional video conferencing often misses. Beam reduces meeting fatigue and 
enhances engagement, making it ideal for enterprise collaboration, interviews, 
and virtual presence scenarios. Powered by Google AI, Beam represents a 
significant leap in communication technology. Major companies like Salesforce, 
Deloitte, and NEC are already exploring its impact on digital collaboration. 
Google is partnering with HP to build and distribute Beam hardware, designed to 
work with existing 
 productivity and video tools. Currently in limited early access for enterprise 
partners, Google Beam aims to redefine virtual meetings by bridging the gap 
between digital and physical presence. It’s a promising step toward more human 
and effective remote interactions.\"\u001b[0m\n",
+       "  \u001b[1m]\u001b[0m\n",
+       "\u001b[1m]\u001b[0m\n"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "print_json(data=content_splits)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c860e558-2da3-45a6-9e54-acb8b4ffab22",
+   "metadata": {},
+   "source": [
+    "### Embedding Generation"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 81,
+   "id": "aa55928d-c6ca-47c5-883d-d14eb0aa1298",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Let's choose `sentence-transformers/all-MiniLM-L6-v2` as our embedding 
generator here.\n",
+    "# It gives a good balance between embedding generation speed, accuracy, 
and being free to use.\n",
+    "model_name = 'sentence-transformers/all-MiniLM-L6-v2'\n",
+    "model = SentenceTransformer(model_name)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 82,
+   "id": "26e80afa-b9dc-4778-8301-ce38264d58cd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def get_default_device():\n",
+    "    return \"cuda:0\" if cuda.is_available() else \"cpu\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 83,
+   "id": "68e04606-ca81-4a1f-81d2-964495295ed3",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def encode_embedding(chunk, device=get_default_device()):\n",

Review Comment:
   > Rather than doing embedding as a udf, could we use 
https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.embeddings.huggingface.html
 to generate embeddings?
   > 
   > 
https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
 has an example doing this
   
   Updated
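
   For reference, the MLTransform-based approach would look roughly like the sketch below, which swaps the embedding UDF for `SentenceTransformerEmbeddings` from `apache_beam.ml.transforms.embeddings.huggingface`. The corpus rows and the temporary artifact location are illustrative placeholders rather than the notebook's actual values.

   ```python
   import tempfile

   import apache_beam as beam
   from apache_beam.ml.transforms.base import MLTransform
   from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings

   # Illustrative documents; the notebook builds these rows from its `corpus` list.
   docs = [
       {"id": "1", "content": "Apache Beam is a unified model for batch and streaming data."},
       {"id": "2", "content": "Google Cloud Dataflow runs Apache Beam pipelines in the cloud."},
   ]

   # MLTransform persists its artifacts/metadata under this location.
   artifact_location = tempfile.mkdtemp()

   with beam.Pipeline() as p:
       _ = (
           p
           | "CreateDocs" >> beam.Create(docs)
           | "EmbedContent" >> MLTransform(
               write_artifact_location=artifact_location).with_transform(
                   SentenceTransformerEmbeddings(
                       model_name="sentence-transformers/all-MiniLM-L6-v2",
                       columns=["content"]))
           | "Inspect" >> beam.Map(print))
   ```

   In this setup the `content` column of each output row should carry the embedding vector, which can then be packed into the `Chunk`/`Embedding` types used later in the notebook or written to Milvus.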



##########
examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb:
##########
@@ -0,0 +1,2373 @@
+  {
+   "cell_type": "markdown",
+   "id": "42c1c159-875d-411b-a009-4361301b39f6",
+   "metadata": {},
+   "source": [
+    "## Preprocess Data"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d545355e-41da-4c53-ba9a-4d33b1fe376c",
+   "metadata": {},
+   "source": [
+    "### Chunking"

Review Comment:
   > Could we use 
https://beam.apache.org/releases/pydoc/2.64.0/apache_beam.ml.rag.chunking.base.html#module-apache_beam.ml.rag.chunking.base
 for chunking?
   > 
   > It would be great if we just made this whole process a beam pipeline
   
   Updated
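
   Along the same lines, the chunking step can run inside the pipeline as well. Below is a rough sketch that assumes `LangChainChunker` from `apache_beam.ml.rag.chunking.langchain`, a concrete implementation of the `ChunkingTransformProvider` base class linked above, together with the `langchain-text-splitters` package; the splitter settings and corpus rows are illustrative placeholders.

   ```python
   import tempfile

   import apache_beam as beam
   from apache_beam.ml.rag.chunking.langchain import LangChainChunker
   from apache_beam.ml.transforms.base import MLTransform
   from langchain_text_splitters import RecursiveCharacterTextSplitter

   # Illustrative documents; the notebook builds these rows from its `corpus` list.
   docs = [
       {"id": "1", "title": "Apache Beam",
        "content": "Apache Beam is an open-source framework for batch and streaming pipelines."},
       {"id": "3", "title": "Google Beam",
        "content": "Google Beam is a 3D video communication platform built on Project Starline."},
   ]

   # Character-based splitter standing in for the notebook's SentenceSplitter.
   splitter = RecursiveCharacterTextSplitter(chunk_size=256, chunk_overlap=20)

   chunker = LangChainChunker(
       document_field="content",         # field whose text is split into chunks
       metadata_fields=["id", "title"],  # fields expected to be carried into Chunk metadata
       text_splitter=splitter)

   with beam.Pipeline() as p:
       _ = (
           p
           | "CreateDocs" >> beam.Create(docs)
           | "Chunk" >> MLTransform(
               write_artifact_location=tempfile.mkdtemp()).with_transform(chunker)
           | "Inspect" >> beam.Map(print))  # elements are apache_beam.ml.rag.types.Chunk
   ```

   With that, chunking, embedding, and the Milvus enrichment lookup could all live in one Beam pipeline, as suggested.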



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
