This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 96713243839 Vertex AI Model Handler Example Improvements (#27580)
96713243839 is described below
commit 967132438394390a847cfe322fc105cc396f196e
Author: Jack McCluskey <[email protected]>
AuthorDate: Mon Jul 24 12:06:33 2023 -0400
Vertex AI Model Handler Example Improvements (#27580)
* Make example compatible w/ Python 3.8
* Add support for file globs to example
---
.../examples/inference/vertex_ai_image_classification.py | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git
a/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py
b/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py
index 26f59aa380a..f8f9e803f6f 100644
---
a/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py
+++
b/sdks/python/apache_beam/examples/inference/vertex_ai_image_classification.py
@@ -28,10 +28,12 @@ import argparse
import io
import logging
from typing import Iterable
+from typing import List
from typing import Tuple
import apache_beam as beam
import tensorflow as tf
+from apache_beam.io import fileio
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
@@ -51,7 +53,7 @@ def parse_known_args(argv):
dest='input',
type=str,
required=True,
- help='File path to read an image from.')
+ help='File glob to read images from.')
parser.add_argument(
'--output',
dest='output',
@@ -93,7 +95,7 @@ def read_image(image_file_name: str) -> Tuple[str, bytes]:
return image_file_name, data
-def preprocess_image(data: bytes) -> list[float]:
+def preprocess_image(data: bytes) -> List[float]:
"""Preprocess the image, resizing it and normalizing it before
converting to a list.
"""
@@ -134,11 +136,10 @@ def run(
if not test_pipeline:
pipeline = beam.Pipeline(options=pipeline_options)
- # TODO: Process a glob of files instead of a single file name.
- read_image_name = pipeline | "Get file name" >> beam.Create(
- [known_args.input])
+ read_glob = pipeline | "Get glob" >> beam.Create([known_args.input])
+ read_image_name = read_glob | "Get Image Paths" >> fileio.MatchAll()
load_image = read_image_name | "Read Image" >> beam.Map(
- lambda image_name: read_image(image_name))
+ lambda image_name: read_image(image_name.path))
preprocess = load_image | "Preprocess Image" >> beam.MapTuple(
lambda img_name, img: (img_name, preprocess_image(img)))
predictions = preprocess | "RunInference" >> RunInference(