damccorm commented on code in PR #25905: URL: https://github.com/apache/beam/pull/25905#discussion_r1156124730
########## sdks/python/apache_beam/examples/inference/README.md: ########## @@ -645,3 +645,44 @@ When all elements are in a single batch the output looks like this: 0,[1 1 1 0 0 0 0 1 2 0 0 2 0 2 1 2 2 2 2 0 0 0 0 2 2 0 2 2 2 1] ``` + +## Milk Quality Prediction Windowing Example + +`milk_quality_prediction_windowing.py` contains an implementation of a windowing pipeline making use of the RunInference transform. An XGBoost classification the quality of milk based on measurements of pH, temperature, taste, odor, fat, turbidity and color. The model labels a measurement as `bad`, `medium` or `good`. The model is trained on the [Kaggle Milk Quality Prediction dataset](https://www.kaggle.com/datasets/cpluzshrijayan/milkquality). + +#### Loading and preprocessing the dataset + +The `preprocess_data` function loads the Kaggle dataset from a csv file and splits it into a training and accompanying label set as well as a test set. In typical machine learning setting we would use the training set and the labels to train the model and the test set is used to calculate various metrics such as recall and precision. We will use the test set data in a test streaming pipeline to showcase the windowing capabilities. + +#### Training an XGBoost classifier + +The `train_model` function allows you to train a simple XGBoost classifier using the Kaggle Milk Quality Prediction dataset. The trained model will be saved in JSON format at the location passed as a parameter and can then later be used for inference using by loading it via the XGBoostModelhandler. + +#### Running the pipeline + +``` +python -m apache_beam.examples.inference.milk_quality_prediction_windowing.py \ + --dataset \ + <DATASET> \ + --pipeline_input_data \ + <INPUT_DATA> \ + --training_set \ + <TRAINING_SET> \ + --labels \ + <LABELS> \ + --model_state \ + <MODEL_STATE> +``` + +Where `<DATASET>` is the path to a csv file containing the Kaggle Milk Quality prediction dataset, `<INPUT_DATA>` the data that will be used as input for the streaming pipeline (test set), `<TRAINING_SET>` the path to store the training set in csv format, `<LABELS>` the path to store the csv containing the labels used to train the model and `<MODEL_STATE>` the path to the JSON file containing the trained model. Review Comment: ```suggestion Where `<DATASET>` is the path to a csv file containing the Kaggle Milk Quality prediction dataset, `<INPUT_DATA>` a filepath to save the data that will be used as input for the streaming pipeline (test set), `<TRAINING_SET>` a filepath to store the training set in csv format, `<LABELS>` a filepath to store the csv containing the labels used to train the model and `<MODEL_STATE>` the path to the JSON file containing the trained model. `<INPUT_DATA>`, `<TRAINING_SET>`, and `<LABELS>` will all be parsed from `<DATASET>` and saved before pipeline execution. ``` This was unclear to me just reading the description ########## sdks/python/apache_beam/examples/inference/README.md: ########## @@ -645,3 +645,44 @@ When all elements are in a single batch the output looks like this: 0,[1 1 1 0 0 0 0 1 2 0 0 2 0 2 1 2 2 2 2 0 0 0 0 2 2 0 2 2 2 1] ``` + +## Milk Quality Prediction Windowing Example + +`milk_quality_prediction_windowing.py` contains an implementation of a windowing pipeline making use of the RunInference transform. An XGBoost classification the quality of milk based on measurements of pH, temperature, taste, odor, fat, turbidity and color. The model labels a measurement as `bad`, `medium` or `good`. The model is trained on the [Kaggle Milk Quality Prediction dataset](https://www.kaggle.com/datasets/cpluzshrijayan/milkquality). + +#### Loading and preprocessing the dataset + +The `preprocess_data` function loads the Kaggle dataset from a csv file and splits it into a training and accompanying label set as well as a test set. In typical machine learning setting we would use the training set and the labels to train the model and the test set is used to calculate various metrics such as recall and precision. We will use the test set data in a test streaming pipeline to showcase the windowing capabilities. + +#### Training an XGBoost classifier + +The `train_model` function allows you to train a simple XGBoost classifier using the Kaggle Milk Quality Prediction dataset. The trained model will be saved in JSON format at the location passed as a parameter and can then later be used for inference using by loading it via the XGBoostModelhandler. + +#### Running the pipeline + +``` +python -m apache_beam.examples.inference.milk_quality_prediction_windowing.py \ + --dataset \ + <DATASET> \ + --pipeline_input_data \ + <INPUT_DATA> \ + --training_set \ + <TRAINING_SET> \ + --labels \ + <LABELS> \ + --model_state \ + <MODEL_STATE> +``` + +Where `<DATASET>` is the path to a csv file containing the Kaggle Milk Quality prediction dataset, `<INPUT_DATA>` the data that will be used as input for the streaming pipeline (test set), `<TRAINING_SET>` the path to store the training set in csv format, `<LABELS>` the path to store the csv containing the labels used to train the model and `<MODEL_STATE>` the path to the JSON file containing the trained model. + +Using the test set, we simulate a streaming pipeline that a receives a new measurement of the milk quality parameters every minute. A sliding window keeps track of the measurement of the last 30 minutes and new window starts every 5 minutes. The model predicts the quality of each measurement. After 30 minutes the results are aggregated in a tuple contains the number of measurements that were predicted as bad, medium and high quality samples. The output of each window looks as follows: Review Comment: ```suggestion Using the test set, we simulate a streaming pipeline that a receives a new measurement of the milk quality parameters every minute. A sliding window keeps track of the measurement of the last 30 minutes and new window starts every 5 minutes. The model predicts the quality of each measurement. After 30 minutes the results are aggregated in a tuple containing the number of measurements that were predicted as bad, medium and high quality samples. The output of each window looks as follows: ``` ########## sdks/python/apache_beam/examples/inference/milk_quality_prediction_windowing.py: ########## @@ -1,32 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A streaming pipeline that uses RunInference API and windowing that classifies +the quality of milk as good, bad or medium based on based pH, temperature, +taste, odor, fat, turbidity and Color.Each minute a new measurements come in and +a sliding window aggregates the number of good, bad and medium samples. + +This example uses the milk quality prediction dataset from kaggle. +https://www.kaggle.com/datasets/cpluzshrijayan/milkquality + + +In order to set this example up, you will need two things. +1. Download the data in csv format from kaggle and host it. +2. Split the dataset in a training set and test set (preprocess_data function). +3. Train the classifier. +""" + import argparse import logging import time from typing import NamedTuple import pandas -import pandas as pd -import xgboost - from sklearn.model_selection import train_test_split import apache_beam as beam +import xgboost +from apache_beam import window from apache_beam.ml.inference import RunInference from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerPandas -from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.runners.runner import PipelineResult from apache_beam.testing.test_stream import TestStream -from apache_beam import window def parse_known_args(argv): """Parses args for the workflow.""" parser = argparse.ArgumentParser() parser.add_argument( - '--input_data', - dest='input_data', + '--dataset', + dest='dataset', + required=True, + help='Path to the csv containing Kaggle Milk Quality dataset.') + parser.add_argument( + '--pipeline_input_data', + dest='pipeline_input_data', + required=True, + help='Path to store the csv containing input data for the pipeline.') Review Comment: ```suggestion help='Path to store the csv containing input data for the pipeline.' 'This will be generated as part of preprocessing the data.') ``` Same nit as the README, this wasn't obvious to me from the comment alone. ########## sdks/python/apache_beam/examples/inference/milk_quality_prediction_windowing.py: ########## @@ -1,32 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A streaming pipeline that uses RunInference API and windowing that classifies +the quality of milk as good, bad or medium based on based pH, temperature, +taste, odor, fat, turbidity and Color.Each minute a new measurements come in and +a sliding window aggregates the number of good, bad and medium samples. Review Comment: ```suggestion taste, odor, fat, turbidity and color. Each minute a new measurements come in and a sliding window aggregates the number of good, bad and medium samples. ``` Nit - missing a space (I think adding it will set off the linter thus the new lines) ########## sdks/python/apache_beam/examples/inference/milk_quality_prediction_windowing.py: ########## @@ -1,32 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A streaming pipeline that uses RunInference API and windowing that classifies +the quality of milk as good, bad or medium based on based pH, temperature, +taste, odor, fat, turbidity and Color.Each minute a new measurements come in and +a sliding window aggregates the number of good, bad and medium samples. + +This example uses the milk quality prediction dataset from kaggle. +https://www.kaggle.com/datasets/cpluzshrijayan/milkquality + + +In order to set this example up, you will need two things. +1. Download the data in csv format from kaggle and host it. +2. Split the dataset in a training set and test set (preprocess_data function). +3. Train the classifier. +""" + import argparse import logging import time from typing import NamedTuple import pandas -import pandas as pd -import xgboost - from sklearn.model_selection import train_test_split import apache_beam as beam +import xgboost +from apache_beam import window from apache_beam.ml.inference import RunInference from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerPandas -from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.runners.runner import PipelineResult from apache_beam.testing.test_stream import TestStream -from apache_beam import window def parse_known_args(argv): """Parses args for the workflow.""" parser = argparse.ArgumentParser() parser.add_argument( - '--input_data', - dest='input_data', + '--dataset', + dest='dataset', + required=True, + help='Path to the csv containing Kaggle Milk Quality dataset.') + parser.add_argument( + '--pipeline_input_data', + dest='pipeline_input_data', + required=True, + help='Path to store the csv containing input data for the pipeline.') + parser.add_argument( + '--training_set', + dest='training_set', + required=True, + help='Path to store the csv containing the training set') + parser.add_argument( + '--labels', + dest='labels', required=True, - help='Path to the csv containing the dataset.') + help='Path to store the csv containing the labels used in training.') Review Comment: ```suggestion help='Path to store the csv containing the labels used in training.' 'This will be generated as part of preprocessing the data.') ``` Same as other flag ########## sdks/python/apache_beam/examples/inference/milk_quality_prediction_windowing.py: ########## @@ -1,32 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A streaming pipeline that uses RunInference API and windowing that classifies +the quality of milk as good, bad or medium based on based pH, temperature, +taste, odor, fat, turbidity and Color.Each minute a new measurements come in and +a sliding window aggregates the number of good, bad and medium samples. + +This example uses the milk quality prediction dataset from kaggle. +https://www.kaggle.com/datasets/cpluzshrijayan/milkquality + + +In order to set this example up, you will need two things. +1. Download the data in csv format from kaggle and host it. +2. Split the dataset in a training set and test set (preprocess_data function). +3. Train the classifier. +""" + import argparse import logging import time from typing import NamedTuple import pandas -import pandas as pd -import xgboost - from sklearn.model_selection import train_test_split import apache_beam as beam +import xgboost +from apache_beam import window from apache_beam.ml.inference import RunInference from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerPandas -from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.runners.runner import PipelineResult from apache_beam.testing.test_stream import TestStream -from apache_beam import window def parse_known_args(argv): """Parses args for the workflow.""" parser = argparse.ArgumentParser() parser.add_argument( - '--input_data', - dest='input_data', + '--dataset', + dest='dataset', + required=True, + help='Path to the csv containing Kaggle Milk Quality dataset.') + parser.add_argument( + '--pipeline_input_data', + dest='pipeline_input_data', + required=True, + help='Path to store the csv containing input data for the pipeline.') + parser.add_argument( + '--training_set', + dest='training_set', + required=True, + help='Path to store the csv containing the training set') Review Comment: ```suggestion help='Path to store the csv containing the training set' 'This will be generated as part of preprocessing the data.') ``` Same as other flag -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
