[ 
https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=771888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-771888
 ]

ASF GitHub Bot logged work on BEAM-14068:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/22 13:59
            Start Date: 18/May/22 13:59
    Worklog Time Spent: 10m 
      Work Description: ryanthompson591 commented on code in PR #17697:
URL: https://github.com/apache/beam/pull/17697#discussion_r875902795


##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):
+  def process(self, dataset_name):
+    if dataset_name == 'fetch_20newsgroups':
+      from sklearn.datasets import fetch_20newsgroups_vectorized
+      newsgroups_test = fetch_20newsgroups_vectorized(
+          subset='test', data_home='/tmp')
+      for x, y in zip(newsgroups_test['data'], newsgroups_test['target']):
+        # (n_feat,) is supported but not (1, n_feat)
+        yield y, np.squeeze(x.toarray())
+    else:
+      raise NotImplementedError
+
+
+class PostProcessor(beam.DoFn):
+  """Post process PredictionResult to output true_label and
+  prediction using numpy."""
+  def process(self, element):
+    true_label, prediction_result = element
+    prediction = prediction_result.inference
+    yield true_label, prediction
+
+
+def setup_pipeline(options: PipelineOptions, args=None):
+  """Sets up Sklearn RunInference pipeline"""
+  model_loader = SklearnModelLoader(
+      model_file_type=ModelFileType.PICKLE, model_uri=args.model_path)

Review Comment:
   pickle is the default, so no need to specify.
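   
   A minimal sketch of the suggested simplification, assuming (per the comment above) that ModelFileType.PICKLE is indeed the default model_file_type:
   
       model_loader = SklearnModelLoader(model_uri=args.model_path)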



##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):
+  def process(self, dataset_name):
+    if dataset_name == 'fetch_20newsgroups':
+      from sklearn.datasets import fetch_20newsgroups_vectorized
+      newsgroups_test = fetch_20newsgroups_vectorized(
+          subset='test', data_home='/tmp')
+      for x, y in zip(newsgroups_test['data'], newsgroups_test['target']):
+        # (n_feat,) is supported but not (1, n_feat)
+        yield y, np.squeeze(x.toarray())
+    else:
+      raise NotImplementedError
+
+
+class PostProcessor(beam.DoFn):
+  """Post process PredictionResult to output true_label and
+  prediction using numpy."""
+  def process(self, element):
+    true_label, prediction_result = element
+    prediction = prediction_result.inference
+    yield true_label, prediction
+
+
+def setup_pipeline(options: PipelineOptions, args=None):
+  """Sets up Sklearn RunInference pipeline"""
+  model_loader = SklearnModelLoader(
+      model_file_type=ModelFileType.PICKLE, model_uri=args.model_path)
+
+  with beam.Pipeline(options=options) as p:
+    data = (
+        p | "Dataset by name" >> beam.Create([args.dataset])
+        | "Get data" >> beam.ParDo(GetData()))
+
+    predictions = (
+        data | "RunInference" >> RunInference(model_loader)
+        | "PostProcessor" >> beam.ParDo(PostProcessor()))
+
+    predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: 
disable=expression-not-assigned
+      args.output,
+      file_name_suffix='.txt',
+      shard_name_template='',
+      append_trailing_newlines=True)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--dataset',
+      dest='dataset',
+      help='Sklearn built-in dataset name',
+      # datasets in sklearn.datasets module
+      choices=['olivetti_faces', 'fetch_20newsgroups'])

Review Comment:
   I suppose this is fine, but IMO it would be better to use a hosted dataset.
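   
   For illustration, a hedged sketch of reading a hosted dataset directly instead of relying on the sklearn downloader; the GCS path and CSV layout here are hypothetical, and the snippet reuses the example's existing p, beam, and np names:
   
       examples = (
           p
           | 'ReadHostedCSV' >> beam.io.ReadFromText(
               'gs://example-bucket/datasets/20newsgroups_test.csv',  # hypothetical path
               skip_header_lines=1)
           # each row is a comma-separated vector of float features for one example
           | 'ParseRow' >> beam.Map(
               lambda row: np.array([float(v) for v in row.split(',')])))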
   
   



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py:
##########
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-End test for Sklearn Inference"""
+
+import logging
+import pytest
+import unittest
+import uuid
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.examples.inference import sklearn_inference_example
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class SklearnInference(unittest.TestCase):
+  @pytest.mark.it_postcommit
+  @pytest.mark.sickbay_direct
+  @pytest.mark.sickbay_spark
+  @pytest.mark.sickbay_flink
+  def test_predictions_output_file(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    output_file_dir = 'gs://apache-beam-ml/temp_storage_end_to_end_testing/outputs'  # pylint: disable=line-too-long
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+    model_path = 'gs://apache-beam-ml/temp_storage_end_to_end_testing/models/fetch_20newsgroups/multinomialNB.pkl'  # pylint: disable=line-too-long
+    extra_opts = {
+        'dataset': 'fetch_20newsgroups',
+        'output': output_file,
+        'model_path': model_path,
+    }
+
+    sklearn_inference_example.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+    self.assertEqual(FileSystems().exists(output_file), True)

Review Comment:
   Is it possible to read anything from the file and validate it?
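   
   One possible sketch, assuming WriteToText produced one text line per (label, prediction) pair and using only the FileSystems API already imported in this test:
   
       with FileSystems().open(output_file) as f:
         lines = f.read().decode('utf-8').strip().split('\n')
       # at minimum, something should have been written for the test dataset
       self.assertGreater(len(lines), 0)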



##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):

Review Comment:
   I think this would be a stronger example if it followed a more conventional approach, like loading from a CSV.
   
   Also, in this case even having labels is not important: in a real example the labels would not already be known, and if they were, running inference would add little value.
   
   Plus, I don't think the labels are a good key for validating the data later, because the output is not guaranteed to come out in the same order as the input.
   
   A better key for validating expectations might simply be the index of the example, as in the sketch below.
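   
   A sketch of that approach, keyed by row index; the CSV layout (one example per row, comma-separated float features, no labels) is hypothetical:
   
       import csv
       import numpy as np
       import apache_beam as beam
       from apache_beam.io.filesystems import FileSystems
       
       
       class GetDataFromCsv(beam.DoFn):
         def process(self, csv_path):
           with FileSystems.open(csv_path) as f:
             reader = csv.reader(line.decode('utf-8') for line in f)
             for index, row in enumerate(reader):
               # key by row index so outputs can be matched back to inputs
               # even if the output order differs from the input order
               yield index, np.array([float(v) for v in row])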



##########
sdks/python/apache_beam/examples/inference/sklearn_inference_example.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import numpy as np
+
+import apache_beam as beam
+
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class GetData(beam.DoFn):
+  def process(self, dataset_name):
+    if dataset_name == 'fetch_20newsgroups':
+      from sklearn.datasets import fetch_20newsgroups_vectorized
+      newsgroups_test = fetch_20newsgroups_vectorized(
+          subset='test', data_home='/tmp')
+      for x, y in zip(newsgroups_test['data'], newsgroups_test['target']):
+        # (n_feat,) is supported but not (1, n_feat)
+        yield y, np.squeeze(x.toarray())
+    else:
+      raise NotImplementedError
+
+
+class PostProcessor(beam.DoFn):
+  """Post process PredictionResult to output true_label and

Review Comment:
   Maybe rename true_label to expected_label, or just label. ML people should know what that is.



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py:
##########
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-End test for Sklearn Inference"""
+
+import logging
+import pytest
+import unittest
+import uuid
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.examples.inference import sklearn_inference_example
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class SklearnInference(unittest.TestCase):
+  @pytest.mark.it_postcommit
+  @pytest.mark.sickbay_direct

Review Comment:
   Why do we want to sickbay spark/flink/direct? Shouldn't those all work?
   
   Or will they fail when the data is hosted on GCS?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 771888)
    Time Spent: 5h 10m  (was: 5h)

> RunInference Benchmarking tests
> -------------------------------
>
>                 Key: BEAM-14068
>                 URL: https://issues.apache.org/jira/browse/BEAM-14068
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Anand Inguva
>            Assignee: Anand Inguva
>            Priority: P2
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> RunInference benchmarks will evaluate the performance of pipelines that 
> represent common use cases of Beam + Dataflow with PyTorch, sklearn, and 
> possibly TFX. These benchmarks would be integration tests that exercise 
> several software components using Beam, PyTorch, scikit-learn, and 
> TensorFlow Extended.
> We would use datasets that are publicly available (e.g., from Kaggle). 
> Size: small / 10 GB / 1 TB, etc.
> The default execution runner would be Dataflow unless specified otherwise.
> These tests would be run less frequently (every release cycle).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
