TheNeuralBit commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r892963554
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -41,20 +43,57 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
def run_inference(
- self, batch: List[numpy.ndarray], model: BaseEstimator,
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+ model: BaseEstimator,
**kwargs) -> Iterable[PredictionResult]:
+ if isinstance(batch[0], numpy.ndarray):
+ return SklearnInferenceRunner._predict_np_array(batch, model)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
Review Comment:
Could we have two separate InferenceRunner implementations, one for numpy.ndarray
and one for pandas.DataFrame? Then we wouldn't need to branch on the input type
for every batch. The user could indicate which one they prefer with an argument
on the model loader.
##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -41,20 +43,57 @@ class ModelFileType(enum.Enum):
JOBLIB = 2
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+ pandas.DataFrame],
PredictionResult,
BaseEstimator]):
def run_inference(
- self, batch: List[numpy.ndarray], model: BaseEstimator,
+ self,
+ batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+ model: BaseEstimator,
**kwargs) -> Iterable[PredictionResult]:
+ if isinstance(batch[0], numpy.ndarray):
+ return SklearnInferenceRunner._predict_np_array(batch, model)
+ elif isinstance(batch[0], pandas.DataFrame):
+ return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+ raise ValueError('Unsupported data type.')
+
+ @staticmethod
+ def _predict_np_array(batch: List[numpy.ndarray],
+ model: Any) -> Iterable[PredictionResult]:
# vectorize data for better performance
vectorized_batch = numpy.stack(batch, axis=0)
predictions = model.predict(vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
- def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+ @staticmethod
+ def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+ model: Any) -> Iterable[PredictionResult]:
+ # sklearn_inference currently only supports single rowed dataframes.
+ for dataframe in batch:
+ if dataframe.shape[0] != 1:
+ raise ValueError('Only dataframes with single rows are supported.')
+
+ # vectorize data for better performance
+ vectorized_batch = pandas.concat(batch, axis=0)
+ predictions = model.predict(vectorized_batch)
+ splits = [
+ vectorized_batch.iloc[[i]] for i in range(vectorized_batch.shape[0])
+ ]
+ return [
+ PredictionResult(example, inference) for example,
+ inference in zip(splits, predictions)
+ ]
+
+ def get_num_bytes(
+ self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
"""Returns the number of bytes of data for a batch."""
- return sum(sys.getsizeof(element) for element in batch)
+ if isinstance(batch[0], numpy.ndarray):
+ return sum(sys.getsizeof(element) for element in batch)
+ elif isinstance(batch[0], pandas.DataFrame):
+ data_frames: List[pandas.DataFrame] = batch
+ return sum(df.memory_usage(deep=True).sum() for df in data_frames)
+ raise ValueError('Unsupported data type.')
class SklearnModelLoader(ModelLoader[numpy.ndarray,
Review Comment:
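I think you missed this type hint: `SklearnModelLoader` is still declared as `ModelLoader[numpy.ndarray, ...]`, but the inference runner now also accepts `pandas.DataFrame`.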
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]