gemini-code-assist[bot] commented on code in PR #37628:
URL: https://github.com/apache/beam/pull/37628#discussion_r2819212803


##########
sdks/python/apache_beam/ml/rag/embeddings/huggingface.py:
##########
@@ -73,3 +94,97 @@ def get_ptransform_for_processing(
     return RunInference(
         model_handler=_TextEmbeddingHandler(self),
         inference_args=self.inference_args).with_output_types(EmbeddableItem)
+
+
+def _extract_images(items: Sequence[EmbeddableItem]) -> list:
+  """Extract images from items and convert to PIL.Image objects."""
+  images = []
+  for item in items:
+    if not item.content.image:
+      raise ValueError(
+          "Expected image content in "
+          f"{type(item).__name__} {item.id}, "
+          "got None")
+    img_data = item.content.image
+    if isinstance(img_data, bytes):
+      img = PILImage.open(io.BytesIO(img_data))
+    else:
+      img = PILImage.open(img_data)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `_extract_images` passes string inputs straight to `PILImage.open(img_data)`, 
which only understands local paths and file-like objects, so remote URIs such 
as `gs://` paths will fail. Open the file through Beam's `FileSystems` instead; 
it provides a consistent interface for both local and remote paths.
   
   ```suggestion
       if isinstance(img_data, bytes):
         img = PILImage.open(io.BytesIO(img_data))
       else:
         with FileSystems.open(img_data) as f:
           img = PILImage.open(io.BytesIO(f.read()))
   ```
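
For illustration only, here is a stdlib-only sketch of the same idea: route 
every string path through a single opener and hand the image library an 
in-memory buffer. `read_image_bytes`, `_open_local`, and the `opener` parameter 
are hypothetical names that are not part of this PR. In the pipeline the opener 
would be `FileSystems.open`; here it defaults to a local binary `open` so the 
snippet runs without Beam installed.

```python
import io
import os
import tempfile
from typing import BinaryIO, Callable, Union


def _open_local(path: str) -> BinaryIO:
  """Default opener: local files only (a stand-in for FileSystems.open)."""
  return open(path, "rb")


def read_image_bytes(
    img_data: Union[bytes, str],
    opener: Callable[[str], BinaryIO] = _open_local) -> io.BytesIO:
  """Return an in-memory buffer holding the image, whatever its source."""
  if isinstance(img_data, bytes):
    return io.BytesIO(img_data)
  # Read the whole file before the handle closes, so a lazy consumer such as
  # PILImage.open never depends on a closed or remote stream.
  with opener(img_data) as f:
    return io.BytesIO(f.read())


# Usage: raw bytes and a path yield the same buffer contents.
payload = b"\x89PNG fake image payload"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
  tmp.write(payload)
from_bytes = read_image_bytes(payload).getvalue()
from_path = read_image_bytes(tmp.name).getvalue()
os.unlink(tmp.name)
```

Copying into `io.BytesIO` matters because `PILImage.open` is lazy: it keeps 
reading from the underlying file object after the call returns.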



##########
sdks/python/apache_beam/ml/rag/embeddings/huggingface.py:
##########
@@ -16,13 +16,18 @@
 
 """RAG-specific embedding implementations using HuggingFace models."""
 
+import io
+from collections.abc import Sequence
 from typing import Optional
 
 import apache_beam as beam
 from apache_beam.ml.inference.base import RunInference

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   To support loading images from remote filesystems like GCS, we need to 
import `FileSystems`.
   
   ```suggestion
   import apache_beam as beam
   from apache_beam.io.filesystems import FileSystems
   from apache_beam.ml.inference.base import RunInference
   ```



##########
sdks/python/apache_beam/ml/rag/embeddings/vertex_ai.py:
##########
@@ -90,9 +97,100 @@ def get_model_handler(self):
 
   def get_ptransform_for_processing(
       self, **kwargs
-  ) -> beam.PTransform[beam.PCollection[EmbeddableItem],
-                       beam.PCollection[EmbeddableItem]]:
+  ) -> beam.PTransform[beam.PCollection[Chunk], beam.PCollection[Chunk]]:
     """Returns PTransform that uses the RAG adapter."""
     return RunInference(
         model_handler=_TextEmbeddingHandler(self),
+        inference_args=self.inference_args).with_output_types(Chunk)
+
+
+def _extract_images(items: Sequence[EmbeddableItem]) -> list:
+  """Extract images from items and convert to vertexai Image objects."""
+  images = []
+  for item in items:
+    if not item.content.image:
+      raise ValueError(
+          "Expected image content in "
+          f"{type(item).__name__} {item.id}, "
+          "got None")
+    img_data = item.content.image
+    if isinstance(img_data, bytes):
+      images.append(Image(image_bytes=img_data))
+    else:
+      images.append(Image.load_from_file(img_data))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The Vertex AI SDK `Image` class supports GCS URIs directly via the `gcs_uri` 
parameter. The current implementation uses `load_from_file` for all string 
inputs, which is intended for local paths and will fail for `gs://` URIs. 
Update the logic to detect GCS paths and use the appropriate parameter.
   
   ```suggestion
       if isinstance(img_data, bytes):
         images.append(Image(image_bytes=img_data))
       elif img_data.startswith('gs://'):
         images.append(Image(gcs_uri=img_data))
       else:
         images.append(Image.load_from_file(img_data))
   ```
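
The three-way branch can be checked in isolation with a small sketch like the 
one below. `image_source_kind` is a hypothetical helper, not part of the PR or 
of the Vertex AI SDK, and the example paths are made up. It only reports which 
branch an input would take, so it runs without the SDK installed.

```python
from typing import Tuple, Union


def image_source_kind(
    img_data: Union[bytes, str]) -> Tuple[str, Union[bytes, str]]:
  """Return (kind, value), where kind names how the image would be loaded."""
  if isinstance(img_data, bytes):
    return ("image_bytes", img_data)
  if img_data.startswith("gs://"):
    return ("gcs_uri", img_data)
  # Anything else is treated as a local path, as in the suggestion above.
  return ("local_file", img_data)


kinds = [
    image_source_kind(b"\xff\xd8raw")[0],
    image_source_kind("gs://bucket/cat.png")[0],
    image_source_kind("/tmp/cat.png")[0],
]
```

The `bytes` check has to come first, since `bytes.startswith` would reject a 
`str` prefix with a `TypeError`.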



##########
sdks/python/apache_beam/ml/rag/types.py:
##########
@@ -121,6 +152,19 @@ def dense_embedding(self) -> Optional[List[float]]:
   def sparse_embedding(self) -> Optional[Tuple[List[int], List[float]]]:
     return self.embedding.sparse_embedding if self.embedding else None
 
+  @property
+  def content_string(self) -> str:
+    """Returns storable string content for ingestion.
+
+    Falls back through content fields in priority order:
+    text > image URI.
+    """
+    if self.content.text is not None:
+      return self.content.text
+    if isinstance(self.content.image, str):
+      return self.content.image
+    raise ValueError(f'EmbeddableItem does not contain text content. {self}')

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The error message here is slightly misleading because `content_string` is 
intended to return either text content or an image URI for ingestion. If it 
fails, it means neither is available (e.g., the item contains raw image bytes).
   
   ```suggestion
       raise ValueError(
           'EmbeddableItem does not contain storable string content '
           f'(text or image URI). {self}')
   ```
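
A minimal sketch of the fallback order and the failure case, assuming 
simplified stand-ins for the real types: `Content` and `Item` below are 
hypothetical dataclasses that mirror only the fields used by `content_string`, 
and the `gs://` path is made up.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Content:  # hypothetical stand-in for the item's content type
  text: Optional[str] = None
  image: Optional[Union[bytes, str]] = None


@dataclass
class Item:  # hypothetical stand-in for EmbeddableItem
  id: str
  content: Content

  @property
  def content_string(self) -> str:
    # Priority order: text first, then an image URI given as a string.
    if self.content.text is not None:
      return self.content.text
    if isinstance(self.content.image, str):
      return self.content.image
    raise ValueError(
        'EmbeddableItem does not contain storable string content '
        f'(text or image URI). {self}')


text_item = Item("1", Content(text="hello"))
uri_item = Item("2", Content(image="gs://bucket/cat.png"))
bytes_item = Item("3", Content(image=b"\x89PNG"))

# Raw image bytes have no storable string form, so the property raises.
try:
  bytes_item.content_string
  error = None
except ValueError as e:
  error = str(e)
```

The bytes-only item is the case the reworded message is meant to describe.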



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
