[
https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=778215&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778215
]
ASF GitHub Bot logged work on BEAM-14068:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jun/22 17:27
Start Date: 03/Jun/22 17:27
Worklog Time Spent: 10m
Work Description: AnandInguva commented on code in PR #17462:
URL: https://github.com/apache/beam/pull/17462#discussion_r889185085
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline that uses RunInference API to perform classification task on imagenet dataset"""  # pylint: disable=line-too-long
+
+import argparse
+import io
+import os
+from functools import partial
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+from typing import Union
+
+import apache_beam as beam
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from PIL import Image
+from torchvision import transforms
+from torchvision.models.mobilenetv2 import MobileNetV2
+
+
+def read_image(image_file_name: str,
+               path_to_dir: str = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(data: Image) -> torch.Tensor:
+  image_size = (224, 224)
+  # to use models in torch with imagenet weights,
+  # normalize the images using the below values.
+  # ref: https://pytorch.org/vision/stable/models.html#
+  normalize = transforms.Normalize(
+      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+  transform = transforms.Compose([
+      transforms.Resize(image_size),
+      transforms.ToTensor(),
+      normalize,
+  ])
+  return transform(data)
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Union[PredictionResult, Tuple[Any, PredictionResult]]
Review Comment:
I had what you suggested here, but I got this error:
`apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
'ProcessOutput': requires Tuple[Any, PredictionResult] but got
Union[PredictionResult, Tuple[Any, PredictionResult]] for element`.
The output is defined here:
https://github.com/apache/beam/blob/6d6a54d21b483c20005debcfd0b6d9ff6739ec80/sdks/python/apache_beam/ml/inference/api.py#L39
I am not sure of the behavior, though. It should be able to accept `element:
Tuple[str, PredictionResult]`, but to avoid this error, I changed the code
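For illustration only, here is a minimal sketch of one way a post-processing step could accept both shapes named in the error above (a bare result or a keyed pair) by checking the shape at runtime. `PredictionResult` below is a hypothetical stand-in with assumed fields, not the class from `apache_beam.ml.inference.api`, and `split_element` and `postprocess` are invented helper names. It does not use Beam's type checker, so it says nothing about how the `TypeCheckError` itself should be resolved.

```python
from typing import Any, Iterable, NamedTuple, Tuple, Union


class PredictionResult(NamedTuple):
  # Hypothetical stand-in for Beam's PredictionResult; fields are assumed.
  example: Any
  inference: Any


def split_element(
    element: Union[PredictionResult, Tuple[Any, PredictionResult]]
) -> Tuple[Any, PredictionResult]:
  # Test for the result type first: this stand-in is a NamedTuple, which is
  # itself a tuple, so isinstance(element, tuple) alone cannot tell a bare
  # result from a (key, result) pair.
  if isinstance(element, PredictionResult):
    return None, element
  key, result = element
  return key, result


def postprocess(elements) -> Iterable[str]:
  # Emit one "key,inference" line per element, keyed or not.
  for element in elements:
    key, result = split_element(element)
    yield f'{key},{result.inference}'
```

The same branch could live inside a `DoFn.process` method; it is kept as plain functions here so that it runs without a Beam installation.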
Issue Time Tracking
-------------------
Worklog Id: (was: 778215)
Time Spent: 8h 40m (was: 8.5h)
> RunInference Benchmarking tests
> -------------------------------
>
> Key: BEAM-14068
> URL: https://issues.apache.org/jira/browse/BEAM-14068
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Anand Inguva
> Assignee: Anand Inguva
> Priority: P2
> Time Spent: 8h 40m
> Remaining Estimate: 0h
>
> RunInference benchmarks will evaluate the performance of pipelines that
> represent common use cases of Beam + Dataflow with PyTorch, scikit-learn and
> possibly TFX. These benchmarks would be integration tests that exercise
> several software components using Beam, PyTorch, scikit-learn and TensorFlow
> Extended.
> We would use publicly available datasets (e.g., Kaggle).
> Size: small / 10 GB / 1 TB etc.
> The default execution runner would be Dataflow unless specified otherwise.
> These tests would be run infrequently (every release cycle).