gemini-code-assist[bot] commented on code in PR #37628:
URL: https://github.com/apache/beam/pull/37628#discussion_r2819212803
##########
sdks/python/apache_beam/ml/rag/embeddings/huggingface.py:
##########
@@ -73,3 +94,97 @@ def get_ptransform_for_processing(
return RunInference(
model_handler=_TextEmbeddingHandler(self),
inference_args=self.inference_args).with_output_types(EmbeddableItem)
+
+
+def _extract_images(items: Sequence[EmbeddableItem]) -> list:
+ """Extract images from items and convert to PIL.Image objects."""
+ images = []
+ for item in items:
+ if not item.content.image:
+ raise ValueError(
+ "Expected image content in "
+ f"{type(item).__name__} {item.id}, "
+ "got None")
+ img_data = item.content.image
+ if isinstance(img_data, bytes):
+ img = PILImage.open(io.BytesIO(img_data))
+ else:
+ img = PILImage.open(img_data)
Review Comment:

`_extract_images` passes string paths straight to `PILImage.open`, which only
handles local filesystem paths and file-like objects, so remote URIs such as
`gs://` will fail. Open the file through Beam's `FileSystems` instead, which
provides a consistent interface for both local and remote paths.
```suggestion
    if isinstance(img_data, bytes):
      img = PILImage.open(io.BytesIO(img_data))
    else:
      with FileSystems.open(img_data) as f:
        img = PILImage.open(io.BytesIO(f.read()))
```
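
A minimal sketch of the same idea with the I/O separated from image decoding,
so the byte-loading step can be tested without PIL or a remote filesystem.
`read_image_bytes` and `local_opener` are hypothetical names that are not part
of the PR; `local_opener` is only a stand-in for `FileSystems.open` here.
```python
import io
import os
import tempfile
from typing import BinaryIO, Callable, Union


def read_image_bytes(
    img_data: Union[bytes, str],
    opener: Callable[[str], BinaryIO],
) -> bytes:
  """Return raw image bytes from in-memory bytes or a path/URI string."""
  if isinstance(img_data, bytes):
    return img_data
  # The opener decides how a path or URI is resolved (local, gs://, ...).
  with opener(img_data) as f:
    return f.read()


def local_opener(path: str) -> BinaryIO:
  # Stand-in for FileSystems.open in this sketch; handles local paths only.
  return open(path, "rb")


# Demo: a local file and in-memory bytes yield the same payload.
payload = b"\x89PNG-fake-image-bytes"
with tempfile.TemporaryDirectory() as tmp:
  path = os.path.join(tmp, "img.bin")
  with open(path, "wb") as out:
    out.write(payload)
  from_path = read_image_bytes(path, local_opener)
from_bytes = read_image_bytes(payload, local_opener)

# Either result can be wrapped for decoding, e.g. io.BytesIO(from_path).
buffer = io.BytesIO(from_path)
```
In the PR itself, passing `FileSystems.open` as the opener and handing
`io.BytesIO(...)` to `PILImage.open` would match the suggestion above.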
##########
sdks/python/apache_beam/ml/rag/embeddings/huggingface.py:
##########
@@ -16,13 +16,18 @@
"""RAG-specific embedding implementations using HuggingFace models."""
+import io
+from collections.abc import Sequence
from typing import Optional
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
Review Comment:

To support loading images from remote filesystems like GCS, we need to
import `FileSystems`.
```suggestion
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import RunInference
```
##########
sdks/python/apache_beam/ml/rag/embeddings/vertex_ai.py:
##########
@@ -90,9 +97,100 @@ def get_model_handler(self):
def get_ptransform_for_processing(
self, **kwargs
- ) -> beam.PTransform[beam.PCollection[EmbeddableItem],
- beam.PCollection[EmbeddableItem]]:
+ ) -> beam.PTransform[beam.PCollection[Chunk], beam.PCollection[Chunk]]:
"""Returns PTransform that uses the RAG adapter."""
return RunInference(
model_handler=_TextEmbeddingHandler(self),
+ inference_args=self.inference_args).with_output_types(Chunk)
+
+
+def _extract_images(items: Sequence[EmbeddableItem]) -> list:
+ """Extract images from items and convert to vertexai Image objects."""
+ images = []
+ for item in items:
+ if not item.content.image:
+ raise ValueError(
+ "Expected image content in "
+ f"{type(item).__name__} {item.id}, "
+ "got None")
+ img_data = item.content.image
+ if isinstance(img_data, bytes):
+ images.append(Image(image_bytes=img_data))
+ else:
+ images.append(Image.load_from_file(img_data))
Review Comment:

The Vertex AI SDK `Image` class supports GCS URIs directly via the `gcs_uri`
parameter. The current implementation uses `load_from_file` for all string
inputs, which is intended for local paths and will fail for `gs://` URIs.
Update the logic to detect GCS paths and use the appropriate parameter.
```suggestion
    if isinstance(img_data, bytes):
      images.append(Image(image_bytes=img_data))
    elif img_data.startswith('gs://'):
      images.append(Image(gcs_uri=img_data))
    else:
      images.append(Image.load_from_file(img_data))
```
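
A minimal sketch of the three-way dispatch, written without the Vertex AI SDK
so it runs on its own. `classify_image_source` is a hypothetical helper that
is not part of the PR. It also adds an explicit type check, on the assumption
that inputs other than `bytes` or `str` should be rejected with a clear error
rather than failing on `.startswith`.
```python
from typing import Union


def classify_image_source(img_data: Union[bytes, str]) -> str:
  """Hypothetical helper: name the loading strategy for an image input."""
  if isinstance(img_data, bytes):
    return "bytes"  # would map to Image(image_bytes=...)
  if isinstance(img_data, str):
    if img_data.startswith("gs://"):
      return "gcs_uri"  # would map to Image(gcs_uri=...)
    return "local_path"  # would map to Image.load_from_file(...)
  raise TypeError(
      f"Unsupported image type: {type(img_data).__name__}")
```
Each returned label corresponds to one branch of the suggestion above.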
##########
sdks/python/apache_beam/ml/rag/types.py:
##########
@@ -121,6 +152,19 @@ def dense_embedding(self) -> Optional[List[float]]:
def sparse_embedding(self) -> Optional[Tuple[List[int], List[float]]]:
return self.embedding.sparse_embedding if self.embedding else None
+ @property
+ def content_string(self) -> str:
+ """Returns storable string content for ingestion.
+
+ Falls back through content fields in priority order:
+ text > image URI.
+ """
+ if self.content.text is not None:
+ return self.content.text
+ if isinstance(self.content.image, str):
+ return self.content.image
+ raise ValueError(f'EmbeddableItem does not contain text content. {self}')
Review Comment:

The error message here is slightly misleading because `content_string` is
intended to return either text content or an image URI for ingestion. If it
fails, it means neither is available (e.g., the item contains raw image bytes).
```suggestion
    raise ValueError(
        'EmbeddableItem does not contain storable string content '
        f'(text or image URI). {self}')
```
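
A minimal sketch of the fallback order, showing which inputs reach the error
branch. `Content` here is a simplified stand-in for the PR's content type,
and `content_string` is written as a free function rather than a property;
both are assumptions made for illustration.
```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Content:
  # Simplified stand-in for the PR's content type.
  text: Optional[str] = None
  image: Optional[Union[bytes, str]] = None


def content_string(content: Content) -> str:
  """Return text if present, else an image URI, else raise."""
  if content.text is not None:
    return content.text
  if isinstance(content.image, str):
    return content.image
  # Reached when there is no text and the image is raw bytes or missing.
  raise ValueError(
      'EmbeddableItem does not contain storable string content '
      f'(text or image URI). {content}')
```
With this sketch, an item holding only raw image bytes raises `ValueError`,
which is the case the reworded message is meant to describe.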