[
https://issues.apache.org/jira/browse/BEAM-13982?focusedWorklogId=753688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753688
]
ASF GitHub Bot logged work on BEAM-13982:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Apr/22 21:06
Start Date: 06/Apr/22 21:06
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #16970:
URL: https://github.com/apache/beam/pull/16970#discussion_r844408711
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing the model between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+ # pylint: disable=g-import-not-at-top
+ import resource
+except ImportError:
+ resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+ """Implements running inferences for a framework."""
+ def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
Review Comment:
The actual contract seems to be that batch is a `List`, but we put no
constraints on the element type, right?
```suggestion
def run_inference(self, batch: List[Any], model: Any) -> Iterable[Any]:
```
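For illustration, a minimal sketch of what an implementer's override could look like under the suggested `List[Any]` contract (the runner class and its behaviour here are hypothetical, and `List` would need to be imported from `typing` alongside the existing imports):
```python
from typing import Any, Iterable, List

class _ExampleLengthInferenceRunner(InferenceRunner):
  """Toy runner: 'predicts' the length of each example in the batch."""
  def run_inference(self, batch: List[Any], model: Any) -> Iterable[Any]:
    # A real implementation would call the framework's predict API on
    # `model`; the model is ignored here for simplicity.
    return [len(example) for example in batch]
```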
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing the model between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+ # pylint: disable=g-import-not-at-top
+ import resource
+except ImportError:
+ resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+ """Implements running inferences for a framework."""
+  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
+    """Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+ raise NotImplementedError(type(self))
+
+ def get_num_bytes(self, batch: Any) -> int:
+ """Returns the number of bytes of data for a batch."""
+ return len(pickle.dumps(batch))
+
+  def get_metrics_namespace(self) -> str:
+    """Returns a namespace for metrics collected by the RunInference
+    transform."""
+ return 'RunInference'
+
+
+class ModelLoader():
+ """Has the ability to load an ML model."""
+ def load_model(self) -> Any:
+ """Loads and initializes a model for processing."""
+ raise NotImplementedError(type(self))
+
+ def get_inference_runner(self) -> InferenceRunner:
+ """Returns an implementation of InferenceRunner for this model."""
+ raise NotImplementedError(type(self))
+
+
+def _unbatch(maybe_keyed_batches: Tuple[Any, Any]):
+ keys, results = maybe_keyed_batches
+ if keys:
+ return zip(keys, results)
+ else:
+ return results
+
+
+class RunInference(beam.PTransform):
+ """An extensible transform for running inferences."""
+ def __init__(self, model_loader: ModelLoader, clock=None):
+ self._model_loader = model_loader
+ self._clock = clock
+
+  # TODO(BEAM-14208): Add batch_size back-off for cases where there
+  # are functional reasons large batch sizes cannot be handled.
+ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+ return (
+ pcoll
+ # TODO(BEAM-14044): Hook into the batching DoFn APIs.
+ | beam.BatchElements()
+ | beam.ParDo(_RunInferenceDoFn(self._model_loader, self._clock))
+ | beam.FlatMap(_unbatch))
+
+
+class _MetricsCollector:
+  """A metrics collector that tracks ML related performance and memory
+  usage."""
+ def __init__(self, namespace: str):
+ # Metrics
+ self._inference_counter = beam.metrics.Metrics.counter(
+ namespace, 'num_inferences')
+ self._inference_request_batch_size = beam.metrics.Metrics.distribution(
+ namespace, 'inference_request_batch_size')
+ self._inference_request_batch_byte_size = (
+ beam.metrics.Metrics.distribution(
+ namespace, 'inference_request_batch_byte_size'))
+ # Batch inference latency in microseconds.
+ self._inference_batch_latency_micro_secs = (
+ beam.metrics.Metrics.distribution(
+ namespace, 'inference_batch_latency_micro_secs'))
+ self._model_byte_size = beam.metrics.Metrics.distribution(
+ namespace, 'model_byte_size')
+ # Model load latency in milliseconds.
+ self._load_model_latency_milli_secs = beam.metrics.Metrics.distribution(
+ namespace, 'load_model_latency_milli_secs')
+
+ # Metrics cache
+ self._load_model_latency_milli_secs_cache = None
+ self._model_byte_size_cache = None
+
+  def update_metrics_with_cache(self):
+    if self._load_model_latency_milli_secs_cache is not None:
+      self._load_model_latency_milli_secs.update(
+          self._load_model_latency_milli_secs_cache)
+      self._load_model_latency_milli_secs_cache = None
+    if self._model_byte_size_cache is not None:
+      self._model_byte_size.update(self._model_byte_size_cache)
+      self._model_byte_size_cache = None
+
+ def cache_load_model_metrics(self, load_model_latency_ms, model_byte_size):
+ self._load_model_latency_milli_secs_cache = load_model_latency_ms
+ self._model_byte_size_cache = model_byte_size
+
+ def update(
+ self,
+ examples_count: int,
+ examples_byte_size: int,
+ latency_micro_secs: int):
+ self._inference_batch_latency_micro_secs.update(latency_micro_secs)
+ self._inference_counter.inc(examples_count)
+ self._inference_request_batch_size.update(examples_count)
+ self._inference_request_batch_byte_size.update(examples_byte_size)
+
+
+class _RunInferenceDoFn(beam.DoFn):
+ """A DoFn implementation generic to frameworks."""
+ def __init__(self, model_loader: ModelLoader, clock=None):
+ self._model_loader = model_loader
+ self._inference_runner = model_loader.get_inference_runner()
+ self._shared_model_handle = shared.Shared()
+ self._metrics_collector = _MetricsCollector(
+ self._inference_runner.get_metrics_namespace())
+ self._clock = clock
+ if not clock:
+ self._clock = _ClockFactory.make_clock()
+ self._model = None
+
+ def _load_model(self):
+ def load():
+ """Function for constructing shared LoadedModel."""
+ memory_before = _get_current_process_memory_in_bytes()
+ start_time = self._clock.get_current_time_in_microseconds()
+ model = self._model_loader.load_model()
+ end_time = self._clock.get_current_time_in_microseconds()
+ memory_after = _get_current_process_memory_in_bytes()
+ load_model_latency_ms = ((end_time - start_time) /
+ _MICROSECOND_TO_MILLISECOND)
+ model_byte_size = memory_after - memory_before
+ self._metrics_collector.cache_load_model_metrics(
+ load_model_latency_ms, model_byte_size)
+ return model
+
+ # TODO(BEAM-14207): Investigate releasing model.
+ return self._shared_model_handle.acquire(load)
+
+ def setup(self):
+ self._model = self._load_model()
+
+  def process(self, batch):
+    # Process supports both keyed data and example-only data.
+    # Keys and examples are separated first (if keys are present).
+ has_keys = isinstance(batch[0], tuple)
+ if has_keys:
+ examples = [example for _, example in batch]
+ keys = [key for key, _ in batch]
+ else:
+ examples = batch
+ keys = None
+
+ start_time = self._clock.get_current_time_in_microseconds()
+ result_generator = self._inference_runner.run_inference(
+ examples, self._model)
+ predictions = list(result_generator)
+
+ inference_latency = self._clock.get_current_time_in_microseconds(
+ ) - start_time
+ num_bytes = self._inference_runner.get_num_bytes(examples)
+ num_elements = len(batch)
+ self._metrics_collector.update(num_elements, num_bytes, inference_latency)
+
+ # Keys are recombined with predictions in the RunInference PTransform.
+ yield keys, predictions
Review Comment:
Could we remove the branching logic of `_unbatch` and just handle it here
instead?
```suggestion
if has_keys:
yield from zip(keys, predictions)
else:
yield from predictions
```
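If `process` unbatches its own output as suggested, the `FlatMap(_unbatch)` step (and the `_unbatch` helper) could presumably be dropped from `expand`; a sketch of that variant, assuming nothing else about the pipeline changes:
```python
def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
  return (
      pcoll
      # TODO(BEAM-14044): Hook into the batching DoFn APIs.
      | beam.BatchElements()
      # The DoFn now yields individual predictions (or (key, prediction)
      # pairs), so no trailing FlatMap(_unbatch) is needed.
      | beam.ParDo(_RunInferenceDoFn(self._model_loader, self._clock)))
```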
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing the model between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+ # pylint: disable=g-import-not-at-top
+ import resource
+except ImportError:
+ resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+ """Implements running inferences for a framework."""
+ def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
Review Comment:
Also, shouldn't model be the same for every call to `run_inference`? Why
pass it as an argument here?
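For reference, the alternative being raised could look roughly like this (a sketch only, not the PR's design; `set_model` is a hypothetical hook the DoFn would call once after loading the shared model):
```python
class InferenceRunner():
  """Implements running inferences for a framework."""
  def set_model(self, model: Any) -> None:
    # Hypothetical: called once from _RunInferenceDoFn.setup() after the
    # shared model has been loaded.
    self._model = model

  def run_inference(self, batch: Any) -> Iterable[Any]:
    """Runs inferences on a batch using the previously set model."""
    raise NotImplementedError(type(self))
```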
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing the model between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+ # pylint: disable=g-import-not-at-top
+ import resource
+except ImportError:
+ resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+ """Implements running inferences for a framework."""
+  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
+    """Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+ raise NotImplementedError(type(self))
+
+ def get_num_bytes(self, batch: Any) -> int:
+ """Returns the number of bytes of data for a batch."""
+ return len(pickle.dumps(batch))
+
+  def get_metrics_namespace(self) -> str:
+    """Returns a namespace for metrics collected by the RunInference
+    transform."""
+ return 'RunInference'
+
+
+class ModelLoader():
+ """Has the ability to load an ML model."""
+ def load_model(self) -> Any:
Review Comment:
It would also be nice to use a `Generic` typehint for `model` in
`ModelLoader` and `RunInference` instead of just using `Any` (implementors
could always erase it with `Any` if they want to).
```suggestion
class ModelLoader(Generic[T]):
"""Has the ability to load an ML model."""
def load_model(self) -> T:
```
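Filling the suggestion out a little (a sketch only; `T`, the `TypeVar` import, and the parameterized `InferenceRunner` go beyond the PR as written), the generic could thread through the loader and runner, and `RunInference` could be parameterized the same way:
```python
from typing import Any, Generic, Iterable, TypeVar

T = TypeVar('T')  # The framework-specific model type.

class InferenceRunner(Generic[T]):
  """Implements running inferences for a framework."""
  def run_inference(self, batch: Any, model: T) -> Iterable[Any]:
    raise NotImplementedError(type(self))

class ModelLoader(Generic[T]):
  """Has the ability to load an ML model."""
  def load_model(self) -> T:
    """Loads and initializes a model for processing."""
    raise NotImplementedError(type(self))

  def get_inference_runner(self) -> InferenceRunner[T]:
    """Returns an implementation of InferenceRunner for this model."""
    raise NotImplementedError(type(self))
```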
Issue Time Tracking
-------------------
Worklog Id: (was: 753688)
Time Spent: 16h 20m (was: 16h 10m)
> Implement Generic RunInference Base class
> -----------------------------------------
>
> Key: BEAM-13982
> URL: https://issues.apache.org/jira/browse/BEAM-13982
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Andy Ye
> Assignee: Ryan Thompson
> Priority: P2
> Labels: run-inference
> Time Spent: 16h 20m
> Remaining Estimate: 0h
>
> This base class will have
> * Metrics
> * Will call dependent framework-specific classes
> * Unit tests
--
This message was sent by Atlassian Jira
(v8.20.1#820001)