damccorm commented on code in PR #25081:
URL: https://github.com/apache/beam/pull/25081#discussion_r1081842015


##########
sdks/python/apache_beam/examples/per_entity_training.py:
##########
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline to demonstrate per-entity training.
+
+This pipeline reads data from a CSV file, that contains information
+about 15 different attributes like salary >=50k, education level,
+native country, age, occupation and others. The pipeline does some filtering
+by selecting certain education level, discarding missing values and empty rows.
+The pipeline then groups the rows based on education level and
+trains Decision Trees for each group and finally saves them.
+"""
+
+import argparse
+import logging
+import os
+
+from joblib import dump
+import pandas as pd
+from sklearn.compose import ColumnTransformer
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import LabelEncoder
+from sklearn.preprocessing import MinMaxScaler
+from sklearn.preprocessing import OneHotEncoder
+from sklearn.tree import DecisionTreeClassifier
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class CreateKey(beam.DoFn):
+  def process(self, element, *args, **kwargs):
+    # 3rd column of the dataset is Education
+    idx = 3
+    key = element.pop(idx)
+    yield (key, element)
+
+
+def custom_filter(element):
+  """Discard data point if contains ?, and
+  doesn't have all features, and
+  doesn't have Bachelors, Masters or a Doctorate Degree"""

Review Comment:
   ```suggestion
     """Discard data point if contains ?,
     doesn't have all features, or
     doesn't have Bachelors, Masters or a Doctorate Degree"""
   ```
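
For reference, here is a minimal sketch of a predicate that matches the suggested docstring, assuming `element` is a list of 15 string fields as in the PR. Python's `and` binds tighter than `or`, so the unparenthesized expression in the PR parses as `(len(...) == 15 and '?' not in element and ' Bachelors' in element) or ' Masters' in element or ' Doctorate' in element`, which keeps any row containing ' Masters' or ' Doctorate' regardless of the other two checks. The `WANTED_DEGREES` name and the intermediate variables are illustrative, not part of the PR.

```python
# Hypothetical constant: the education values the pipeline keeps.
WANTED_DEGREES = (' Bachelors', ' Masters', ' Doctorate')


def custom_filter(element):
  """Keep a row only if it has all 15 features, contains no '?',
  and has a Bachelors, Masters or Doctorate degree."""
  has_all_features = len(element) == 15
  has_no_missing = '?' not in element
  # any() groups the degree checks so they cannot bypass the checks above.
  has_wanted_degree = any(degree in element for degree in WANTED_DEGREES)
  return has_all_features and has_no_missing and has_wanted_degree
```

A row that is too short or contains '?' is now discarded even when it has one of the wanted degrees.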



##########
sdks/python/apache_beam/examples/per_entity_training.py:
##########
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline to demonstrate per-entity training.
+
+This pipeline reads data from a CSV file, that contains information
+about 15 different attributes like salary >=50k, education level,
+native country, age, occupation and others. The pipeline does some filtering
+by selecting certain education level, discarding missing values and empty rows.
+The pipeline then groups the rows based on education level and
+trains Decision Trees for each group and finally saves them.
+"""
+
+import argparse
+import logging
+import os
+
+from joblib import dump
+import pandas as pd
+from sklearn.compose import ColumnTransformer
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import LabelEncoder
+from sklearn.preprocessing import MinMaxScaler
+from sklearn.preprocessing import OneHotEncoder
+from sklearn.tree import DecisionTreeClassifier
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class CreateKey(beam.DoFn):
+  def process(self, element, *args, **kwargs):
+    # 3rd column of the dataset is Education
+    idx = 3
+    key = element.pop(idx)
+    yield (key, element)
+
+
+def custom_filter(element):
+  """Discard data point if contains ?, and
+  doesn't have all features, and
+  doesn't have Bachelors, Masters or a Doctorate Degree"""
+  return len(element) == 15 and '?' not in element \
+      and ' Bachelors' in element or ' Masters' in element \
+      or ' Doctorate' in element
+
+
+class PrepareDataforTraining(beam.DoFn):
+  """Preprocess data in a format suitable for training."""
+  def process(self, element, *args, **kwargs):
+    key, values = element
+    #Convert to dataframe
+    df = pd.DataFrame(values)
+    last_ix = len(df.columns) - 1
+    X, y = df.drop(last_ix, axis=1), df[last_ix]
+    # select categorical and numerical features
+    cat_ix = X.select_dtypes(include=['object', 'bool']).columns
+    num_ix = X.select_dtypes(include=['int64', 'float64']).columns
+    # label encode the target variable to have the classes 0 and 1
+    y = LabelEncoder().fit_transform(y)
+    yield (X, y, cat_ix, num_ix, key)
+
+
+class TrainModel(beam.DoFn):
+  """Takes preprocessed data as input,
+  transforms categorical columns using OneHotEncoder,
+  normalizes numerical columns and then
+  fits a decision tree classifier.
+  """
+  def process(self, element, *args, **kwargs):
+    X, y, cat_ix, num_ix, key = element
+    steps = [('c', OneHotEncoder(handle_unknown='ignore'), cat_ix),
+             ('n', MinMaxScaler(), num_ix)]
+    # one hot encode categorical, normalize numerical
+    ct = ColumnTransformer(steps)
+    # wrap the model in a pipeline
+    pipeline = Pipeline(steps=[('t', ct), ('m', DecisionTreeClassifier())])
+    pipeline.fit(X, y)
+    yield (key, pipeline)
+
+
+class SaveModel(beam.DoFn):
+  """Saves the trained model to specified location."""
+  def process(self, element, path, *args, **kwargs):
+    key, trained_model = element
+    dump(trained_model, os.path.join(path, f"{key}_model.joblib"))

Review Comment:
   Instead of using dump with a fixed path, could we please use [fileio with dynamic destinations](https://beam.apache.org/releases/pydoc/2.15.0/apache_beam.io.fileio.html#dynamic-destinations)? That way it's easy to write to GCS or other known filesystems.
   
   It would also be good to make that path a configurable, optional known_arg.


