damccorm commented on code in PR #25200:
URL: https://github.com/apache/beam/pull/25200#discussion_r1089095968
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -62,16 +63,40 @@
_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
KeyT = TypeVar('KeyT')
-PredictionResult = NamedTuple(
- 'PredictionResult', [
- ('example', _INPUT_TYPE),
- ('inference', _OUTPUT_TYPE),
- ])
+
+# We use NamedTuple to define the structure of the PredictionResult,
+# however, as support for generic NamedTuples is not available in Python
+# versions prior to 3.11, we use the __new__ method to provide default
+# values for the fields while maintaining backwards compatibility.
+class PredictionResult(NamedTuple('PredictionResult',
+ [('example', _INPUT_TYPE),
+ ('inference', _OUTPUT_TYPE),
+ ('model_id', Optional[str])])):
+ __slots__ = ()
Review Comment:
Why do we need this?
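
For context, a minimal sketch of what `__slots__ = ()` changes on a `NamedTuple` subclass. The class names below are hypothetical stand-ins, not Beam code. A subclass that omits `__slots__` gets a per-instance `__dict__`, so it uses more memory and accepts arbitrary attributes. Declaring empty slots keeps the subclass as lean and closed as the base tuple.

```python
from typing import NamedTuple, Optional

# Hypothetical base tuple with the same three fields as the hunk above.
_Base = NamedTuple(
    '_Base', [('example', int), ('inference', int),
              ('model_id', Optional[str])])


class WithSlots(_Base):
  # Empty slots: no per-instance __dict__ is created.
  __slots__ = ()


class WithoutSlots(_Base):
  # No __slots__ declaration: every instance carries a __dict__.
  pass


with_slots = WithSlots(1, 2, None)
without_slots = WithoutSlots(1, 2, None)

has_dict_with_slots = hasattr(with_slots, '__dict__')
has_dict_without_slots = hasattr(without_slots, '__dict__')

# Arbitrary attributes are accepted only when a __dict__ exists.
without_slots.extra = 'allowed'
```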
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
third_party_model_handler.batch_elements_kwargs()
third_party_model_handler.validate_inference_args({})
+ def test_run_inference_prediction_result_with_model_id(self):
+ examples = [1, 5, 3, 10]
+ expected = [
+ base.PredictionResult(
+ example=example,
+ inference=example + 1,
+ model_id='fake_model_id_default') for example in examples
+ ]
+ with TestPipeline() as pipeline:
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ actual = pcoll | base.RunInference(
+ FakeModelHandlerReturnsPredictionResult())
+ assert_that(actual, equal_to(expected), label='assert:inferences')
+
+ @pytest.mark.it_postcommit
+ def test_run_inference_prediction_result_with_side_input(self):
+ test_pipeline = TestPipeline(is_integration_test=True)
+
+ first_ts = math.floor(time.time()) - 30
+ interval = 5
+ main_input_windowing_interval = 7
+
+ # aligning timestamp to get persistent results
+ first_ts = first_ts - (
+ first_ts % (interval * main_input_windowing_interval))
+ last_ts = first_ts + 45
+
+ sample_main_input_elements = ([first_ts - 2, # no output due to no SI
+ first_ts + 8, # first window
+ first_ts + 22, # second window
+ ])
+
+ sample_side_input_elements = [
+ base.ModelMetdata(
+ model_id='fake_model_id_1', model_name='fake_model_id_1')
+ ]
+
+ class EmitSideInput(beam.DoFn):
+ def process(self, element):
+ for e in element:
+ yield e
+
+ model_handler = FakeModelHandlerReturnsPredictionResult()
+
+ side_input = (
+ test_pipeline
+ |
+ "PeriodicImpulse" >> PeriodicImpulse(first_ts, last_ts, interval, True)
+ | beam.Map(lambda x: sample_side_input_elements)
+ | beam.ParDo(EmitSideInput()))
+
+ result = (
+ test_pipeline
+ | beam.Create(sample_main_input_elements)
+ | "MapTimeStamp" >> beam.Map(lambda x: TimestampedValue(x, x))
+ | "ApplyWindow" >> beam.WindowInto(
+ window.FixedWindows(main_input_windowing_interval))
+ | "RunInference" >> base.RunInference(
+ model_handler, model_path_pcoll=side_input))
Review Comment:
This is causing postcommits to fail. Should it be `model_metadata_pcoll`?
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -62,16 +63,40 @@
_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
KeyT = TypeVar('KeyT')
-PredictionResult = NamedTuple(
- 'PredictionResult', [
- ('example', _INPUT_TYPE),
- ('inference', _OUTPUT_TYPE),
- ])
+
+# We use NamedTuple to define the structure of the PredictionResult,
+# however, as support for generic NamedTuples is not available in Python
+# versions prior to 3.11, we use the __new__ method to provide default
+# values for the fields while maintaining backwards compatibility.
+class PredictionResult(NamedTuple('PredictionResult',
+ [('example', _INPUT_TYPE),
+ ('inference', _OUTPUT_TYPE),
+ ('model_id', Optional[str])])):
+ __slots__ = ()
+
+ def __new__(cls, example, inference, model_id=None):
+ return super().__new__(cls, example, inference, model_id)
+
+
PredictionResult.__doc__ = """A NamedTuple containing both input and output
from the inference."""
PredictionResult.example.__doc__ = """The input example."""
PredictionResult.inference.__doc__ = """Results for the inference on the model
for the given example."""
+PredictionResult.model_id.__doc__ = """Model ID used to run the prediction."""
Review Comment:
```suggestion
PredictionResult.model_id.__doc__ = """Model ID used to run the prediction.
Only populated when the model may be updated during the life of the pipeline."""
```
##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -233,6 +234,16 @@ For more information, see the [`PredictionResult`
documentation](https://github.
For detailed instructions explaining how to build and run a pipeline that uses
ML models, see the
[Example RunInference API
pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference)
on GitHub.
+## Side Inputs to Update Models
+From Beam 2.45.0, the RunInference PTransform will accept a side input of
`ModelMetadata`, which is a `NamedTuple` containing the `model_id` and
`model_name`.
Review Comment:
```suggestion
From Beam 2.46.0, the RunInference PTransform will accept a side input of
`ModelMetadata`, which is a `NamedTuple` containing the `model_id` and
`model_name`.
```
2.45 has been cut already, unfortunately.
Also, could we split website changes into a separate PR? Ideally we would
merge those once the feature becomes available to avoid confusing people.
##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -95,36 +101,20 @@ def _convert_to_device(examples: torch.Tensor, device) -> torch.Tensor:
return examples
-def _convert_to_result(
- batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]]
-) -> Iterable[PredictionResult]:
- if isinstance(predictions, dict):
- # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
- # key_type2: Iterable<val_type2>, ...} where each Iterable is of
- # length batch_size, to a list of dictionaries:
- # [{key_type1: value_type1, key_type2: value_type2}]
- predictions_per_tensor = [
- dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
- ]
- return [
- PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
- ]
- return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
-
-
def default_tensor_inference_fn(
batch: Sequence[torch.Tensor],
model: torch.nn.Module,
device: str,
- inference_args: Optional[Dict[str,
- Any]] = None) -> Iterable[PredictionResult]:
+ inference_args: Optional[Dict[str, Any]] = None,
+ model_id: Optional[str] = None,
+) -> Iterable[PredictionResult]:
# torch.no_grad() mitigates GPU memory issues
# https://github.com/apache/beam/issues/22811
with torch.no_grad():
batched_tensors = torch.stack(batch)
batched_tensors = _convert_to_device(batched_tensors, device)
predictions = model(batched_tensors, **inference_args)
- return _convert_to_result(batch, predictions)
+ return utils._convert_to_result(batch, predictions, model_id)
Review Comment:
Thanks for moving this into utils
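
The body of the relocated helper is not shown in this thread, so the following is only a sketch of what it might look like, assuming it keeps the logic of the removed `_convert_to_result` and passes `model_id` through to each result. `PredictionResultSketch` and `convert_to_result_sketch` are hypothetical stand-ins used to keep the example self-contained. They are not the Beam definitions.

```python
from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Union


class PredictionResultSketch(NamedTuple):
  # Stand-in for PredictionResult with the new optional field.
  example: Any
  inference: Any
  model_id: Optional[str] = None


def convert_to_result_sketch(
    batch: Iterable,
    predictions: Union[Iterable, Dict[Any, Iterable]],
    model_id: Optional[str] = None) -> List[PredictionResultSketch]:
  if isinstance(predictions, dict):
    # Turn {key: [v0, v1, ...], ...} into one dict per batch element:
    # [{key: v0, ...}, {key: v1, ...}, ...]
    predictions = [
        dict(zip(predictions.keys(), values))
        for values in zip(*predictions.values())
    ]
  return [
      PredictionResultSketch(x, y, model_id)
      for x, y in zip(batch, predictions)
  ]


results = convert_to_result_sketch(
    [1, 2], {'a': [10, 20], 'b': [30, 40]}, model_id='m1')
```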
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -62,16 +63,40 @@
_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
KeyT = TypeVar('KeyT')
-PredictionResult = NamedTuple(
- 'PredictionResult', [
- ('example', _INPUT_TYPE),
- ('inference', _OUTPUT_TYPE),
- ])
+
+# We use NamedTuple to define the structure of the PredictionResult,
+# however, as support for generic NamedTuples is not available in Python
+# versions prior to 3.11, we use the __new__ method to provide default
+# values for the fields while maintaining backwards compatibility.
+class PredictionResult(NamedTuple('PredictionResult',
+ [('example', _INPUT_TYPE),
+ ('inference', _OUTPUT_TYPE),
+ ('model_id', Optional[str])])):
+ __slots__ = ()
+
+ def __new__(cls, example, inference, model_id=None):
+ return super().__new__(cls, example, inference, model_id)
+
+
PredictionResult.__doc__ = """A NamedTuple containing both input and output
from the inference."""
PredictionResult.example.__doc__ = """The input example."""
PredictionResult.inference.__doc__ = """Results for the inference on the model
for the given example."""
+PredictionResult.model_id.__doc__ = """Model ID used to run the prediction."""
Review Comment:
Ah, I see - you're actually always returning this. So maybe instead add
something like `Only populated by model handlers that support updating the
model during the life of the pipeline`?
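
As a small illustration of the `__new__` default in the hunk above, the sketch below copies the pattern into a hypothetical `PredictionResultSketch` class, with `Any` substituted for the type variables. Existing two-argument call sites keep working and get `model_id=None`.

```python
from typing import Any, NamedTuple, Optional


class PredictionResultSketch(NamedTuple('PredictionResultSketch',
                                        [('example', Any),
                                         ('inference', Any),
                                         ('model_id', Optional[str])])):
  __slots__ = ()

  def __new__(cls, example, inference, model_id=None):
    # Supply the default here so callers may omit model_id.
    return super().__new__(cls, example, inference, model_id)


# A call site written before the new field existed.
old_style = PredictionResultSketch(1, 2)
# A call site from a handler that reports which model was used.
new_style = PredictionResultSketch(1, 2, model_id='fake_model_id_1')
```

One caveat: the tuple now has three elements, so code that unpacks a result positionally into two names (`example, inference = result`) raises `ValueError`. Attribute access is unaffected.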
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -62,16 +63,40 @@
_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
KeyT = TypeVar('KeyT')
-PredictionResult = NamedTuple(
- 'PredictionResult', [
- ('example', _INPUT_TYPE),
- ('inference', _OUTPUT_TYPE),
- ])
+
+# We use NamedTuple to define the structure of the PredictionResult,
+# however, as support for generic NamedTuples is not available in Python
+# versions prior to 3.11, we use the __new__ method to provide default
+# values for the fields while maintaining backwards compatibility.
+class PredictionResult(NamedTuple('PredictionResult',
+ [('example', _INPUT_TYPE),
+ ('inference', _OUTPUT_TYPE),
+ ('model_id', Optional[str])])):
+ __slots__ = ()
+
+ def __new__(cls, example, inference, model_id=None):
+ return super().__new__(cls, example, inference, model_id)
+
+
PredictionResult.__doc__ = """A NamedTuple containing both input and output
from the inference."""
PredictionResult.example.__doc__ = """The input example."""
PredictionResult.inference.__doc__ = """Results for the inference on the model
for the given example."""
+PredictionResult.model_id.__doc__ = """Model ID used to run the prediction."""
+
+
+class ModelMetdata(NamedTuple):
Review Comment:
I don't think it's nearly as helpful as just retraining your model with new
data (which this supports), but it could be something we get asked about
eventually. This approach should support it fine, so no action is required here.
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -444,6 +528,28 @@ def process(self, batch, inference_args):
return predictions
+ def process(
+ self, batch, inference_args, si_model_metadata: Optional[ModelMetdata]):
+ """
+ When side input is enabled:
+ The method checks if the side input model has been updated, and if so,
+ updates the model and runs inference on the batch of data. If the
+ side input is empty or the model has not been updated, the method
+ simply runs inference on the batch of data.
+ """
+ if si_model_metadata and self._enable_side_input_loading:
Review Comment:
Or, alternatively, can we ~just~ check `_enable_side_input_loading`?
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -444,6 +528,28 @@ def process(self, batch, inference_args):
return predictions
+ def process(
+ self, batch, inference_args, si_model_metadata: Optional[ModelMetdata]):
+ """
+ When side input is enabled:
+ The method checks if the side input model has been updated, and if so,
+ updates the model and runs inference on the batch of data. If the
+ side input is empty or the model has not been updated, the method
+ simply runs inference on the batch of data.
+ """
+ if si_model_metadata and self._enable_side_input_loading:
Review Comment:
Do we actually need to check `_enable_side_input_loading` here? Won't this
return the same value with or without the
`and self._enable_side_input_loading` clause?
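
To make the question concrete, here is a small sketch that enumerates the cases in which each simplified condition would differ from the current one. The function names are hypothetical and `'metadata'` stands in for a non-empty side input value. Whether the differing cases can occur in practice depends on how the side input is wired up, which this hunk does not show.

```python
from itertools import product


def current_check(si_model_metadata, enable_side_input_loading):
  # Condition as written in the hunk above.
  return bool(si_model_metadata and enable_side_input_loading)


def metadata_only_check(si_model_metadata, enable_side_input_loading):
  # Variant that drops the flag.
  return bool(si_model_metadata)


def flag_only_check(si_model_metadata, enable_side_input_loading):
  # Variant that checks only the flag.
  return bool(enable_side_input_loading)


cases = list(product([None, 'metadata'], [False, True]))

# Inputs where each variant disagrees with the current condition.
metadata_only_differs = [
    c for c in cases if metadata_only_check(*c) != current_check(*c)
]
flag_only_differs = [
    c for c in cases if flag_only_check(*c) != current_check(*c)
]
```

The metadata-only variant disagrees only when metadata arrives while the flag is off, and the flag-only variant disagrees only when the flag is on but the side input is empty.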
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -62,16 +63,40 @@
_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
KeyT = TypeVar('KeyT')
-PredictionResult = NamedTuple(
- 'PredictionResult', [
- ('example', _INPUT_TYPE),
- ('inference', _OUTPUT_TYPE),
- ])
+
+# We use NamedTuple to define the structure of the PredictionResult,
+# however, as support for generic NamedTuples is not available in Python
+# versions prior to 3.11, we use the __new__ method to provide default
+# values for the fields while maintaining backwards compatibility.
+class PredictionResult(NamedTuple('PredictionResult',
+ [('example', _INPUT_TYPE),
+ ('inference', _OUTPUT_TYPE),
+ ('model_id', Optional[str])])):
+ __slots__ = ()
+
+ def __new__(cls, example, inference, model_id=None):
+ return super().__new__(cls, example, inference, model_id)
+
+
PredictionResult.__doc__ = """A NamedTuple containing both input and output
from the inference."""
PredictionResult.example.__doc__ = """The input example."""
PredictionResult.inference.__doc__ = """Results for the inference on the model
for the given example."""
+PredictionResult.model_id.__doc__ = """Model ID used to run the prediction."""
+
+
+class ModelMetdata(NamedTuple):
Review Comment:
This is out of scope for this PR (and this work generally), but an
interesting extension here would be to take a model class (or additional model
metadata) for Model Handlers that require a class for loading the model. That
way, you could start with a model of one type and switch to a model of a
different type during execution.
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -339,6 +375,79 @@ def validate_inference_args(
third_party_model_handler.batch_elements_kwargs()
third_party_model_handler.validate_inference_args({})
+ def test_run_inference_prediction_result_with_model_id(self):
+ examples = [1, 5, 3, 10]
+ expected = [
+ base.PredictionResult(
+ example=example,
+ inference=example + 1,
+ model_id='fake_model_id_default') for example in examples
+ ]
+ with TestPipeline() as pipeline:
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ actual = pcoll | base.RunInference(
+ FakeModelHandlerReturnsPredictionResult())
+ assert_that(actual, equal_to(expected), label='assert:inferences')
+
+ @pytest.mark.it_postcommit
+ def test_run_inference_prediction_result_with_side_input(self):
Review Comment:
Note to self: I ran out of time to review this test, but don't want to block
the rest of the feedback on this. I'll take another look at this when I have
time.
##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -38,15 +38,21 @@
'PytorchModelHandlerKeyedTensor',
]
-TensorInferenceFn = Callable[
- [Sequence[torch.Tensor], torch.nn.Module, str, Optional[Dict[str, Any]]],
- Iterable[PredictionResult]]
+TensorInferenceFn = Callable[[
+ Sequence[torch.Tensor],
+ torch.nn.Module,
+ torch.device,
+ Optional[Dict[str, Any]],
+ Optional[str]
+],
+ Iterable[PredictionResult]]
Review Comment:
Nit: What's going on with the spacing here? Can we normalize this a bit?