This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 632f7d0800a Add integration tests to exercise large model (#28121)
632f7d0800a is described below
commit 632f7d0800aaa8fa7572547b8d0d4f63673936e3
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Aug 23 15:07:32 2023 -0400
Add integration tests to exercise large model (#28121)
---
.../inference/huggingface_language_modeling.py | 10 ++++++-
.../inference/pytorch_language_modeling.py | 10 ++++++-
.../inference/sklearn_mnist_classification.py | 10 ++++++-
.../inference/tensorflow_mnist_classification.py | 11 ++++++-
.../inference/xgboost_iris_classification.py | 10 ++++++-
.../ml/inference/huggingface_inference_it_test.py | 35 ++++++++++++++++++++++
.../ml/inference/pytorch_inference_it_test.py | 35 ++++++++++++++++++++++
.../ml/inference/sklearn_inference_it_test.py | 32 ++++++++++++++++++++
.../ml/inference/tensorflow_inference_it_test.py | 31 +++++++++++++++++++
.../ml/inference/xgboost_inference_it_test.py | 34 +++++++++++++++++++++
10 files changed, 213 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
index f6cb3de72b7..dbdb8c0651a 100644
--- a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
@@ -114,6 +114,13 @@ def parse_known_args(argv):
dest='model_class',
default=AutoModelForMaskedLM,
help="Name of the model from Hugging Face")
+ parser.add_argument(
+ '--large_model',
+ action='store_true',
+ dest='large_model',
+ default=False,
+ help='Set to true if your model is large enough to run into memory '
+ 'pressure if you load multiple copies.')
return parser.parse_known_args(argv)
@@ -139,7 +146,8 @@ def run(
model_uri=known_args.model_name,
model_class=known_args.model_class,
framework='pt',
- max_batch_size=1)
+ max_batch_size=1,
+ large_model=known_args.large_model)
if not known_args.input:
text = (
pipeline | 'CreateSentences' >> beam.Create([
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
index b5fabbb1f1e..9de10e73e11 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
@@ -118,6 +118,13 @@ def parse_known_args(argv):
dest='model_state_dict_path',
required=True,
help="Path to the model's state_dict.")
+ parser.add_argument(
+ '--large_model',
+ action='store_true',
+ dest='large_model',
+ default=False,
+ help='Set to true if your model is large enough to run into memory '
+ 'pressure if you load multiple copies.')
return parser.parse_known_args(argv)
@@ -166,7 +173,8 @@ def run(
model_handler = PytorchNoBatchModelHandler(
state_dict_path=known_args.model_state_dict_path,
model_class=model_class,
- model_params=model_params)
+ model_params=model_params,
+ large_model=known_args.large_model)
pipeline = test_pipeline
if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
index 6f8ea929bbb..5392cdf7dda 100644
--- a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
@@ -77,6 +77,13 @@ def parse_known_args(argv):
dest='model_path',
required=True,
help='Path to load the Sklearn model for Inference.')
+ parser.add_argument(
+ '--large_model',
+ action='store_true',
+ dest='large_model',
+ default=False,
+ help='Set to true if your model is large enough to run into memory '
+ 'pressure if you load multiple copies.')
return parser.parse_known_args(argv)
@@ -103,7 +110,8 @@ def run(
model_loader = KeyedModelHandler(
SklearnModelHandlerNumpy(
model_file_type=ModelFileType.PICKLE,
- model_uri=known_args.model_path))
+ model_uri=known_args.model_path,
+ large_model=known_args.large_model))
pipeline = test_pipeline
if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
index 174d21b26af..6cf746e77cd 100644
--- a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
@@ -70,6 +70,13 @@ def parse_known_args(argv):
dest='model_path',
required=True,
help='Path to load the Tensorflow model for Inference.')
+ parser.add_argument(
+ '--large_model',
+ action='store_true',
+ dest='large_model',
+ default=False,
+ help='Set to true if your model is large enough to run into memory '
+ 'pressure if you load multiple copies.')
return parser.parse_known_args(argv)
@@ -89,7 +96,9 @@ def run(
# Therefore, we use KeyedModelHandler wrapper over TFModelHandlerNumpy.
model_loader = KeyedModelHandler(
TFModelHandlerNumpy(
- model_uri=known_args.model_path, model_type=ModelType.SAVED_MODEL))
+ model_uri=known_args.model_path,
+ model_type=ModelType.SAVED_MODEL,
+ large_model=known_args.large_model))
pipeline = test_pipeline
if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
index 59ee7868ca0..963187fd210 100644
--- a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
+++ b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
@@ -73,6 +73,13 @@ def parse_known_args(argv):
dest='model_state',
required=True,
help='Path to the state of the XGBoost model loaded for Inference.')
+ parser.add_argument(
+ '--large_model',
+ action='store_true',
+ dest='large_model',
+ default=False,
+ help='Set to true if your model is large enough to run into memory '
+ 'pressure if you load multiple copies.')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--split', action='store_true', dest='split')
group.add_argument('--no_split', action='store_false', dest='split')
@@ -125,7 +132,8 @@ def run(
xgboost_model_handler = KeyedModelHandler(
model_handler(
model_class=xgboost.XGBClassifier,
- model_state=known_args.model_state))
+ model_state=known_args.model_state,
+ large_model=known_args.large_model))
input_data = load_sklearn_iris_test_data(
data_type=input_data_type, split=known_args.split)
diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
index 0be359a8719..dd675d1935a 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
@@ -75,6 +75,41 @@ class HuggingFaceInference(unittest.TestCase):
predicted_predicted_text = predictions_dict[text]
self.assertEqual(actual_predicted_text, predicted_predicted_text)
+ def test_hf_language_modeling_large_model(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ # Path to text file containing some sentences
+ file_of_sentences = 'gs://apache-beam-ml/datasets/custom/hf_sentences.txt'
+ output_file_dir = 'gs://apache-beam-ml/testing/predictions'
+ output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+
+ model_name = 'stevhliu/my_awesome_eli5_mlm_model'
+
+ extra_opts = {
+ 'input': file_of_sentences,
+ 'output': output_file,
+ 'model_name': model_name,
+ 'large_model': True,
+ }
+ huggingface_language_modeling.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+
+ self.assertEqual(FileSystems().exists(output_file), True)
+ predictions = pytorch_inference_it_test.process_outputs(
+ filepath=output_file)
+ actuals_file =
'gs://apache-beam-ml/testing/expected_outputs/test_hf_run_inference_for_masked_lm_actuals.txt'
# pylint: disable=line-too-long
+ actuals = pytorch_inference_it_test.process_outputs(filepath=actuals_file)
+
+ predictions_dict = {}
+ for prediction in predictions:
+ text, predicted_text = prediction.split(';')
+ predictions_dict[text] = predicted_text.strip().lower()
+
+ for actual in actuals:
+ text, actual_predicted_text = actual.split(';')
+ predicted_predicted_text = predictions_dict[text]
+ self.assertEqual(actual_predicted_text, predicted_predicted_text)
+
def test_hf_pipeline(self):
test_pipeline = TestPipeline(is_integration_test=True)
# Path to text file containing some questions and context
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
index 5e377720408..e00660bcbd9 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
@@ -161,6 +161,41 @@ class PyTorchInference(unittest.TestCase):
predicted_predicted_text = predictions_dict[text]
self.assertEqual(actual_predicted_text, predicted_predicted_text)
+ @pytest.mark.uses_pytorch
+ @pytest.mark.it_postcommit
+ def test_torch_run_inference_bert_for_masked_lm_large_model(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ # Path to text file containing some sentences
+ file_of_sentences = 'gs://apache-beam-ml/datasets/custom/sentences.txt' #
pylint: disable=line-too-long
+ output_file_dir = 'gs://apache-beam-ml/testing/predictions'
+ output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+
+ model_state_dict_path =
'gs://apache-beam-ml/models/huggingface.BertForMaskedLM.bert-base-uncased.pth'
# pylint: disable=line-too-long
+ extra_opts = {
+ 'input': file_of_sentences,
+ 'output': output_file,
+ 'model_state_dict_path': model_state_dict_path,
+ 'large_model': True,
+ }
+ pytorch_language_modeling.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+
+ self.assertEqual(FileSystems().exists(output_file), True)
+ predictions = process_outputs(filepath=output_file)
+ actuals_file =
'gs://apache-beam-ml/testing/expected_outputs/test_torch_run_inference_bert_for_masked_lm_actuals.txt'
# pylint: disable=line-too-long
+ actuals = process_outputs(filepath=actuals_file)
+
+ predictions_dict = {}
+ for prediction in predictions:
+ text, predicted_text = prediction.split(';')
+ predictions_dict[text] = predicted_text
+
+ for actual in actuals:
+ text, actual_predicted_text = actual.split(';')
+ predicted_predicted_text = predictions_dict[text]
+ self.assertEqual(actual_predicted_text, predicted_predicted_text)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
index 73bb9341f25..c5480234cda 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
@@ -85,6 +85,38 @@ class SklearnInference(unittest.TestCase):
true_label, expected_prediction = expected_outputs[i].split(',')
self.assertEqual(predictions_dict[true_label], expected_prediction)
+ def test_sklearn_mnist_classification_large_model(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
+ output_file_dir = 'gs://temp-storage-for-end-to-end-tests'
+ output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+ model_path = 'gs://apache-beam-ml/models/mnist_model_svm.pickle'
+ extra_opts = {
+ 'input': input_file,
+ 'output': output_file,
+ 'model_path': model_path,
+ 'large_model': True
+ }
+ sklearn_mnist_classification.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+ self.assertEqual(FileSystems().exists(output_file), True)
+
+ expected_output_filepath =
'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt'
# pylint: disable=line-too-long
+ expected_outputs = process_outputs(expected_output_filepath)
+
+ predicted_outputs = process_outputs(output_file)
+ self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+ predictions_dict = {}
+ for i in range(len(predicted_outputs)):
+ true_label, prediction = predicted_outputs[i].split(',')
+ predictions_dict[true_label] = prediction
+
+ for i in range(len(expected_outputs)):
+ true_label, expected_prediction = expected_outputs[i].split(',')
+ self.assertEqual(predictions_dict[true_label], expected_prediction)
+
# TODO(https://github.com/apache/beam/issues/27151) use model with sklearn
1.2
@unittest.skipIf(sys.version_info >= (3, 11, 0), "Beam#27151")
def test_sklearn_regression(self):
diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
index bdc0291dd1e..4786b7a0398 100644
--- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
@@ -102,6 +102,37 @@ class TensorflowInference(unittest.TestCase):
true_label, expected_prediction = expected_outputs[i].split(',')
self.assertEqual(predictions_dict[true_label], expected_prediction)
+ def test_tf_mnist_classification_large_model(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
+ output_file_dir = 'gs://apache-beam-ml/testing/outputs'
+ output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+ model_path = 'gs://apache-beam-ml/models/tensorflow/mnist/'
+ extra_opts = {
+ 'input': input_file,
+ 'output': output_file,
+ 'model_path': model_path,
+ 'large_model': True,
+ }
+ tensorflow_mnist_classification.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+ self.assertEqual(FileSystems().exists(output_file), True)
+
+ expected_output_filepath =
'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt'
# pylint: disable=line-too-long
+ expected_outputs = process_outputs(expected_output_filepath)
+ predicted_outputs = process_outputs(output_file)
+ self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+ predictions_dict = {}
+ for i in range(len(predicted_outputs)):
+ true_label, prediction = predicted_outputs[i].split(',')
+ predictions_dict[true_label] = prediction
+
+ for i in range(len(expected_outputs)):
+ true_label, expected_prediction = expected_outputs[i].split(',')
+ self.assertEqual(predictions_dict[true_label], expected_prediction)
+
def test_tf_imagenet_image_segmentation(self):
test_pipeline = TestPipeline(is_integration_test=True)
input_file = (
diff --git a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
index e458ccab4b0..3db62bcc6a9 100644
--- a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
@@ -113,6 +113,40 @@ class XGBoostInference(unittest.TestCase):
true_label, expected_prediction = expected_output.split(',')
self.assertEqual(predictions_dict[true_label], expected_prediction)
+ def test_iris_classification_numpy_single_batch_large_model(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ input_type = 'numpy'
+ output_file_dir = '/tmp'
+ output_file = '/'.join(
+ [output_file_dir, str(uuid.uuid4()), 'numpy_single_batch.txt'])
+ model_state_path =
'gs://apache-beam-ml/models/xgboost.iris_classifier.json'
+ extra_opts = {
+ 'input_type': input_type,
+ 'output': output_file,
+ 'model_state': model_state_path,
+ 'no_split': True,
+ 'large_model': True,
+ }
+
+ xgboost_iris_classification.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+ self.assertEqual(FileSystems().exists(output_file), True)
+
+ expected_outputs = EXPECTED_OUTPUT_SINGLE_BATCHES
+
+ predicted_outputs = process_outputs(output_file)
+ self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+ predictions_dict = {}
+ for predicted_output in predicted_outputs:
+ true_label, prediction = predicted_output.split(',')
+ predictions_dict[true_label] = prediction
+
+ for expected_output in expected_outputs:
+ true_label, expected_prediction = expected_output.split(',')
+ self.assertEqual(predictions_dict[true_label], expected_prediction)
+
def test_iris_classification_pandas_single_batch(self):
test_pipeline = TestPipeline(is_integration_test=True)
input_type = 'pandas'