yeandy commented on code in PR #23554: URL: https://github.com/apache/beam/pull/23554#discussion_r991410149
########## website/www/site/content/en/documentation/ml/runinference-metrics.md: ########## @@ -0,0 +1,103 @@ +--- +title: "RunInference Metrics" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# RunInference Metrics Example + +The main purpose of the example is to demonstrate and explain different metrics that are available when using [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) for doing inference using a machine learning model. We use a pipeline that reads a list of sentences, tokeinze the text, uses a Transformer based model `distilbert-base-uncased-finetuned-sst-2-english` for classifies the texts into two different classes using `RunInference`. + +We showcase different RunInference metrics when the pipeline is executed using Dataflow Runner on CPU and GPU. The full example code can be found [here](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/runinference_metrics/). 
+ + +The file structure for entire pipeline is: + + runinference_metrics/ + ├── pipeline/ + │ ├── __init__.py + │ ├── options.py + │ └── transformations.py + ├── __init__.py + ├── config.py + ├── main.py + └── setup.py + +`pipeline/transormations.py` contains the code for `beam.DoFn` and additional functions that are used for pipeline + +`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline + +`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times + +`setup.py` defines the packages/requirements for the pipeline to run + +`main.py` contains the pipeline code and some additional functions used for running the pipeline + + +### How to Run the Pipeline ? Review Comment: ```suggestion ### How to Run the Pipeline ``` ########## website/www/site/content/en/documentation/ml/runinference-metrics.md: ########## @@ -0,0 +1,103 @@ +--- +title: "RunInference Metrics" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# RunInference Metrics Example + +The main purpose of the example is to demonstrate and explain different metrics that are available when using [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) for doing inference using a machine learning model. 
We use a pipeline that reads a list of sentences, tokeinze the text, uses a Transformer based model `distilbert-base-uncased-finetuned-sst-2-english` for classifies the texts into two different classes using `RunInference`. + +We showcase different RunInference metrics when the pipeline is executed using Dataflow Runner on CPU and GPU. The full example code can be found [here](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/runinference_metrics/). + + +The file structure for entire pipeline is: + + runinference_metrics/ + ├── pipeline/ + │ ├── __init__.py + │ ├── options.py + │ └── transformations.py + ├── __init__.py + ├── config.py + ├── main.py + └── setup.py + +`pipeline/transormations.py` contains the code for `beam.DoFn` and additional functions that are used for pipeline + +`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline + +`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times Review Comment: ```suggestion `config.py` defines some variables like GCP `PROJECT_ID`, `NUM_WORKERS` that are used multiple times ``` ########## sdks/python/apache_beam/examples/inference/runinference_metrics/setup.py: ########## @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Setup.py module for the workflow's worker utilities. +All the workflow related code is gathered in a package that will be built as a +source distribution, staged in the staging area for the workflow being run and +then installed in the workers when they start running. +This behavior is triggered by specifying the --setup_file command line option +when running the workflow for remote execution. +""" Review Comment: ```suggestion """Setup.py module for the workflow's worker utilities. All the workflow related code is gathered in a package that will be built as a source distribution, staged in the staging area for the workflow being run, and then installed in the workers when they start running. This behavior is triggered by specifying the --setup_file command line option when running the workflow for remote execution. """ ``` ########## sdks/python/apache_beam/examples/inference/runinference_metrics/pipeline/transformations.py: ########## @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""This file contains the transformations and utility functions for +the pipeline.""" +import apache_beam as beam +import torch +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +from transformers import DistilBertForSequenceClassification +from transformers import DistilBertTokenizer + + +class CustomPytorchModelHandlerKeyedTensor(PytorchModelHandlerKeyedTensor): + """Wrapper around PytorchModelHandlerKeyedTensor to load a model on CPU.""" + def load_model(self) -> torch.nn.Module: + """Loads and initializes a Pytorch model for processing.""" + model = self._model_class(**self._model_params) + model.to(self._device) + file = FileSystems.open(self._state_dict_path, "rb") + model.load_state_dict(torch.load(file, map_location=self._device)) + model.eval() + return model + + +class HuggingFaceStripBatchingWrapper(DistilBertForSequenceClassification): + """Wrapper around HuggingFace model because RunInference requires a batch + as a list of dicts instead of a dict of lists. Another workaround + can be found here where they disable batching instead. 
+ https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py""" + def forward(self, **kwargs): + output = super().forward(**kwargs) + return [dict(zip(output, v)) for v in zip(*output.values())] + + +class Tokenize(beam.DoFn): + """A DoFn for tokenizing texts""" + def __init__(self, model_name: str): + """Initialises a tokenizer based on the model_name""" + self._model_name = model_name + + def setup(self): + """Loads the tokenizer""" + self._tokenizer = DistilBertTokenizer.from_pretrained(self._model_name) + + def process(self, text_input: str): + """Prepocesses the text using the tokenizer""" + # We need to pad the tokens tensors to max length to make sure + # that all the tensors are of the same length and hence + # stack-able by the RunInference API, normally you would batch first + # and tokenize the batch after and pad each tensor + # the the max length in the batch. + tokens = self._tokenizer( + text_input, return_tensors="pt", padding="max_length", max_length=512) + # squeeze because tokenization add an extra dimension, which is empty + # in this case because we're tokenizing one element at a time. + tokens = {key: torch.squeeze(val) for key, val in tokens.items()} + return [(text_input, tokens)] + + +class PostProcessor(beam.DoFn): + """Postprocess the RunInference output""" + def process(self, tuple_): Review Comment: Why does `tuple_` have the underscore `_` as a suffix here? ########## website/www/site/content/en/documentation/ml/runinference-metrics.md: ########## @@ -0,0 +1,103 @@ +--- +title: "RunInference Metrics" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# RunInference Metrics Example + +The main purpose of the example is to demonstrate and explain different metrics that are available when using [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) for doing inference using a machine learning model. We use a pipeline that reads a list of sentences, tokeinze the text, uses a Transformer based model `distilbert-base-uncased-finetuned-sst-2-english` for classifies the texts into two different classes using `RunInference`. + +We showcase different RunInference metrics when the pipeline is executed using Dataflow Runner on CPU and GPU. The full example code can be found [here](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/runinference_metrics/). + + +The file structure for entire pipeline is: + + runinference_metrics/ + ├── pipeline/ + │ ├── __init__.py + │ ├── options.py + │ └── transformations.py + ├── __init__.py + ├── config.py + ├── main.py + └── setup.py + +`pipeline/transormations.py` contains the code for `beam.DoFn` and additional functions that are used for pipeline + +`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline + +`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times + +`setup.py` defines the packages/requirements for the pipeline to run + +`main.py` contains the pipeline code and some additional functions used for running the pipeline + + +### How to Run the Pipeline ? +First, make sure you have installed the required packages. 
One should have access to a Google Cloud Project and then correctly configure the GCP variables like `PROJECT_ID`, `REGION`, and others in `config.py`. For using Dataflow with GPU, all the necessary setup instructions are mentioned here: https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/gpu-examples/pytorch-minimal. + + +1. Dataflow with CPU: `python main.py --mode cloud --device CPU` +2. Dataflow with GPU: `python main.py --mode cloud --device GPU` + +The pipeline can be broken down into few simple steps: +1. Create a list of texts to use it as an input using `beam.Create` +2. Tokenizing the text +3. Using RunInference to do inference +4. Postprocessing the output of RunInference + +The code snippet for the pipeline is: + +{{< highlight >}} + with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "Create inputs" >> beam.Create(inputs) + | "Tokenize" >> beam.ParDo(Tokenize(cfg.TOKENIZER_NAME)) + | "Inference" >> + RunInference(model_handler=KeyedModelHandler(model_handler)) + | "Decode Predictions" >> beam.ParDo(PostProcessor())) +{{< /highlight >}} + + +## RunInference Metrics + +As mentioned above, we benchmarked the performance of RunInference using Dataflow on both CPU and GPU. These metrics can be seen in the GCP UI and can also be printed using +{{< highlight >}} +metrics = pipeline.result.metrics().query(beam.metrics.MetricsFilter()) +{{< /highlight >}} + + +A snapshot of different metrics from GCP UI when using Dataflow on GPU: + +  Review Comment: What does this look like? Should we add the png locally? ########## website/www/site/content/en/documentation/ml/runinference-metrics.md: ########## @@ -0,0 +1,103 @@ +--- +title: "RunInference Metrics" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# RunInference Metrics Example + +The main purpose of the example is to demonstrate and explain different metrics that are available when using [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) for doing inference using a machine learning model. We use a pipeline that reads a list of sentences, tokeinze the text, uses a Transformer based model `distilbert-base-uncased-finetuned-sst-2-english` for classifies the texts into two different classes using `RunInference`. + +We showcase different RunInference metrics when the pipeline is executed using Dataflow Runner on CPU and GPU. The full example code can be found [here](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/runinference_metrics/). + + +The file structure for entire pipeline is: + + runinference_metrics/ + ├── pipeline/ + │ ├── __init__.py + │ ├── options.py + │ └── transformations.py + ├── __init__.py + ├── config.py + ├── main.py + └── setup.py + +`pipeline/transormations.py` contains the code for `beam.DoFn` and additional functions that are used for pipeline + +`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline + +`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times + +`setup.py` defines the packages/requirements for the pipeline to run + +`main.py` contains the pipeline code and some additional functions used for running the pipeline + + +### How to Run the Pipeline ? +First, make sure you have installed the required packages. 
One should have access to a Google Cloud Project and then correctly configure the GCP variables like `PROJECT_ID`, `REGION`, and others in `config.py`. For using Dataflow with GPU, all the necessary setup instructions are mentioned here: https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/gpu-examples/pytorch-minimal. Review Comment: ```suggestion First, make sure you have installed the required packages. One should have access to a Google Cloud Project and then correctly configure the GCP variables like `PROJECT_ID`, `REGION`, and others in `config.py`. For using Dataflow with GPU, all the necessary setup instructions are mentioned [here](https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/gpu-examples/pytorch-minimal). ``` ########## sdks/python/apache_beam/examples/inference/runinference_metrics/pipeline/options.py: ########## @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""This file contains the pipeline options to configure +the Dataflow pipeline.""" + +from datetime import datetime +from typing import Any + +import config as cfg +from apache_beam.options.pipeline_options import PipelineOptions + + +def get_pipeline_options( + project: str, + job_name: str, + mode: str, + device: str, + num_workers: int = cfg.NUM_WORKERS, + **kwargs: Any, +) -> PipelineOptions: + """Function to retrieve the pipeline options. + Args: + project: GCP project to run on + mode: Indicator to run local, cloud or template + num_workers: Number of Workers for running the job parallely + Returns: + Dataflow pipeline options + """ + job_name = f'{job_name}-{datetime.now().strftime("%Y%m%d%H%M%S")}' + + staging_bucket = f"gs://{cfg.PROJECT_ID}-ml-examples" + + # For a list of available options, check: + # https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options + dataflow_options = { + "runner": "DirectRunner" if mode == "local" else "DataflowRunner", + "job_name": job_name, + "project": project, + "region": cfg.REGION, + "staging_location": f"{staging_bucket}/dflow-staging", + "temp_location": f"{staging_bucket}/dflow-temp", + "setup_file": "./setup.py", + } + flags = [] + if device == "GPU": + flags = [ + "--experiment=worker_accelerator=type:nvidia-tesla-p4;count:1;"\ + "install-nvidia-driver", + "--experiment=use_runner_v2", + ] + dataflow_options.update({ + "sdk_container_image": cfg.DOCKER_IMG, + "machine_type": "n1-standard-4", + }) + + # Optional parameters + if num_workers: + dataflow_options.update({"num_workers": num_workers}) + print(dataflow_options) Review Comment: Same as above comment on print ########## sdks/python/apache_beam/examples/inference/runinference_metrics/main.py: ########## @@ -0,0 +1,127 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""This file contains the pipeline for loading a ML model, and exploring +the different RunInference metrics.""" +import argparse +import sys + +import apache_beam as beam +import config as cfg +from apache_beam.ml.inference import RunInference +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +from pipeline.options import get_pipeline_options +from pipeline.transformations import CustomPytorchModelHandlerKeyedTensor +from pipeline.transformations import HuggingFaceStripBatchingWrapper +from pipeline.transformations import PostProcessor +from pipeline.transformations import Tokenize +from transformers import DistilBertConfig + + +def parse_arguments(argv): + """ + Parses the arguments passed to the command line and + returns them as an object + Args: + argv: The arguments passed to the command line. + Returns: + The arguments that are being passed in. 
+ """ + parser = argparse.ArgumentParser(description="benchmark-runinference") + + parser.add_argument( + "-m", + "--mode", + help="Mode to run pipeline in.", + choices=["local", "cloud"], + default="local", + ) + parser.add_argument( + "-p", + "--project", + help="GCP project to run pipeline on.", + default=cfg.PROJECT_ID, + ) + parser.add_argument( + "-d", + "--device", + help="Device to run the dataflow job on", + choices=["CPU", "GPU"], + default="CPU", + ) + + args, _ = parser.parse_known_args(args=argv) + return args + + +def run(): + """ + Runs the pipeline that loads a transformer based text classification model + and does inference on a list of sentences. + At the end of pipeline, different metrics like latency, + throughput and others are printed. + """ + args = parse_arguments(sys.argv) + + inputs = [ + "This is the worst food I have ever eaten", + "In my soul and in my heart, I’m convinced I’m wrong!", + "Be with me always—take any form—drive me mad!"\ + "only do not leave me in this abyss, where I cannot find you!", + "Do I want to live? Would you like to live with your soul in the grave?", + "Honest people don’t hide their deeds.", + "Nelly, I am Heathcliff! 
He’s always,"\ + "always in my mind: not as a pleasure,"\ + "any more than I am always a pleasure to myself, but as my own being.", + ] * 1000 + + pipeline_options = get_pipeline_options( + job_name=cfg.JOB_NAME, + num_workers=cfg.NUM_WORKERS, + project=args.project, + mode=args.mode, + device=args.device, + ) + model_handler_class = ( + PytorchModelHandlerKeyedTensor + if args.device == "GPU" else CustomPytorchModelHandlerKeyedTensor) + device = "cuda:0" if args.device == "GPU" else args.device + model_handler = model_handler_class( + state_dict_path=cfg.MODEL_STATE_DICT_PATH, + model_class=HuggingFaceStripBatchingWrapper, + model_params={ + "config": DistilBertConfig.from_pretrained(cfg.MODEL_CONFIG_PATH) + }, + device=device, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "Create inputs" >> beam.Create(inputs) + | "Tokenize" >> beam.ParDo(Tokenize(cfg.TOKENIZER_NAME)) + | "Inference" >> + RunInference(model_handler=KeyedModelHandler(model_handler)) + | "Decode Predictions" >> beam.ParDo(PostProcessor())) + metrics = pipeline.result.metrics().query(beam.metrics.MetricsFilter()) + print("\n\n\n\n") + print(metrics) Review Comment: Should we be using `logging` instead of `print`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
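On the `print` versus `logging` question raised for `main.py` and `pipeline/options.py`, a minimal sketch of what the change could look like is below. It is only an illustration, not the PR author's code: the helper names `log_metrics` and `log_dataflow_options` and the logger name are hypothetical, and it assumes the queried metrics arrive as a dict that maps a metric kind (for example `"counters"`) to a list of results.

```python
import logging

# Hypothetical logger name; any module-level logger would work the same way.
logger = logging.getLogger("runinference_metrics")


def log_metrics(metrics) -> None:
    """Log queried metrics at INFO level instead of printing them.

    Assumes `metrics` is a dict mapping a metric kind to a list of results.
    """
    for kind, results in metrics.items():
        logger.info("%s: %d result(s)", kind, len(results))
        for result in results:
            logger.info("  %s", result)


def log_dataflow_options(options: dict) -> None:
    """Log the assembled pipeline options, one key per line, in sorted order."""
    for key in sorted(options):
        logger.info("option %s=%s", key, options[key])
```

In `main.py` this would replace the two `print` calls with `log_metrics(metrics)`, and in `options.py` the `print(dataflow_options)` call with `log_dataflow_options(dataflow_options)`. A `logging.basicConfig(level=logging.INFO)` call at program start would be needed for the messages to show up when running locally.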
