[
https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=773606&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773606
]
ASF GitHub Bot logged work on BEAM-14068:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/May/22 17:13
Start Date: 23/May/22 17:13
Worklog Time Spent: 10m
Work Description: AnandInguva commented on code in PR #17697:
URL: https://github.com/apache/beam/pull/17697#discussion_r879700180
##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):
+ def process(self, dataset_name):
+ if dataset_name == 'fetch_20newsgroups':
+ from sklearn.datasets import fetch_20newsgroups_vectorized
+ newsgroups_test = fetch_20newsgroups_vectorized(
+ subset='test', data_home='/tmp')
+ for x, y in zip(newsgroups_test['data'], newsgroups_test['target']):
+ # (n_feat,) is supported but not (1, n_feat)
+ yield y, np.squeeze(x.toarray())
+ else:
+ raise NotImplementedError
+
+
+class PostProcessor(beam.DoFn):
+ """Post process PredictionResult to output true_label and
+ prediction using numpy."""
+ def process(self, element):
+ true_label, prediction_result = element
+ prediction = prediction_result.inference
+ yield true_label, prediction
+
+
+def setup_pipeline(options: PipelineOptions, args=None):
+ """Sets up Sklearn RunInference pipeline"""
+ model_loader = SklearnModelLoader(
+ model_file_type=ModelFileType.PICKLE, model_uri=args.model_path)
Review Comment:
Yes, but I thought it would be nice just to pass it as a param :)
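A minimal sketch of how the example could expose that parameter (and the others) as command-line flags; the flag names follow the extra_opts keys used in the integration test below, but the defaults and help text here are assumptions, not the PR's code:

import argparse


def parse_known_args(argv=None):
  """Parses args for the sklearn inference example (sketch, assumed names)."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--model_path',
      dest='model_path',
      required=True,
      help='Path (for example a GCS URI) to the pickled sklearn model.')
  parser.add_argument(
      '--dataset',
      dest='dataset',
      default='fetch_20newsgroups',
      help='Name of the dataset the pipeline loads.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Path to write prediction results to.')
  return parser.parse_known_args(argv)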
##########
sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py:
##########
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-End test for Sklearn Inference"""
+
+import logging
+import pytest
+import unittest
+import uuid
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.examples.inference import sklearn_inference_example
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class SklearnInference(unittest.TestCase):
+ @pytest.mark.it_postcommit
+ @pytest.mark.sickbay_direct
+ @pytest.mark.sickbay_spark
+ @pytest.mark.sickbay_flink
+ def test_predictions_output_file(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+ output_file_dir = 'gs://apache-beam-ml/temp_storage_end_to_end_testing/outputs'  # pylint: disable=line-too-long
+ output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+ model_path = 'gs://apache-beam-ml/temp_storage_end_to_end_testing/models/fetch_20newsgroups/multinomialNB.pkl'  # pylint: disable=line-too-long
+ extra_opts = {
+ 'dataset': 'fetch_20newsgroups',
+ 'output': output_file,
+ 'model_path': model_path,
+ }
+
+ sklearn_inference_example.run(
+ test_pipeline.get_full_options_as_args(**extra_opts),
+ save_main_session=False)
+ self.assertEqual(FileSystems().exists(output_file), True)
Review Comment:
validation is still WIP.
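A rough sketch of what that follow-up validation might look like, reading the result file back through the Beam filesystem abstraction; the output format being checked is an assumption, not taken from the PR:

from apache_beam.io.filesystems import FileSystems


def check_predictions_not_empty(output_file):
  """Asserts that the pipeline wrote at least one non-empty prediction line."""
  # Read the result file back through FileSystems so the check works for
  # GCS paths as well as local files.
  with FileSystems.open(output_file) as f:
    contents = f.read().decode('utf-8')
  lines = [line for line in contents.splitlines() if line.strip()]
  assert lines, 'no predictions were written to %s' % output_file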
##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):
+ def process(self, dataset_name):
+ if dataset_name == 'fetch_20newsgroups':
+ from sklearn.datasets import fetch_20newsgroups_vectorized
+ newsgroups_test = fetch_20newsgroups_vectorized(
+ subset='test', data_home='/tmp')
+ for x, y in zip(newsgroups_test['data'], newsgroups_test['target']):
+ # (n_feat,) is supported but not (1, n_feat)
+ yield y, np.squeeze(x.toarray())
+ else:
+ raise NotImplementedError
+
+
+class PostProcessor(beam.DoFn):
+ """Post process PredictionResult to output true_label and
Review Comment:
Conventionally, we refer to the label as true_label. Anyway, I am going to change the
docstring on this method soon.
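For illustration, one way the reworded docstring could read; this phrasing is an assumption, not the PR's eventual text:

import apache_beam as beam


class PostProcessor(beam.DoFn):
  """Unpacks (true_label, PredictionResult) tuples.

  Emits (true_label, prediction) pairs, where the prediction is taken from
  PredictionResult.inference and true_label is the ground-truth label that
  was paired with the example before RunInference.
  """
  def process(self, element):
    true_label, prediction_result = element
    yield true_label, prediction_result.inference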
Issue Time Tracking
-------------------
Worklog Id: (was: 773606)
Time Spent: 6h (was: 5h 50m)
> RunInference Benchmarking tests
> -------------------------------
>
> Key: BEAM-14068
> URL: https://issues.apache.org/jira/browse/BEAM-14068
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Anand Inguva
> Assignee: Anand Inguva
> Priority: P2
> Time Spent: 6h
> Remaining Estimate: 0h
>
> RunInference benchmarks will evaluate the performance of pipelines that
> represent common use cases of Beam + Dataflow with PyTorch, sklearn, and
> possibly TFX. These benchmarks would be integration tests that exercise
> several software components using Beam, PyTorch, scikit-learn, and
> TensorFlow Extended.
> We would use publicly available datasets (e.g., from Kaggle).
> Size: small / 10 GB / 1 TB, etc.
> The default execution runner would be Dataflow unless specified otherwise.
> These tests would run infrequently (every release cycle).
--
This message was sent by Atlassian Jira
(v8.20.7#820007)