mohamedawnallah commented on code in PR #35467:
URL: https://github.com/apache/beam/pull/35467#discussion_r2443411962
##########
examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb:
##########
@@ -0,0 +1,2373 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 70,
+ "id": "47053bac",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version
2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "aa881240-2f38-4335-9d4d-444776d77c92",
+ "metadata": {},
+ "source": [
+ "# Use Apache Beam and Milvus to enrich data\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0611da21-d031-4b16-8301-9b76bda731e7",
+ "metadata": {},
+ "source": [
+ "This notebook shows how to enrich data by using the Apache Beam
[enrichment
transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/)
with [Milvus](https://milvus.io/). The enrichment transform is an Apache Beam
turnkey transform that lets you enrich data by using a key-value lookup. This
transform has the following features:\n",
+ "\n",
+ "- The transform has a built-in Apache Beam handler that interacts with
Milvus data during enrichment.\n",
+ "- The enrichment transform uses client-side throttling to rate limit the
requests. The default retry strategy uses exponential backoff. You can
configure rate limiting to suit your use case.\n",
+ "\n",
+ "This notebook demonstrates the following search engine optimization use
case:\n",
+ "\n",
+ "A specialized technical search engine company wants to improve its query
result relevance by dynamically enriching search results with semantically
related content. The example uses a vector database of technical articles and
documentation stored in Milvus to enrich incoming user queries. The enriched
data is then used to provide users with more comprehensive and contextually
relevant search results, especially for complex technical topics.\n",
+ "\n",
+ "## Before you begin\n",
+ "Set up your environment and download dependencies.\n",
+ "\n",
+ "### Install Apache Beam\n",
+ "To use the enrichment transform with the built-in Milvus handler, install
the Apache Beam SDK version 2.67.0 or later."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 71,
+ "id": "e550cd55-e91e-4d43-b1bd-b0e89bb8cbd9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Disable tokenizers parallelism to prevent deadlocks when forking
processes\n",
+ "# This avoids the \"huggingface/tokenizers: The current process just got
forked\" warning.\n",
+ "import os\n",
+ "os.environ[\"TOKENIZERS_PARALLELISM\"] = \"false\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 72,
+ "id": "31747c45-107a-49be-8885-5a6cc9dc1236",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
A new release of pip is available:
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m ->
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
+ "\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
A new release of pip is available:
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m ->
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
+ ]
+ }
+ ],
+ "source": [
+ "# The Apache Beam test dependencies are included here for the
TestContainers\n",
+ "# Milvus standalone DB container that will be used later in the demo.\n",
+ "!pip install rich sentence_transformers llama_index --quiet\n",
+ "!pip install apache_beam[milvus,gcp,test,interactive]>=2.67.0 --quiet"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 73,
+ "id": "666e0c2b-0341-4b0e-8d73-561abc39bb10",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Standard library imports.\n",
+ "from collections import defaultdict\n",
+ "from math import ceil\n",
+ "from typing import List\n",
+ "\n",
+ "# Third-party imports.\n",
+ "import numpy as np\n",
+ "import pandas as pd\n",
+ "from pymilvus import DataType, CollectionSchema, FieldSchema, Function,
FunctionType, MilvusClient, RRFRanker\n",
+ "from pymilvus.milvus_client import IndexParams\n",
+ "from rich import print_json\n",
+ "from sentence_transformers import SentenceTransformer\n",
+ "from torch import cuda\n",
+ "from llama_index.core.text_splitter import SentenceSplitter\n",
+ "\n",
+ "# Local application imports.\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.ml.rag.types import Chunk, Content, Embedding\n",
+ "from apache_beam.transforms.enrichment import Enrichment\n",
+ "from apache_beam.ml.rag.enrichment.milvus_search_it_test import
MilvusEnrichmentTestHelper\n",
+ "from apache_beam.ml.rag.enrichment.milvus_search import (\n",
+ " HybridSearchParameters, \n",
+ " KeywordSearchMetrics, \n",
+ " KeywordSearchParameters,\n",
+ " MilvusCollectionLoadParameters, \n",
+ " MilvusConnectionParameters, \n",
+ " MilvusSearchEnrichmentHandler,\n",
+ " MilvusSearchParameters, \n",
+ " SearchStrategy, \n",
+ " VectorSearchMetrics, \n",
+ " VectorSearchParameters)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "338808ff-3f80-48e5-9c76-b8d19f8769b7",
+ "metadata": {},
+ "source": [
+ "## Collect Data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d83ad549-5ee1-4a4c-ae5a-e638c3d0279f",
+ "metadata": {},
+ "source": [
+ "This content has been paraphrased from publicly available information on
the internet using a large language model (OpenAI’s GPT-4) and is provided for
informational purposes only."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d39a070a-206d-41f6-9033-fff0d5ea2128",
+ "metadata": {},
+ "source": [
+ "The third data point, related to Google Beam, was intentionally included
to illustrate the importance of metadata filtering (filtered search) in
Milvus—such as when a user searches for the term “Beam.” without it the vector
database retrieval engine may confuse between Apache Beam and Google Beam."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 74,
+ "id": "38781cf5-e18f-40f5-827e-2d441ae7d2fa",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "corpus = [\n",
+ " {\n",
+ " \"id\": \"1\",\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming
Data\",\n",
+ " \"keywords\": [\"Apache Beam\", \"stream processing\", \"batch
processing\", \"data pipelines\", \"SDK\"],\n",
+ " \"tags\": [\"Data Engineering\", \"Open Source\", \"Streaming\",
\"Batch\", \"Big Data\"],\n",
+ " \"content\": (\n",
+ " \"Apache Beam is an open-source framework that provides a
consistent programming model for both batch and streaming data processing.
\"\n",
+ " \"Developed originally by Google, it allows developers to write
pipelines that can run on multiple engines, such as Apache Flink, Spark, and
Google Cloud Dataflow. \"\n",
+ " \"Beam uses abstractions like PCollections (data containers) and
PTransforms (operations) to define the flow of data. \"\n",
+ " \"The framework promotes portability through its runner
architecture, letting the same pipeline execute on different backends. \"\n",
+ " \"Support for multiple SDKs, including Java and Python, makes it
accessible for a broad audience. \"\n",
+ " \"Key features include support for event time, windowing, triggers,
and stateful processing, which are essential for handling real-time data
effectively. \"\n",
+ " \"Beam is ideal for building ETL jobs, real-time analytics, and
machine learning data pipelines. \"\n",
+ " \"It helps teams focus on logic rather than infrastructure,
offering flexibility and scalability in handling unbounded and bounded data
sources. \"\n",
+ " \"Apache Beam also supports a wide range of connectors for both
input and output, including Kafka, BigQuery, and JDBC-based systems. \"\n",
+ " \"This makes it easy to integrate Beam into existing data
ecosystems. Developers can build reusable transforms and modularize pipeline
logic, improving maintainability and testing. \"\n",
+ " \"The concept of runners enables developers to write once and run
anywhere, which is particularly appealing for organizations that want to avoid
vendor lock-in. \"\n",
+ " \"The Beam model is based on a unified programming model that
decouples pipeline logic from execution. \"\n",
+ " \"This makes it easier to reason about time and state in both batch
and streaming pipelines. \"\n",
+ " \"Advanced features like late data handling, watermarks, and
session windowing allow for more accurate and meaningful processing of
real-world data. \"\n",
+ " \"Beam also integrates with orchestration tools and monitoring
systems, allowing for production-grade deployments. \"\n",
+ " \"Community support and contributions have grown significantly,
making Beam a stable and evolving ecosystem. \"\n",
+ " \"Many cloud providers offer native support for Beam pipelines, and
it's increasingly a core component in modern data platform architectures.\"\n",
+ " )\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2\",\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the
Cloud\",\n",
+ " \"keywords\": [\"Google Cloud\", \"Dataflow\", \"Apache Beam\",
\"serverless\", \"stream and batch\"],\n",
+ " \"tags\": [\"Cloud Computing\", \"Data Pipelines\", \"Google Cloud\",
\"Serverless\", \"Enterprise\"],\n",
+ " \"content\": (\n",
+ " \"Google Cloud Dataflow is a fully managed service that runs Apache
Beam pipelines in the cloud. \"\n",
+ " \"It abstracts away infrastructure management and handles dynamic
scaling, load balancing, and fault tolerance. \"\n",
+ " \"Developers can focus on writing data logic using the Beam SDK and
deploy it easily to Google Cloud. \"\n",
+ " \"Dataflow supports both batch and stream processing and integrates
seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud
Storage. \"\n",
+ " \"Its autoscaling capabilities allow it to adapt to changing data
volumes, optimizing for cost and performance. \"\n",
+ " \"Features like monitoring dashboards, job templates, and built-in
logging make it suitable for both development and production use. \"\n",
+ " \"With support for event time processing, stateful functions, and
windowing, Dataflow is well-suited for real-time analytics and data
transformation tasks. \"\n",
+ " \"It’s a key component for architects building scalable,
cloud-native data platforms. \"\n",
+ " \"Dataflow also offers templates for common ETL tasks, helping
teams get started quickly with minimal setup. \"\n",
+ " \"Its integration with Cloud Functions and Cloud Composer enables
event-driven and orchestrated workflows. \"\n",
+ " \"Security and compliance are built-in with IAM roles, encryption
at rest and in transit, and audit logging, making it suitable for enterprise
environments. \"\n",
+ " \"For developers, Dataflow provides local testing capabilities and
a unified logging system through Cloud Logging. \"\n",
+ " \"It also supports SQL-based pipeline definitions using BigQuery,
which lowers the barrier to entry for analysts and data engineers. \"\n",
+ " \"Dataflow’s streaming engine significantly improves performance
and reduces costs by decoupling compute and state management. \"\n",
+ " \"In summary, Google Cloud Dataflow not only simplifies the
deployment of Apache Beam pipelines but also enhances them with cloud-native
features. \"\n",
+ " \"Its managed runtime, high availability, and integration with the
broader Google Cloud ecosystem make it a powerful tool for modern data
processing.\"\n",
+ " )\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"3\",\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\"Google Beam\", \"Project Starline\", \"3D video\",
\"AI communication\", \"real-time meetings\"],\n",
+ " \"tags\": [\"AI\", \"Communication\", \"3D Technology\", \"Remote
Work\", \"Enterprise Tech\"],\n",
+ " \"content\": (\n",
+ " \"Google Beam is an innovative video communication platform that
builds on the research of Project Starline. It uses AI, 3D imaging, and light
field rendering to create immersive, lifelike video calls. \"\n",
+ " \"Designed to replicate in-person interaction, Beam allows users to
see life-sized, three-dimensional representations of each other without the
need for headsets. \"\n",
+ " \"This breakthrough makes remote conversations feel
natural—capturing facial expressions, eye contact, and subtle gestures that
traditional video conferencing often misses. \"\n",
+ " \"Beam reduces meeting fatigue and enhances engagement, making it
ideal for enterprise collaboration, interviews, and virtual presence scenarios.
\"\n",
+ " \"Powered by Google AI, Beam represents a significant leap in
communication technology. \"\n",
+ " \"Major companies like Salesforce, Deloitte, and NEC are already
exploring its impact on digital collaboration. \"\n",
+ " \"Google is partnering with HP to build and distribute Beam
hardware, designed to work with existing productivity and video tools. \"\n",
+ " \"Currently in limited early access for enterprise partners, Google
Beam aims to redefine virtual meetings by bridging the gap between digital and
physical presence. \"\n",
+ " \"It’s a promising step toward more human and effective remote
interactions.\"\n",
+ " )\n",
+ " }\n",
+ "]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "758c2af7-12c7-477b-9257-3c88712960e7",
+ "metadata": {},
+ "source": [
+ "## Exploratory Data Analysis (EDA)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5e751905-7217-4571-bc07-991ef850a6b2",
+ "metadata": {},
+ "source": [
+ "### Average Words/Tokens per Doc"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 75,
+ "id": "489e93b6-de41-4ec3-be33-a15c3cba12e8",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th># Words</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>count</th>\n",
+ " <td>3.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>mean</th>\n",
+ " <td>253.666667</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>std</th>\n",
+ " <td>72.858310</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>min</th>\n",
+ " <td>172.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>25%</th>\n",
+ " <td>224.500000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>50%</th>\n",
+ " <td>277.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>75%</th>\n",
+ " <td>294.500000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>max</th>\n",
+ " <td>312.000000</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " # Words\n",
+ "count 3.000000\n",
+ "mean 253.666667\n",
+ "std 72.858310\n",
+ "min 172.000000\n",
+ "25% 224.500000\n",
+ "50% 277.000000\n",
+ "75% 294.500000\n",
+ "max 312.000000"
+ ]
+ },
+ "execution_count": 75,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "# The second video may skew the average tokens results since it is a
youtube short video.\n",
+ "contents = [c['content'] for c in corpus]\n",
+ "content_lengths = [len(content.split(\" \")) for content in contents]\n",
+ "df = pd.DataFrame(content_lengths, columns=['# Words'])\n",
+ "df.describe()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 76,
+ "id": "eb32aad0-febd-45af-b4bd-e2176b07e2dc",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "The mean word count for each video is about 254 words, which
corresponds to a rough token count of 331 tokens.\n"
+ ]
+ }
+ ],
+ "source": [
+ "mean_word_count = ceil(np.mean(content_lengths))\n",
+ "token_to_word_ratio = 1.3\n",
+ "approx_token_count = ceil(mean_word_count * token_to_word_ratio)\n",
+ "print(f'The mean word count for each video is about {mean_word_count}
words, which corresponds to a rough token count of {approx_token_count}
tokens.')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "42c1c159-875d-411b-a009-4361301b39f6",
+ "metadata": {},
+ "source": [
+ "## Preprocess Data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d545355e-41da-4c53-ba9a-4d33b1fe376c",
+ "metadata": {},
+ "source": [
+ "### Chunking"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a034c5d0-0906-4193-80ac-736a32d7b47e",
+ "metadata": {},
+ "source": [
+ "We'll use sentence splitting as the chunking strategy for
simplicity.<br>\n",
+ "Ideally, we would pass a tokenizer here — preferably the same one used by
the retriever — to ensure consistency.<br>\n",
+ "However, in this example, we are not using a tokenizer."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 77,
+ "id": "e7e45d70-0c23-409d-b435-b9479245c1ff",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# The `chunk_size` parameter is constrained by the embedding model we’re
using.\n",
+ "# Since we’re using `sentence-transformers/all-MiniLM-L6-v2`, which has a
maximum token limit of ~384 tokens,\n",
+ "# we need to ensure chunk sizes stay well within that limit.\n",
+ "# Given that each document in our dataset contains approximately 331
tokens,\n",
+ "# using a chunk size of 256 allows us to preserve nearly the most
semantic meaning of each entry\n",
+ "# while staying safely under the model’s token limit.\n",
+ "chunk_size = 256\n",
+ "llama_txt_splitter = SentenceSplitter(chunk_size=chunk_size,
chunk_overlap=20)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 78,
+ "id": "5a013b08-d7e7-4367-ad49-43ad1320158f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def split_contents(corpus: list[dict], text_splitter: SentenceSplitter,
content_field: str='content') -> list[list[str]]:\n",
+ " result = []\n",
+ " for video in corpus:\n",
+ " split = llama_txt_splitter.split_text(video[content_field])\n",
+ " result.append(split)\n",
+ " return result"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 79,
+ "id": "2d5ea747-40b3-474e-ac36-ccb81256a36c",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "content_splits = split_contents(corpus, llama_txt_splitter, \"content\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 80,
+ "id": "9917cefb-6271-4285-a75d-a6d1bfcbfd06",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<pre
style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu
Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-weight:
bold\">[</span>\n",
+ " <span style=\"font-weight: bold\">[</span>\n",
+ " <span style=\"color: #008000; text-decoration-color:
#008000\">\"Apache Beam is an open-source framework that provides a consistent
programming model for both batch and streaming data processing. Developed
originally by Google, it allows developers to write pipelines that can run on
multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. Beam
uses abstractions like PCollections (data containers) and PTransforms
(operations) to define the flow of data. The framework promotes portability
through its runner architecture, letting the same pipeline execute on different
backends. Support for multiple SDKs, including Java and Python, makes it
accessible for a broad audience. Key features include support for event time,
windowing, triggers, and stateful processing, which are essential for handling
real-time data effectively. Beam is ideal for building ETL jobs, real-time
analytics, and machine learning data pipelines. It helps teams focus on logic
rather than i
nfrastructure, offering flexibility and scalability in handling unbounded and
bounded data sources. Apache Beam also supports a wide range of connectors for
both input and output, including Kafka, BigQuery, and JDBC-based systems. This
makes it easy to integrate Beam into existing data ecosystems. Developers can
build reusable transforms and modularize pipeline logic, improving
maintainability and testing.\"</span>,\n",
+ " <span style=\"color: #008000; text-decoration-color:
#008000\">\"Developers can build reusable transforms and modularize pipeline
logic, improving maintainability and testing. The concept of runners enables
developers to write once and run anywhere, which is particularly appealing for
organizations that want to avoid vendor lock-in. The Beam model is based on a
unified programming model that decouples pipeline logic from execution. This
makes it easier to reason about time and state in both batch and streaming
pipelines. Advanced features like late data handling, watermarks, and session
windowing allow for more accurate and meaningful processing of real-world data.
Beam also integrates with orchestration tools and monitoring systems, allowing
for production-grade deployments. Community support and contributions have
grown significantly, making Beam a stable and evolving ecosystem. Many cloud
providers offer native support for Beam pipelines, and it's increasingly a core
component in modern data platform architectures.\"</span>\n",
+ " <span style=\"font-weight: bold\">]</span>,\n",
+ " <span style=\"font-weight: bold\">[</span>\n",
+ " <span style=\"color: #008000; text-decoration-color:
#008000\">\"Google Cloud Dataflow is a fully managed service that runs Apache
Beam pipelines in the cloud. It abstracts away infrastructure management and
handles dynamic scaling, load balancing, and fault tolerance. Developers can
focus on writing data logic using the Beam SDK and deploy it easily to Google
Cloud. Dataflow supports both batch and stream processing and integrates
seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud
Storage. Its autoscaling capabilities allow it to adapt to changing data
volumes, optimizing for cost and performance. Features like monitoring
dashboards, job templates, and built-in logging make it suitable for both
development and production use. With support for event time processing,
stateful functions, and windowing, Dataflow is well-suited for real-time
analytics and data transformation tasks. It’s a key component for architects
building scalable, cloud-native dat
a platforms. Dataflow also offers templates for common ETL tasks, helping
teams get started quickly with minimal setup. Its integration with Cloud
Functions and Cloud Composer enables event-driven and orchestrated workflows.
Security and compliance are built-in with IAM roles, encryption at rest and in
transit, and audit logging, making it suitable for enterprise
environments.\"</span>,\n",
+ " <span style=\"color: #008000; text-decoration-color:
#008000\">\"For developers, Dataflow provides local testing capabilities and a
unified logging system through Cloud Logging. It also supports SQL-based
pipeline definitions using BigQuery, which lowers the barrier to entry for
analysts and data engineers. Dataflow’s streaming engine significantly improves
performance and reduces costs by decoupling compute and state management. In
summary, Google Cloud Dataflow not only simplifies the deployment of Apache
Beam pipelines but also enhances them with cloud-native features. Its managed
runtime, high availability, and integration with the broader Google Cloud
ecosystem make it a powerful tool for modern data processing.\"</span>\n",
+ " <span style=\"font-weight: bold\">]</span>,\n",
+ " <span style=\"font-weight: bold\">[</span>\n",
+ " <span style=\"color: #008000; text-decoration-color:
#008000\">\"Google Beam is an innovative video communication platform that
builds on the research of Project Starline. It uses AI, 3D imaging, and light
field rendering to create immersive, lifelike video calls. Designed to
replicate in-person interaction, Beam allows users to see life-sized,
three-dimensional representations of each other without the need for headsets.
This breakthrough makes remote conversations feel natural—capturing facial
expressions, eye contact, and subtle gestures that traditional video
conferencing often misses. Beam reduces meeting fatigue and enhances
engagement, making it ideal for enterprise collaboration, interviews, and
virtual presence scenarios. Powered by Google AI, Beam represents a significant
leap in communication technology. Major companies like Salesforce, Deloitte,
and NEC are already exploring its impact on digital collaboration. Google is
partnering with HP to build and dist
ribute Beam hardware, designed to work with existing productivity and video
tools. Currently in limited early access for enterprise partners, Google Beam
aims to redefine virtual meetings by bridging the gap between digital and
physical presence. It’s a promising step toward more human and effective remote
interactions.\"</span>\n",
+ " <span style=\"font-weight: bold\">]</span>\n",
+ "<span style=\"font-weight: bold\">]</span>\n",
+ "</pre>\n"
+ ],
+ "text/plain": [
+ "\u001b[1m[\u001b[0m\n",
+ " \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam is an open-source framework that provides
a consistent programming model for both batch and streaming data processing.
Developed originally by Google, it allows developers to write pipelines that
can run on multiple engines, such as Apache Flink, Spark, and Google Cloud
Dataflow. Beam uses abstractions like PCollections (data containers) and
PTransforms (operations) to define the flow of data. The framework promotes
portability through its runner architecture, letting the same pipeline execute
on different backends. Support for multiple SDKs, including Java and Python,
makes it accessible for a broad audience. Key features include support for
event time, windowing, triggers, and stateful processing, which are essential
for handling real-time data effectively. Beam is ideal for building ETL jobs,
real-time analytics, and machine learning data pipelines. It helps teams focus
on logic rather than infrastructure, offering flexibility and scalability i
n handling unbounded and bounded data sources. Apache Beam also supports a
wide range of connectors for both input and output, including Kafka, BigQuery,
and JDBC-based systems. This makes it easy to integrate Beam into existing data
ecosystems. Developers can build reusable transforms and modularize pipeline
logic, improving maintainability and testing.\"\u001b[0m,\n",
+ " \u001b[32m\"Developers can build reusable transforms and
modularize pipeline logic, improving maintainability and testing. The concept
of runners enables developers to write once and run anywhere, which is
particularly appealing for organizations that want to avoid vendor lock-in. The
Beam model is based on a unified programming model that decouples pipeline
logic from execution. This makes it easier to reason about time and state in
both batch and streaming pipelines. Advanced features like late data handling,
watermarks, and session windowing allow for more accurate and meaningful
processing of real-world data. Beam also integrates with orchestration tools
and monitoring systems, allowing for production-grade deployments. Community
support and contributions have grown significantly, making Beam a stable and
evolving ecosystem. Many cloud providers offer native support for Beam
pipelines, and it's increasingly a core component in modern data platform
architectures.\"\u0
01b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud Dataflow is a fully managed service that
runs Apache Beam pipelines in the cloud. It abstracts away infrastructure
management and handles dynamic scaling, load balancing, and fault tolerance.
Developers can focus on writing data logic using the Beam SDK and deploy it
easily to Google Cloud. Dataflow supports both batch and stream processing and
integrates seamlessly with other Google services like BigQuery, Pub/Sub, and
Cloud Storage. Its autoscaling capabilities allow it to adapt to changing data
volumes, optimizing for cost and performance. Features like monitoring
dashboards, job templates, and built-in logging make it suitable for both
development and production use. With support for event time processing,
stateful functions, and windowing, Dataflow is well-suited for real-time
analytics and data transformation tasks. It’s a key component for architects
building scalable, cloud-native data platforms. Dataflow also offers templates
for commo
n ETL tasks, helping teams get started quickly with minimal setup. Its
integration with Cloud Functions and Cloud Composer enables event-driven and
orchestrated workflows. Security and compliance are built-in with IAM roles,
encryption at rest and in transit, and audit logging, making it suitable for
enterprise environments.\"\u001b[0m,\n",
+ " \u001b[32m\"For developers, Dataflow provides local testing
capabilities and a unified logging system through Cloud Logging. It also
supports SQL-based pipeline definitions using BigQuery, which lowers the
barrier to entry for analysts and data engineers. Dataflow’s streaming engine
significantly improves performance and reduces costs by decoupling compute and
state management. In summary, Google Cloud Dataflow not only simplifies the
deployment of Apache Beam pipelines but also enhances them with cloud-native
features. Its managed runtime, high availability, and integration with the
broader Google Cloud ecosystem make it a powerful tool for modern data
processing.\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Beam is an innovative video communication
platform that builds on the research of Project Starline. It uses AI, 3D
imaging, and light field rendering to create immersive, lifelike video calls.
Designed to replicate in-person interaction, Beam allows users to see
life-sized, three-dimensional representations of each other without the need
for headsets. This breakthrough makes remote conversations feel
natural—capturing facial expressions, eye contact, and subtle gestures that
traditional video conferencing often misses. Beam reduces meeting fatigue and
enhances engagement, making it ideal for enterprise collaboration, interviews,
and virtual presence scenarios. Powered by Google AI, Beam represents a
significant leap in communication technology. Major companies like Salesforce,
Deloitte, and NEC are already exploring its impact on digital collaboration.
Google is partnering with HP to build and distribute Beam hardware, designed to
work with existing
productivity and video tools. Currently in limited early access for enterprise
partners, Google Beam aims to redefine virtual meetings by bridging the gap
between digital and physical presence. It’s a promising step toward more human
and effective remote interactions.\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m]\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "print_json(data=content_splits)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c860e558-2da3-45a6-9e54-acb8b4ffab22",
+ "metadata": {},
+ "source": [
+ "### Embedding Generation"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 81,
+ "id": "aa55928d-c6ca-47c5-883d-d14eb0aa1298",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Let's choose `sentence-transformers/all-MiniLM-L6-v2` as our embedding
generator here.\n",
+ "# It gives a good balance between embedding generation speed, accuracy,
and being free to use.\n",
+ "model_name = 'sentence-transformers/all-MiniLM-L6-v2'\n",
+ "model = SentenceTransformer(model_name)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 82,
+ "id": "26e80afa-b9dc-4778-8301-ce38264d58cd",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def get_default_device():\n",
+ " return \"cuda:0\" if cuda.is_available() else \"cpu\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 83,
+ "id": "68e04606-ca81-4a1f-81d2-964495295ed3",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def encode_embedding(chunk, device=get_default_device()):\n",
Review Comment:
> Rather than doing embedding as a udf, could we use
https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.embeddings.huggingface.html
to generate embeddings?
>
>
https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb
has an example doing this
Updated
##########
examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb:
##########
@@ -0,0 +1,2373 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 70,
+ "id": "47053bac",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version
2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "aa881240-2f38-4335-9d4d-444776d77c92",
+ "metadata": {},
+ "source": [
+ "# Use Apache Beam and Milvus to enrich data\n",
+ "\n",
+ "<table align=\"left\">\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"
/>Run in Google Colab</a>\n",
+ " </td>\n",
+ " <td>\n",
+ " <a target=\"_blank\"
href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb\"><img
src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"
/>View source on GitHub</a>\n",
+ " </td>\n",
+ "</table>"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0611da21-d031-4b16-8301-9b76bda731e7",
+ "metadata": {},
+ "source": [
+ "This notebook shows how to enrich data by using the Apache Beam
[enrichment
transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/)
with [Milvus](https://milvus.io/). The enrichment transform is an Apache Beam
turnkey transform that lets you enrich data by using a key-value lookup. This
transform has the following features:\n",
+ "\n",
+ "- The transform has a built-in Apache Beam handler that interacts with
Milvus data during enrichment.\n",
+ "- The enrichment transform uses client-side throttling to rate limit the
requests. The default retry strategy uses exponential backoff. You can
configure rate limiting to suit your use case.\n",
+ "\n",
+ "This notebook demonstrates the following search engine optimization use
case:\n",
+ "\n",
+ "A specialized technical search engine company wants to improve its query
result relevance by dynamically enriching search results with semantically
related content. The example uses a vector database of technical articles and
documentation stored in Milvus to enrich incoming user queries. The enriched
data is then used to provide users with more comprehensive and contextually
relevant search results, especially for complex technical topics.\n",
+ "\n",
+ "## Before you begin\n",
+ "Set up your environment and download dependencies.\n",
+ "\n",
+ "### Install Apache Beam\n",
+ "To use the enrichment transform with the built-in Milvus handler, install
the Apache Beam SDK version 2.67.0 or later."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 71,
+ "id": "e550cd55-e91e-4d43-b1bd-b0e89bb8cbd9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Disable tokenizers parallelism to prevent deadlocks when forking
processes\n",
+ "# This avoids the \"huggingface/tokenizers: The current process just got
forked\" warning.\n",
+ "import os\n",
+ "os.environ[\"TOKENIZERS_PARALLELISM\"] = \"false\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 72,
+ "id": "31747c45-107a-49be-8885-5a6cc9dc1236",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
A new release of pip is available:
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m ->
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
+ "\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
A new release of pip is available:
\u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m ->
\u001b[0m\u001b[32;49m25.2\u001b[0m\n",
+
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m
To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
+ ]
+ }
+ ],
+ "source": [
+ "# The Apache Beam test dependencies are included here for the
TestContainers\n",
+ "# Milvus standalone DB container that will be used later in the demo.\n",
+ "!pip install rich sentence_transformers llama_index --quiet\n",
+ "!pip install apache_beam[milvus,gcp,test,interactive]>=2.67.0 --quiet"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 73,
+ "id": "666e0c2b-0341-4b0e-8d73-561abc39bb10",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Standard library imports.\n",
+ "from collections import defaultdict\n",
+ "from math import ceil\n",
+ "from typing import List\n",
+ "\n",
+ "# Third-party imports.\n",
+ "import numpy as np\n",
+ "import pandas as pd\n",
+ "from pymilvus import DataType, CollectionSchema, FieldSchema, Function,
FunctionType, MilvusClient, RRFRanker\n",
+ "from pymilvus.milvus_client import IndexParams\n",
+ "from rich import print_json\n",
+ "from sentence_transformers import SentenceTransformer\n",
+ "from torch import cuda\n",
+ "from llama_index.core.text_splitter import SentenceSplitter\n",
+ "\n",
+ "# Local application imports.\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.ml.rag.types import Chunk, Content, Embedding\n",
+ "from apache_beam.transforms.enrichment import Enrichment\n",
+ "from apache_beam.ml.rag.enrichment.milvus_search_it_test import
MilvusEnrichmentTestHelper\n",
+ "from apache_beam.ml.rag.enrichment.milvus_search import (\n",
+ " HybridSearchParameters, \n",
+ " KeywordSearchMetrics, \n",
+ " KeywordSearchParameters,\n",
+ " MilvusCollectionLoadParameters, \n",
+ " MilvusConnectionParameters, \n",
+ " MilvusSearchEnrichmentHandler,\n",
+ " MilvusSearchParameters, \n",
+ " SearchStrategy, \n",
+ " VectorSearchMetrics, \n",
+ " VectorSearchParameters)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "338808ff-3f80-48e5-9c76-b8d19f8769b7",
+ "metadata": {},
+ "source": [
+ "## Collect Data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d83ad549-5ee1-4a4c-ae5a-e638c3d0279f",
+ "metadata": {},
+ "source": [
+ "This content has been paraphrased from publicly available information on
the internet using a large language model (OpenAI’s GPT-4) and is provided for
informational purposes only."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d39a070a-206d-41f6-9033-fff0d5ea2128",
+ "metadata": {},
+ "source": [
+ "The third data point, related to Google Beam, was intentionally included
to illustrate the importance of metadata filtering (filtered search) in
Milvus—such as when a user searches for the term “Beam.” without it the vector
database retrieval engine may confuse between Apache Beam and Google Beam."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 74,
+ "id": "38781cf5-e18f-40f5-827e-2d441ae7d2fa",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "corpus = [\n",
+ " {\n",
+ " \"id\": \"1\",\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming
Data\",\n",
+ " \"keywords\": [\"Apache Beam\", \"stream processing\", \"batch
processing\", \"data pipelines\", \"SDK\"],\n",
+ " \"tags\": [\"Data Engineering\", \"Open Source\", \"Streaming\",
\"Batch\", \"Big Data\"],\n",
+ " \"content\": (\n",
+ " \"Apache Beam is an open-source framework that provides a
consistent programming model for both batch and streaming data processing.
\"\n",
+ " \"Developed originally by Google, it allows developers to write
pipelines that can run on multiple engines, such as Apache Flink, Spark, and
Google Cloud Dataflow. \"\n",
+ " \"Beam uses abstractions like PCollections (data containers) and
PTransforms (operations) to define the flow of data. \"\n",
+ " \"The framework promotes portability through its runner
architecture, letting the same pipeline execute on different backends. \"\n",
+ " \"Support for multiple SDKs, including Java and Python, makes it
accessible for a broad audience. \"\n",
+ " \"Key features include support for event time, windowing, triggers,
and stateful processing, which are essential for handling real-time data
effectively. \"\n",
+ " \"Beam is ideal for building ETL jobs, real-time analytics, and
machine learning data pipelines. \"\n",
+ " \"It helps teams focus on logic rather than infrastructure,
offering flexibility and scalability in handling unbounded and bounded data
sources. \"\n",
+ " \"Apache Beam also supports a wide range of connectors for both
input and output, including Kafka, BigQuery, and JDBC-based systems. \"\n",
+ " \"This makes it easy to integrate Beam into existing data
ecosystems. Developers can build reusable transforms and modularize pipeline
logic, improving maintainability and testing. \"\n",
+ " \"The concept of runners enables developers to write once and run
anywhere, which is particularly appealing for organizations that want to avoid
vendor lock-in. \"\n",
+ " \"The Beam model is based on a unified programming model that
decouples pipeline logic from execution. \"\n",
+ " \"This makes it easier to reason about time and state in both batch
and streaming pipelines. \"\n",
+ " \"Advanced features like late data handling, watermarks, and
session windowing allow for more accurate and meaningful processing of
real-world data. \"\n",
+ " \"Beam also integrates with orchestration tools and monitoring
systems, allowing for production-grade deployments. \"\n",
+ " \"Community support and contributions have grown significantly,
making Beam a stable and evolving ecosystem. \"\n",
+ " \"Many cloud providers offer native support for Beam pipelines, and
it's increasingly a core component in modern data platform architectures.\"\n",
+ " )\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"2\",\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the
Cloud\",\n",
+ " \"keywords\": [\"Google Cloud\", \"Dataflow\", \"Apache Beam\",
\"serverless\", \"stream and batch\"],\n",
+ " \"tags\": [\"Cloud Computing\", \"Data Pipelines\", \"Google Cloud\",
\"Serverless\", \"Enterprise\"],\n",
+ " \"content\": (\n",
+ " \"Google Cloud Dataflow is a fully managed service that runs Apache
Beam pipelines in the cloud. \"\n",
+ " \"It abstracts away infrastructure management and handles dynamic
scaling, load balancing, and fault tolerance. \"\n",
+ " \"Developers can focus on writing data logic using the Beam SDK and
deploy it easily to Google Cloud. \"\n",
+ " \"Dataflow supports both batch and stream processing and integrates
seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud
Storage. \"\n",
+ " \"Its autoscaling capabilities allow it to adapt to changing data
volumes, optimizing for cost and performance. \"\n",
+ " \"Features like monitoring dashboards, job templates, and built-in
logging make it suitable for both development and production use. \"\n",
+ " \"With support for event time processing, stateful functions, and
windowing, Dataflow is well-suited for real-time analytics and data
transformation tasks. \"\n",
+ " \"It’s a key component for architects building scalable,
cloud-native data platforms. \"\n",
+ " \"Dataflow also offers templates for common ETL tasks, helping
teams get started quickly with minimal setup. \"\n",
+ " \"Its integration with Cloud Functions and Cloud Composer enables
event-driven and orchestrated workflows. \"\n",
+ " \"Security and compliance are built-in with IAM roles, encryption
at rest and in transit, and audit logging, making it suitable for enterprise
environments. \"\n",
+ " \"For developers, Dataflow provides local testing capabilities and
a unified logging system through Cloud Logging. \"\n",
+ " \"It also supports SQL-based pipeline definitions using BigQuery,
which lowers the barrier to entry for analysts and data engineers. \"\n",
+ " \"Dataflow’s streaming engine significantly improves performance
and reduces costs by decoupling compute and state management. \"\n",
+ " \"In summary, Google Cloud Dataflow not only simplifies the
deployment of Apache Beam pipelines but also enhances them with cloud-native
features. \"\n",
+ " \"Its managed runtime, high availability, and integration with the
broader Google Cloud ecosystem make it a powerful tool for modern data
processing.\"\n",
+ " )\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"3\",\n",
+ " \"title\": \"Google Beam: 3D Communication Powered by AI\",\n",
+ " \"keywords\": [\"Google Beam\", \"Project Starline\", \"3D video\",
\"AI communication\", \"real-time meetings\"],\n",
+ " \"tags\": [\"AI\", \"Communication\", \"3D Technology\", \"Remote
Work\", \"Enterprise Tech\"],\n",
+ " \"content\": (\n",
+ " \"Google Beam is an innovative video communication platform that
builds on the research of Project Starline. It uses AI, 3D imaging, and light
field rendering to create immersive, lifelike video calls. \"\n",
+ " \"Designed to replicate in-person interaction, Beam allows users to
see life-sized, three-dimensional representations of each other without the
need for headsets. \"\n",
+ " \"This breakthrough makes remote conversations feel
natural—capturing facial expressions, eye contact, and subtle gestures that
traditional video conferencing often misses. \"\n",
+ " \"Beam reduces meeting fatigue and enhances engagement, making it
ideal for enterprise collaboration, interviews, and virtual presence scenarios.
\"\n",
+ " \"Powered by Google AI, Beam represents a significant leap in
communication technology. \"\n",
+ " \"Major companies like Salesforce, Deloitte, and NEC are already
exploring its impact on digital collaboration. \"\n",
+ " \"Google is partnering with HP to build and distribute Beam
hardware, designed to work with existing productivity and video tools. \"\n",
+ " \"Currently in limited early access for enterprise partners, Google
Beam aims to redefine virtual meetings by bridging the gap between digital and
physical presence. \"\n",
+ " \"It’s a promising step toward more human and effective remote
interactions.\"\n",
+ " )\n",
+ " }\n",
+ "]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "758c2af7-12c7-477b-9257-3c88712960e7",
+ "metadata": {},
+ "source": [
+ "## Exploratory Data Analysis (EDA)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5e751905-7217-4571-bc07-991ef850a6b2",
+ "metadata": {},
+ "source": [
+ "### Average Words/Tokens per Doc"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 75,
+ "id": "489e93b6-de41-4ec3-be33-a15c3cba12e8",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th># Words</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>count</th>\n",
+ " <td>3.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>mean</th>\n",
+ " <td>253.666667</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>std</th>\n",
+ " <td>72.858310</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>min</th>\n",
+ " <td>172.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>25%</th>\n",
+ " <td>224.500000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>50%</th>\n",
+ " <td>277.000000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>75%</th>\n",
+ " <td>294.500000</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>max</th>\n",
+ " <td>312.000000</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " # Words\n",
+ "count 3.000000\n",
+ "mean 253.666667\n",
+ "std 72.858310\n",
+ "min 172.000000\n",
+ "25% 224.500000\n",
+ "50% 277.000000\n",
+ "75% 294.500000\n",
+ "max 312.000000"
+ ]
+ },
+ "execution_count": 75,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "# The second video may skew the average tokens results since it is a
youtube short video.\n",
+ "contents = [c['content'] for c in corpus]\n",
+ "content_lengths = [len(content.split(\" \")) for content in contents]\n",
+ "df = pd.DataFrame(content_lengths, columns=['# Words'])\n",
+ "df.describe()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 76,
+ "id": "eb32aad0-febd-45af-b4bd-e2176b07e2dc",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "The mean word count for each video is about 254 words, which
corresponds to a rough token count of 331 tokens.\n"
+ ]
+ }
+ ],
+ "source": [
+ "mean_word_count = ceil(np.mean(content_lengths))\n",
+ "token_to_word_ratio = 1.3\n",
+ "approx_token_count = ceil(mean_word_count * token_to_word_ratio)\n",
+ "print(f'The mean word count for each video is about {mean_word_count}
words, which corresponds to a rough token count of {approx_token_count}
tokens.')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "42c1c159-875d-411b-a009-4361301b39f6",
+ "metadata": {},
+ "source": [
+ "## Preprocess Data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d545355e-41da-4c53-ba9a-4d33b1fe376c",
+ "metadata": {},
+ "source": [
+ "### Chunking"
Review Comment:
> Could we use
https://beam.apache.org/releases/pydoc/2.64.0/apache_beam.ml.rag.chunking.base.html#module-apache_beam.ml.rag.chunking.base
for chunking?
>
> It would be great if we just made this whole process a beam pipeline
Updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]