yeandy commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r887267918


##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()

Review Comment:
   nit
   ```suggestion
         dataframe = pandas_dataframe()
   ```



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])
+      actual = pcoll | api.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+
+      splits = [data_frame.loc[[i]] for i in data_frame.index]
+      expected = [
+          api.PredictionResult(splits[0], 5),
+          api.PredictionResult(splits[1], 8),
+          api.PredictionResult(splits[2], 1),
+          api.PredictionResult(splits[3], 1),
+          api.PredictionResult(splits[4], 2),
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas_with_keys(self):

Review Comment:
   We're already have a keyed test in `base_test.py`. Is it necessary to test a 
keyed example for each framework?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],

Review Comment:
   I don't think there's a convention pertaining the order of elements in a 
Union? Since `pandas.DataFrame` is used more, would this be more useful?
   ```suggestion
         batch: List[Union[pandas.DataFrame, numpy.ndarray]],
   ```



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]

Review Comment:
   I did a toy example like so:
   ```
   >>> df1 = pd.DataFrame([[1,2,3],[4,5,6]])
   >>> df2 = pd.DataFrame([[11,12,13],[14,15,16]])
   >>> vectorized_batch = pd.concat([df1, df2])
   >>> vectorized_batch.index
   Int64Index([0, 1, 0, 1], dtype='int64')
   >>> vectorized_batch.loc[[0]]
       0   1   2
   0   1   2   3
   0  11  12  13
   >>> vectorized_batch.loc[[0]]
       0   1   2
   1   4   5   6
   1  14  15  16
   >>> splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
   >>> splits
   [    0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16,     0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16]
   ```
   I see that index 0 is found in both the first and second batch. Same for 
index 1. So indexing by `vectorized_batch.index` appears to up data from the 
different DataFrames. 
   
   Can you clarify how `vectorized_batch.index` is supposed to differentiate 
between the batches?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -62,6 +83,44 @@ def build_model():
   return model
 
 
+def pandas_dataframe():
+  csv_string = (
+      'category_1,number_1,category_2,number_2,label,number_3\n'

Review Comment:
   Does this implicitly add indexes starting from 0? What if the user has 
custom indexes?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])

Review Comment:
   So here, we have a list of 1 DataFrame. Can we test a list of 2 or more 
DataFrames which would simulate reading in separate text files into their own 
DFs?
   



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)

Review Comment:
   Nice refactor



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
+    return [
+        PredictionResult(example, inference) for example,
+        inference in zip(splits, predictions)
+    ]
+
+  def get_num_bytes(
+      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
     """Returns the number of bytes of data for a batch."""
-    return sum(sys.getsizeof(element) for element in batch)
+    if isinstance(batch[0], numpy.ndarray):
+      return sum(sys.getsizeof(element) for element in batch)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return sum(df.memory_usage(deep=True).sum() for df in batch)

Review Comment:
   What does `deep=True` mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to