[ 
https://issues.apache.org/jira/browse/BEAM-13982?focusedWorklogId=753790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753790
 ]

ASF GitHub Bot logged work on BEAM-13982:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Apr/22 00:07
            Start Date: 07/Apr/22 00:07
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on code in PR #16970:
URL: https://github.com/apache/beam/pull/16970#discussion_r844531031


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing models between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+  # pylint: disable=g-import-not-at-top
+  import resource
+except ImportError:
+  resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+  """Implements running inferences for a framework."""
+  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
+    """Runs inferences on a batch of examples and returns an Iterable of 
Predictions."""
+    raise NotImplementedError(type(self))
+
+  def get_num_bytes(self, batch: Any) -> int:
+    """Returns the number of bytes of data for a batch."""
+    return len(pickle.dumps(batch))
+
+  def get_metrics_namespace(self) -> str:
+    """Returns a namespace for metrics collected by the RunInference 
transform."""
+    return 'RunInference'
+
+
+class ModelLoader():
+  """Has the ability to load an ML model."""
+  def load_model(self) -> Any:
+    """Loads and initializes a model for processing."""
+    raise NotImplementedError(type(self))
+
+  def get_inference_runner(self) -> InferenceRunner:
+    """Returns an implementation of InferenceRunner for this model."""
+    raise NotImplementedError(type(self))
+
+
+def _unbatch(maybe_keyed_batches: Tuple[Any, Any]):
+  keys, results = maybe_keyed_batches
+  if keys:
+    return zip(keys, results)
+  else:
+    return results
+
+
+class RunInference(beam.PTransform):
+  """An extensible transform for running inferences."""
+  def __init__(self, model_loader: ModelLoader, clock=None):
+    self._model_loader = model_loader
+    self._clock = clock
+
+  # TODO(BEAM-14208): Add batch_size backoff for cases where there
+  # are functional reasons large batch sizes cannot be handled.
+  def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+    return (
+        pcoll
+        # TODO(BEAM-14044): Hook into the batching DoFn APIs.
+        | beam.BatchElements()
+        | beam.ParDo(_RunInferenceDoFn(self._model_loader, self._clock))
+        | beam.FlatMap(_unbatch))
+
+
+class _MetricsCollector:
+  """A metrics collector that tracks ML related performance and memory 
usage."""
+  def __init__(self, namespace: str):
+    # Metrics
+    self._inference_counter = beam.metrics.Metrics.counter(
+        namespace, 'num_inferences')
+    self._inference_request_batch_size = beam.metrics.Metrics.distribution(
+        namespace, 'inference_request_batch_size')
+    self._inference_request_batch_byte_size = (
+        beam.metrics.Metrics.distribution(
+            namespace, 'inference_request_batch_byte_size'))
+    # Batch inference latency in microseconds.
+    self._inference_batch_latency_micro_secs = (
+        beam.metrics.Metrics.distribution(
+            namespace, 'inference_batch_latency_micro_secs'))
+    self._model_byte_size = beam.metrics.Metrics.distribution(
+        namespace, 'model_byte_size')
+    # Model load latency in milliseconds.
+    self._load_model_latency_milli_secs = beam.metrics.Metrics.distribution(
+        namespace, 'load_model_latency_milli_secs')
+
+    # Metrics cache
+    self._load_model_latency_milli_secs_cache = None
+    self._model_byte_size_cache = None
+
+  def update_metrics_with_cache(self):
+    if self._load_model_latency_milli_secs_cache is not None:
+      self._load_model_latency_milli_secs.update(
+          self._load_model_latency_milli_secs_cache)
+      self._load_model_latency_milli_secs_cache = None
+    if self._model_byte_size_cache is not None:
+      self._model_byte_size.update(self._model_byte_size_cache)
+      self._model_byte_size_cache = None
+
+  def cache_load_model_metrics(self, load_model_latency_ms, model_byte_size):
+    self._load_model_latency_milli_secs_cache = load_model_latency_ms
+    self._model_byte_size_cache = model_byte_size
+
+  def update(
+      self,
+      examples_count: int,
+      examples_byte_size: int,
+      latency_micro_secs: int):
+    self._inference_batch_latency_micro_secs.update(latency_micro_secs)
+    self._inference_counter.inc(examples_count)
+    self._inference_request_batch_size.update(examples_count)
+    self._inference_request_batch_byte_size.update(examples_byte_size)
+
+
+class _RunInferenceDoFn(beam.DoFn):
+  """A DoFn implementation generic to frameworks."""
+  def __init__(self, model_loader: ModelLoader, clock=None):
+    self._model_loader = model_loader
+    self._inference_runner = model_loader.get_inference_runner()
+    self._shared_model_handle = shared.Shared()
+    self._metrics_collector = _MetricsCollector(
+        self._inference_runner.get_metrics_namespace())
+    self._clock = clock
+    if not clock:
+      self._clock = _ClockFactory.make_clock()
+    self._model = None
+
+  def _load_model(self):
+    def load():
+      """Function for constructing shared LoadedModel."""
+      memory_before = _get_current_process_memory_in_bytes()
+      start_time = self._clock.get_current_time_in_microseconds()
+      model = self._model_loader.load_model()
+      end_time = self._clock.get_current_time_in_microseconds()
+      memory_after = _get_current_process_memory_in_bytes()
+      load_model_latency_ms = ((end_time - start_time) /
+                               _MICROSECOND_TO_MILLISECOND)
+      model_byte_size = memory_after - memory_before
+      self._metrics_collector.cache_load_model_metrics(
+          load_model_latency_ms, model_byte_size)
+      return model
+
+    # TODO(BEAM-14207): Investigate releasing model.
+    return self._shared_model_handle.acquire(load)
+
+  def setup(self):
+    self._model = self._load_model()
+
+  def process(self, batch):
+    # Process supports both keyed data and example-only data.
+    # Keys and examples are separated first (if keys are present).
+    has_keys = isinstance(batch[0], tuple)
+    if has_keys:
+      examples = [example for _, example in batch]
+      keys = [key for key, _ in batch]
+    else:
+      examples = batch
+      keys = None
+
+    start_time = self._clock.get_current_time_in_microseconds()
+    result_generator = self._inference_runner.run_inference(
+        examples, self._model)
+    predictions = list(result_generator)
+
+    inference_latency = self._clock.get_current_time_in_microseconds(
+    ) - start_time
+    num_bytes = self._inference_runner.get_num_bytes(examples)
+    num_elements = len(batch)
+    self._metrics_collector.update(num_elements, num_bytes, inference_latency)
+
+    # Keys are recombined with predictions in the RunInference PTransform.
+    yield keys, predictions

Review Comment:
   +1
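
(Illustrative sketch, not part of the PR or the review above: a minimal
end-to-end use of the API in this hunk, assuming base.py is importable as
apache_beam.ml.inference.base. The AddOffsetModelLoader and
AddOffsetInferenceRunner names and the integer "model" are hypothetical
placeholders; only ModelLoader, InferenceRunner and RunInference come from
the PR.)

from typing import Any
from typing import Iterable

import apache_beam as beam
from apache_beam.ml.inference.base import InferenceRunner
from apache_beam.ml.inference.base import ModelLoader
from apache_beam.ml.inference.base import RunInference


class AddOffsetInferenceRunner(InferenceRunner):
  """Hypothetical runner: adds the loaded offset to every example."""
  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
    return [example + model for example in batch]


class AddOffsetModelLoader(ModelLoader):
  """Hypothetical loader whose 'model' is just an integer offset."""
  def load_model(self) -> Any:
    return 10

  def get_inference_runner(self) -> InferenceRunner:
    return AddOffsetInferenceRunner()


with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([1, 2, 3])
      | RunInference(AddOffsetModelLoader())
      | beam.Map(print))  # should print 11, 12, 13 (order not guaranteed)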



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -0,0 +1,262 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An extensible run inference transform.
+
+Users of this module can extend the ModelLoader class for any ML framework,
+then pass their extended ModelLoader object into RunInference to create a
+RunInference Beam transform for that framework.
+
+The transform will handle standard inference functionality, such as metric
+collection, sharing models between threads, and batching elements.
+"""
+
+import logging
+import os
+import pickle
+import platform
+import sys
+import time
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.utils import shared
+
+try:
+  # pylint: disable=g-import-not-at-top
+  import resource
+except ImportError:
+  resource = None
+
+_MICROSECOND_TO_MILLISECOND = 1000
+_NANOSECOND_TO_MICROSECOND = 1000
+_SECOND_TO_MICROSECOND = 1_000_000
+
+
+class InferenceRunner():
+  """Implements running inferences for a framework."""
+  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
+    """Runs inferences on a batch of examples and returns an Iterable of 
Predictions."""
+    raise NotImplementedError(type(self))
+
+  def get_num_bytes(self, batch: Any) -> int:
+    """Returns the number of bytes of data for a batch."""
+    return len(pickle.dumps(batch))
+
+  def get_metrics_namespace(self) -> str:
+    """Returns a namespace for metrics collected by the RunInference 
transform."""
+    return 'RunInference'
+
+
+class ModelLoader():
+  """Has the ability to load an ML model."""
+  def load_model(self) -> Any:

Review Comment:
   +1, though this may fall under the typing improvements that were deferred to
another JIRA. (Frankly, there's no need to return a model that is then passed
to the InferenceRunner on every call; instead, the InferenceRunner could
encapsulate the model itself on instantiation.)
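
(Illustrative sketch, not part of the review comment or the PR: one
hypothetical shape for the alternative described above, where the
InferenceRunner is constructed with its model and no model argument is
threaded through run_inference. All names here are made up.)

from typing import Any
from typing import Iterable
from typing import Sequence


class ModelBoundInferenceRunner:
  """Hypothetical runner that encapsulates its model at instantiation."""
  def __init__(self, model: Any):
    self._model = model

  def run_inference(self, batch: Sequence[Any]) -> Iterable[Any]:
    # The model is an instance attribute; callers never pass it in.
    return [self._model(example) for example in batch]


class ModelBoundLoader:
  """Hypothetical loader that builds a runner already bound to a model."""
  def load_inference_runner(self) -> ModelBoundInferenceRunner:
    model = lambda x: x * 2  # stand-in for a real framework model
    return ModelBoundInferenceRunner(model)


runner = ModelBoundLoader().load_inference_runner()
print(list(runner.run_inference([1, 2, 3])))  # [2, 4, 6]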





Issue Time Tracking
-------------------

    Worklog Id:     (was: 753790)
    Time Spent: 16.5h  (was: 16h 20m)

> Implement Generic RunInference Base class
> -----------------------------------------
>
>                 Key: BEAM-13982
>                 URL: https://issues.apache.org/jira/browse/BEAM-13982
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Andy Ye
>            Assignee: Ryan Thompson
>            Priority: P2
>              Labels: run-inference
>          Time Spent: 16.5h
>  Remaining Estimate: 0h
>
> This base class will have
>  * Metrics
>  * Will call dependent framework-specific classes
>  * Unit tests



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
