[ 
https://issues.apache.org/jira/browse/BEAM-14535?focusedWorklogId=777167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777167
 ]

ASF GitHub Bot logged work on BEAM-14535:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jun/22 21:05
            Start Date: 01/Jun/22 21:05
    Worklog Time Spent: 10m 
      Work Description: yeandy commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r887267918


##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()

Review Comment:
   nit
   ```suggestion
         dataframe = pandas_dataframe()
   ```



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])
+      actual = pcoll | api.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+
+      splits = [data_frame.loc[[i]] for i in data_frame.index]
+      expected = [
+          api.PredictionResult(splits[0], 5),
+          api.PredictionResult(splits[1], 8),
+          api.PredictionResult(splits[2], 1),
+          api.PredictionResult(splits[3], 1),
+          api.PredictionResult(splits[4], 2),
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas_with_keys(self):

Review Comment:
   We're already have a keyed test in `base_test.py`. Is it necessary to test a 
keyed example for each framework?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],

Review Comment:
   I don't think there's a convention pertaining the order of elements in a 
Union? Since `pandas.DataFrame` is used more, would this be more useful?
   ```suggestion
         batch: List[Union[pandas.DataFrame, numpy.ndarray]],
   ```



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]

Review Comment:
   I did a toy example like so:
   ```
   >>> df1 = pd.DataFrame([[1,2,3],[4,5,6]])
   >>> df2 = pd.DataFrame([[11,12,13],[14,15,16]])
   >>> vectorized_batch = pd.concat([df1, df2])
   >>> vectorized_batch.index
   Int64Index([0, 1, 0, 1], dtype='int64')
   >>> vectorized_batch.loc[[0]]
       0   1   2
   0   1   2   3
   0  11  12  13
   >>> vectorized_batch.loc[[0]]
       0   1   2
   1   4   5   6
   1  14  15  16
   >>> splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
   >>> splits
   [    0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16,     0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16]
   ```
   I see that index 0 is found in both the first and second batch. Same for 
index 1. So indexing by `vectorized_batch.index` appears to up data from the 
different DataFrames. 
   
   Can you clarify how `vectorized_batch.index` is supposed to differentiate 
between the batches?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -62,6 +83,44 @@ def build_model():
   return model
 
 
+def pandas_dataframe():
+  csv_string = (
+      'category_1,number_1,category_2,number_2,label,number_3\n'

Review Comment:
   Does this implicitly add indexes starting from 0? What if the user has 
custom indexes?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])

Review Comment:
   So here, we have a list of 1 DataFrame. Can we test a list of 2 or more 
DataFrames which would simulate reading in separate text files into their own 
DFs?
   



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)

Review Comment:
   Nice refactor



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
+    return [
+        PredictionResult(example, inference) for example,
+        inference in zip(splits, predictions)
+    ]
+
+  def get_num_bytes(
+      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
     """Returns the number of bytes of data for a batch."""
-    return sum(sys.getsizeof(element) for element in batch)
+    if isinstance(batch[0], numpy.ndarray):
+      return sum(sys.getsizeof(element) for element in batch)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return sum(df.memory_usage(deep=True).sum() for df in batch)

Review Comment:
   What does `deep=True` mean?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 777167)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add support for Pandas Dataframes to sklearn RunInference Implementation
> ------------------------------------------------------------------------
>
>                 Key: BEAM-14535
>                 URL: https://issues.apache.org/jira/browse/BEAM-14535
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ryan Thompson
>            Assignee: Ryan Thompson
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Sklearn pipelines are often set up to take pandas dataframes.
>  
> Our current implementation only supports numpy arrays.
>  
> This FR allows the sklearn implementation to autodetect pandas dataframes or 
> numpy arrays and then combine them (via concat).
>  
> In the case of a pandas dataframe that value will be passed through to the 
> pipeline.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to