[
https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=778064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778064
]
ASF GitHub Bot logged work on BEAM-14068:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jun/22 12:32
Start Date: 03/Jun/22 12:32
Worklog Time Spent: 10m
Work Description: tvalentyn commented on code in PR #17462:
URL: https://github.com/apache/beam/pull/17462#discussion_r888834446
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline that uses RunInference API to perform classification task on imagenet dataset""" # pylint: disable=line-too-long
Review Comment:
```suggestion
"""A pipeline that uses RunInference API to perform image classification."""
```
If you want to add more details (for example, about ImageNet or the images), you
can continue in an expanded (multiline) docstring.
For example:
```
The pipeline uses a pre-trained PyTorch MobileNetV2 model (link) and sample
images from the ImageNet dataset (link).
```
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+import argparse
+import io
+import os
+from functools import partial
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+from typing import Union
+
+import apache_beam as beam
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from PIL import Image
+from torchvision import transforms
+from torchvision.models.mobilenetv2 import MobileNetV2
+
+
+def read_image(image_file_name: str,
+               path_to_dir: str = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(data: Image) -> torch.Tensor:
+  image_size = (224, 224)
+  # to use models in torch with imagenet weights,
+  # normalize the images using the below values.
+  # ref: https://pytorch.org/vision/stable/models.html#
+  normalize = transforms.Normalize(
+      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+  transform = transforms.Compose([
+      transforms.Resize(image_size),
+      transforms.ToTensor(),
+      normalize,
+  ])
+  return transform(data)
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Union[PredictionResult, Tuple[Any, PredictionResult]]
+  ) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = torch.argmax(prediction_result.inference, dim=0)
+    yield filename + ',' + str(prediction.item())
+
+
+def run_pipeline(options: PipelineOptions, args=None):
+  """Sets up PyTorch RunInference pipeline"""
+  # reference to the class definition of the model.
+  model_class = MobileNetV2
+  # params for model class constructor. These values will be used in
+  # RunInference API to instantiate the model object.
+  model_params = {'num_classes': 1000}  # imagenet has 1000 classes.
+  # for this example, the pretrained weights are downloaded from
+  # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth"
+  # and saved on GCS bucket gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt,
+  # which will be used to load the model state_dict in the RunInference API.
+  model_loader = PytorchModelLoader(
+      state_dict_path=args.model_state_dict_path,
+      model_class=model_class,
+      model_params=model_params)
+  with beam.Pipeline(options=options) as p:
+    filename_value_pair = (
+        p
+        | 'Read from csv file' >> beam.io.ReadFromText(
+            args.input, skip_header_lines=1)
+        | 'Parse and read files from the input_file' >> beam.Map(
Review Comment:
These constants become step names; long names may not look good in the UI.
From what I've seen in other examples, step names are usually more concise, e.g.:
"ReadImageNames", "ReadImages", "PreprocessImages", etc.
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+  with beam.Pipeline(options=options) as p:
+    filename_value_pair = (
+        p
+        | 'Read from csv file' >> beam.io.ReadFromText(
+            args.input, skip_header_lines=1)
+        | 'Parse and read files from the input_file' >> beam.Map(
+            partial(read_image, path_to_dir=args.images_dir))
+        | 'Preprocess images' >> beam.MapTuple(
+            lambda file_name, data: (file_name, preprocess_image(data))))
+    predictions = (
+        filename_value_pair
+        | 'PyTorch RunInference' >> RunInference(model_loader)
+        | 'Process output' >> beam.ParDo(PostProcessor()))
+
+    if args.output:
+      predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned
+          args.output,
+          shard_name_template='',
+          append_trailing_newlines=True)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the CSV file containing image names')
Review Comment:
```suggestion
help='Path to the CSV file containing image names.')
```
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+def preprocess_image(data: Image) -> torch.Tensor:
Review Comment:
Image.Image?
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+  # for this example, the pretrained weights are downloaded from
+  # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth"
Review Comment:
Make this a default value for --model_state_dict_path, and move the comment
there?
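A minimal sketch of this suggestion, assuming the flag keeps the name `model_state_dict_path` used in the diff; `DEFAULT_MODEL_STATE_DICT_PATH` and `build_parser` are hypothetical names. The GCS path and the download URL are the ones from the example's comment, so the provenance note can live in the flag's help text instead of in `run_pipeline`.

```python
import argparse

# Hypothetical constant name; the value is the GCS copy of the MobileNetV2
# ImageNet weights mentioned in the example's comment.
DEFAULT_MODEL_STATE_DICT_PATH = (
    'gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt')


def build_parser() -> argparse.ArgumentParser:
  """Hypothetical helper: builds a parser with a default state_dict path."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--model_state_dict_path',
      dest='model_state_dict_path',
      default=DEFAULT_MODEL_STATE_DICT_PATH,
      # The provenance comment from run_pipeline moves into the help text.
      help='Path to the model state_dict. By default, pre-trained MobileNetV2 '
      'ImageNet weights downloaded from '
      'https://download.pytorch.org/models/mobilenet_v2-b0353104.pth '
      'and copied to GCS.')
  return parser


args = build_parser().parse_args([])
print(args.model_state_dict_path)
```

Users can still point the flag at their own weights; the default only makes the example runnable without extra setup.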
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+def preprocess_image(data: Image) -> torch.Tensor:
Review Comment:
I wonder why none of our checkers catch this.
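One likely reason: Python stores annotations as plain objects and never validates them at runtime, so `data: Image` (a module, not a class) defines and runs without error. A static checker such as mypy can only flag it when it resolves PIL's types; if PIL is treated as untyped (for example, no stubs installed and missing imports ignored), the annotation effectively becomes `Any`. That is an assumption about the lint setup, not something confirmed in this PR. The sketch uses the standard-library `json` module as a stand-in for `PIL.Image` to show that the runtime accepts a module annotation.

```python
import inspect
import json  # stands in for the PIL `Image` module in this sketch


def preprocess(data: json) -> None:  # annotated with a module, not a class
  """Mirrors the `data: Image` mistake; `preprocess` is a hypothetical name."""
  return None


# Annotations are stored as-is and are not checked when defining or calling.
annotation = preprocess.__annotations__['data']
print(inspect.ismodule(annotation))  # True
print(preprocess(object()))  # None
```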
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
Review Comment:
Add a default value for the input so that it's easier for users to get
started with running this example?
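A sketch of what a default could look like, assuming `parse_known_args` returns `parser.parse_known_args(argv)` so that unrecognized flags can be handed to `PipelineOptions` (the diff is cut off before its body). `DEFAULT_INPUT` is a hypothetical placeholder, not a real dataset path.

```python
import argparse
from typing import List, Tuple

# Hypothetical placeholder: replace with a CSV of image names that the example
# can read out of the box.
DEFAULT_INPUT = 'gs://YOUR_BUCKET/path/to/image_names.csv'


def parse_known_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
  """Parses example flags; unknown flags are returned for PipelineOptions."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default=DEFAULT_INPUT,  # replaces required=True
      help='Path to the CSV file containing image names.')
  return parser.parse_known_args(argv)


known_args, pipeline_args = parse_known_args(['--runner=DirectRunner'])
print(known_args.input)  # falls back to DEFAULT_INPUT
print(pipeline_args)  # ['--runner=DirectRunner']
```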
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Union[PredictionResult, Tuple[Any, PredictionResult]]
Review Comment:
From looking at the code, shouldn't this hint be something like `element:
Tuple[str, PredictionResult]`?
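For reference, a Beam-free sketch of the narrower hint: each element is a `(filename, PredictionResult)` pair keyed by the file name, so `Tuple[str, PredictionResult]` describes it. `PredictionResult` below is a hypothetical minimal stand-in exposing the `.inference` field the diff uses, and `format_prediction` mirrors the body of `PostProcessor.process`, with a pure-Python argmax in place of `torch.argmax`.

```python
from typing import Iterable, NamedTuple, Sequence, Tuple


class PredictionResult(NamedTuple):
  """Hypothetical stand-in for the RunInference result type."""
  example: Sequence[float]
  inference: Sequence[float]


def format_prediction(element: Tuple[str, PredictionResult]) -> Iterable[str]:
  """Yields 'filename,class_id' for a prediction keyed by file name."""
  filename, prediction_result = element
  scores = list(prediction_result.inference)
  # Index of the highest score, analogous to torch.argmax(..., dim=0).
  class_id = max(range(len(scores)), key=scores.__getitem__)
  yield filename + ',' + str(class_id)


print(list(format_prediction(('img.jpg', PredictionResult([], [0.1, 0.7, 0.2])))))
# ['img.jpg,1']
```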
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+def preprocess_image(data: Image) -> torch.Tensor:
+  image_size = (224, 224)
+  # to use models in torch with imagenet weights,
Review Comment:
Pre-trained PyTorch models expect input images normalized with the below
values (ref: ...)
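To make the suggested comment concrete: `transforms.ToTensor()` scales 8-bit pixels to [0, 1], and `transforms.Normalize` then computes `(x - mean) / std` per channel. The sketch applies the same arithmetic to a single RGB pixel in plain Python; `normalize_pixel` is a hypothetical helper, not part of torchvision.

```python
from typing import List, Sequence

# ImageNet per-channel statistics used in the diff (R, G, B).
MEAN = (0.485, 0.456, 0.406)
STD = (0.229, 0.224, 0.225)


def normalize_pixel(rgb: Sequence[float]) -> List[float]:
  """Applies (x - mean) / std per channel, as transforms.Normalize does.

  `rgb` is assumed to be already scaled to [0, 1], as ToTensor() does for
  8-bit images.
  """
  return [(x - m) / s for x, m, s in zip(rgb, MEAN, STD)]


white = normalize_pixel([1.0, 1.0, 1.0])
print([round(v, 3) for v in white])  # [2.249, 2.429, 2.64]
```

A pixel equal to the channel means maps to zero, which is the centering the pre-trained weights expect.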
##########
sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py:
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pylint: skip-file
+
+"""End-to-End test for Pytorch Inference"""
+
+import logging
+import os
+import unittest
+import uuid
+
+import pytest
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+  import torch
+  from apache_beam.examples.inference import pytorch_image_classification
+except ImportError as e:
+  torch = None
+
+_EXPECTED_OUTPUTS = {
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005001.JPEG': '681',
Review Comment:
Have you considered translating class IDs into category names? In practice
IDs are sufficient, but for illustration purposes this would make the
example a little more approachable.
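A possible shape for this, sketched under assumptions: output lines have the `filename,class_id` form produced by `PostProcessor`, and a list of the 1000 ImageNet category names indexed by class ID is available (where it comes from is left open). `to_category` and the three-entry label list are hypothetical.

```python
from typing import Sequence


def to_category(line: str, labels: Sequence[str]) -> str:
  """Maps a 'filename,class_id' output line to 'filename,category'."""
  # rsplit keeps any commas or colons inside the file path intact.
  filename, class_id = line.rsplit(',', 1)
  return filename + ',' + labels[int(class_id)]


# Hypothetical three-class label list; a real run would load the 1000
# ImageNet category names, indexed by class ID.
labels = ['class_a', 'class_b', 'class_c']
print(to_category('gs://bucket/img.JPEG,2', labels))  # gs://bucket/img.JPEG,class_c
```

The mapping could run in the post-processing step itself or as a separate `Map` after it; keeping it separate leaves the raw IDs available for tests such as `_EXPECTED_OUTPUTS`.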
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline that uses RunInference API to perform classification task on
imagenet dataset""" # pylint: disable=line-too-long
+
+import argparse
+import io
+import os
+from functools import partial
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+from typing import Union
+
+import apache_beam as beam
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from PIL import Image
+from torchvision import transforms
+from torchvision.models.mobilenetv2 import MobileNetV2
+
+
+def read_image(image_file_name: str,
+ path_to_dir: str = None) -> Tuple[str, Image.Image]:
+ if path_to_dir is not None:
+ image_file_name = os.path.join(path_to_dir, image_file_name)
+ with FileSystems().open(image_file_name, 'r') as file:
+ data = Image.open(io.BytesIO(file.read())).convert('RGB')
+ return image_file_name, data
+
+
+def preprocess_image(data: Image) -> torch.Tensor:
+  image_size = (224, 224)
+  # to use models in torch with imagenet weights,
+  # normalize the images using the below values.
+  # ref: https://pytorch.org/vision/stable/models.html#
+  normalize = transforms.Normalize(
+      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+  transform = transforms.Compose([
+      transforms.Resize(image_size),
+      transforms.ToTensor(),
+      normalize,
+  ])
+  return transform(data)
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Union[PredictionResult, Tuple[Any, PredictionResult]]
+  ) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = torch.argmax(prediction_result.inference, dim=0)
+    yield filename + ',' + str(prediction.item())
+
+
+def run_pipeline(options: PipelineOptions, args=None):
+  """Sets up PyTorch RunInference pipeline"""
+  # reference to the class definition of the model.
+  model_class = MobileNetV2
+  # params for model class constructor. These values will be used in
+  # RunInference API to instantiate the model object.
+  model_params = {'num_classes': 1000}  # imagenet has 1000 classes.
+  # for this example, the pretrained weights are downloaded from
+  # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth"
+  # and saved on GCS bucket gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt,
+  # which will be used to load the model state_dict in the RunInference API.
+  model_loader = PytorchModelLoader(
+      state_dict_path=args.model_state_dict_path,
+      model_class=model_class,
+      model_params=model_params)
+  with beam.Pipeline(options=options) as p:
+    filename_value_pair = (
+        p
+        | 'Read from csv file' >> beam.io.ReadFromText(
+            args.input, skip_header_lines=1)
+        | 'Parse and read files from the input_file' >> beam.Map(
+            partial(read_image, path_to_dir=args.images_dir))
+        | 'Preprocess images' >> beam.MapTuple(
+            lambda file_name, data: (file_name, preprocess_image(data))))
+    predictions = (
+        filename_value_pair
+        | 'PyTorch RunInference' >> RunInference(model_loader)
+        | 'Process output' >> beam.ParDo(PostProcessor()))
+
+    if args.output:
+      predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned
+          args.output,
+          shard_name_template='',
+          append_trailing_newlines=True)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the CSV file containing image names')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Predictions are saved to the output'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      required=True,
+      help='Path to load the model.')
Review Comment:
Should this be something like:
```suggestion
help='Path to the MobileNetV2 model weights.')
```
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+  parser.add_argument(
+      '--images_dir',
+      default=None,
+      help='Path to the directory where images are stored.'
+      'This is not required if the --input has absolute path to the images.')
Review Comment:
```suggestion
'Not required if image names in the input file have absolute path.')
```
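For reference, the name-resolution step of `read_image` can be isolated into a small sketch. The helper `resolve_image_path` and the bucket `gs://my-bucket` are hypothetical, and `posixpath.join` stands in for the `os.path.join` call so the result does not depend on the OS. It shows why the flag can be omitted for absolute names, plus one caveat: `join` only discards the directory for names that start with `/`, so a full `gs://` name combined with `--images_dir` would not resolve to the intended object.

```python
import posixpath
from typing import Optional


def resolve_image_path(image_file_name: str,
                       path_to_dir: Optional[str] = None) -> str:
  """Hypothetical helper mirroring the path logic in read_image."""
  if path_to_dir is not None:
    # read_image calls os.path.join; posixpath.join is its POSIX variant.
    image_file_name = posixpath.join(path_to_dir, image_file_name)
  return image_file_name


# Relative names in the input file need --images_dir.
print(resolve_image_path('ILSVRC2012_val_00005001.JPEG', 'gs://my-bucket/images'))
# Absolute names are used as-is when --images_dir is not set.
print(resolve_image_path('gs://my-bucket/images/ILSVRC2012_val_00005001.JPEG'))
# A gs:// name is not "absolute" to join, so the directory is still prepended.
print(resolve_image_path('gs://my-bucket/x.JPEG', 'gs://my-bucket/images'))
```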
##########
sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py:
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pylint: skip-file
+
+"""End-to-End test for Pytorch Inference"""
+
+import logging
+import os
+import unittest
+import uuid
+
+import pytest
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+  import torch
+  from apache_beam.examples.inference import pytorch_image_classification
+except ImportError as e:
+  torch = None
+
+_EXPECTED_OUTPUTS = {
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005001.JPEG': '681',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005002.JPEG': '333',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005003.JPEG': '711',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005004.JPEG': '286',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005005.JPEG': '433',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005006.JPEG': '290',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005007.JPEG': '890',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005008.JPEG': '592',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005009.JPEG': '406',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005010.JPEG': '996',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005011.JPEG': '327',
+    'gs://apache-beam-ml/datasets/imagenet/raw-data/validation/ILSVRC2012_val_00005012.JPEG': '573'
+}
+
+
+def process_outputs(filepath):
+  with FileSystems().open(filepath) as f:
+    lines = f.readlines()
+  lines = [l.decode('utf-8').strip('\n') for l in lines]
+  return lines
+
+
+@unittest.skipIf(
+    os.getenv('FORCE_TORCH_IT') is None and torch is None,
+    'Missing dependencies. '
+    'Test depends on torch, torchvision and pillow')
+class PyTorchInference(unittest.TestCase):
+  @pytest.mark.uses_pytorch
+  @pytest.mark.it_postcommit
+  def test_torch_run_inference_imagenet_mobilenetv2(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # text files containing absolute path to the imagenet validation data on GCS
+    file_of_image_names = 'gs://apache-beam-ml/testing/inputs/it_mobilenetv2_imagenet_validation_inputs.txt'  # disable: line-too-long
+    output_file_dir = 'gs://apache-beam-ml/testing/predictions'
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+
+    model_state_dict_path = 'gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt'
Review Comment:
This may no longer be necessary if it is already the default.
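If `--model_state_dict_path` gets a default, a sketch along these lines would let the test omit it. It assumes the default is the GCS copy of the weights mentioned in the `run_pipeline` comment and that `parse_known_args` returns `parser.parse_known_args(argv)`; neither is confirmed by the hunk above.

```python
import argparse

# Assumed default: the GCS copy of the MobileNetV2 weights referenced in
# run_pipeline's comment. Not confirmed as the example's actual default.
_DEFAULT_STATE_DICT = (
    'gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt')


def parse_known_args(argv):
  """Parses args for the workflow (sketch with a default weights path)."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--model_state_dict_path',
      dest='model_state_dict_path',
      default=_DEFAULT_STATE_DICT,
      help='Path to the MobileNetV2 model weights.')
  return parser.parse_known_args(argv)


# Unrecognized flags (e.g. pipeline options) are returned separately.
known_args, pipeline_args = parse_known_args(['--runner=DirectRunner'])
print(known_args.model_state_dict_path)
print(pipeline_args)  # ['--runner=DirectRunner']
```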
##########
sdks/python/scripts/run_integration_test.sh:
##########
@@ -213,9 +218,13 @@ if [[ -z $PIPELINE_OPTS ]]; then
# Install test dependencies for ValidatesRunner tests.
# pyhamcrest==1.10.0 doesn't work on Py2.
# See: https://github.com/hamcrest/PyHamcrest/issues/131.
- echo "pyhamcrest!=1.10.0,<2.0.0" > postcommit_requirements.txt
- echo "mock<3.0.0" >> postcommit_requirements.txt
- echo "parameterized>=0.7.1,<0.8.0" >> postcommit_requirements.txt
+ if [[ -z $REQUIREMENTS_FILE ]]; then
+ echo "pyhamcrest!=1.10.0,<2.0.0" > requirements.txt
Review Comment:
I would avoid using `requirements.txt`: it is a common name, and a Beam
developer may already have created a file with that name for other reasons,
in which case the script would overwrite it.
##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      required=True,
+      help='Path to load the model.')
Review Comment:
Optionally, we could add the model class as a parameter later, although such
customization might be better suited for a Flex Template.
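A minimal sketch of that idea, assuming a hypothetical `--model_class` flag that maps a name to a constructor and a hypothetical `--num_classes` flag for `model_params`. Stand-in classes replace the torchvision models so the snippet runs without PyTorch; in the example, the returned pair would be what gets passed as `model_class=` and `model_params=` to `PytorchModelLoader`.

```python
import argparse
from typing import Any, Callable, Dict, List, Tuple


class MobileNetV2Stub:
  """Hypothetical stand-in for torchvision's MobileNetV2."""
  def __init__(self, num_classes: int = 1000):
    self.num_classes = num_classes


class ResNet18Stub:
  """Hypothetical stand-in for another torchvision architecture."""
  def __init__(self, num_classes: int = 1000):
    self.num_classes = num_classes


# Hypothetical registry from flag value to model constructor.
_MODEL_CLASSES: Dict[str, Callable[..., Any]] = {
    'mobilenet_v2': MobileNetV2Stub,
    'resnet18': ResNet18Stub,
}


def parse_model_args(
    argv: List[str]) -> Tuple[Callable[..., Any], Dict[str, Any]]:
  """Returns (model_class, model_params) selected on the command line."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--model_class',
      choices=sorted(_MODEL_CLASSES),
      default='mobilenet_v2',
      help='Name of the architecture to load the weights into.')
  parser.add_argument(
      '--num_classes',
      type=int,
      default=1000,
      help='Number of output classes (imagenet has 1000).')
  known_args, _ = parser.parse_known_args(argv)
  model_params = {'num_classes': known_args.num_classes}
  return _MODEL_CLASSES[known_args.model_class], model_params


model_class, model_params = parse_model_args(['--model_class=resnet18'])
model = model_class(**model_params)
print(type(model).__name__, model.num_classes)  # ResNet18Stub 1000
```

An explicit registry keeps the flag limited to known architectures; anything more open-ended (for example importing a class by dotted path) is the kind of customization that may fit a Flex Template better.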
Issue Time Tracking
-------------------
Worklog Id: (was: 778064)
Time Spent: 8.5h (was: 8h 20m)
> RunInference Benchmarking tests
> -------------------------------
>
> Key: BEAM-14068
> URL: https://issues.apache.org/jira/browse/BEAM-14068
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Anand Inguva
> Assignee: Anand Inguva
> Priority: P2
> Time Spent: 8.5h
> Remaining Estimate: 0h
>
> RunInference benchmarks will evaluate the performance of pipelines that
> represent common use cases of Beam + Dataflow with PyTorch, scikit-learn and
> possibly TFX. These benchmarks would be integration tests that exercise
> several software components using Beam, PyTorch, scikit-learn and TensorFlow
> Extended.
> We would use publicly available datasets (e.g., Kaggle).
> Size: small / 10 GB / 1 TB, etc.
> The default execution runner would be Dataflow unless specified otherwise.
> These tests would be run infrequently (once every release cycle).
--
This message was sent by Atlassian Jira
(v8.20.7#820007)