This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d22202fde9 [BEAM-14255] Drop clock abstraction (#17671)
4d22202fde9 is described below

commit 4d22202fde91d5bc0b73369de39e4f4baa49739e
Author: Ryan Thompson <[email protected]>
AuthorDate: Tue May 31 11:43:45 2022 -0400

    [BEAM-14255] Drop clock abstraction (#17671)
    
    * removed unused imports
    
    * Update sdks/python/apache_beam/ml/inference/base.py
    
    Co-authored-by: Andy Ye <[email protected]>
    
    * use default clock arguments
    
    * simplified numbers in tests
    
    * Update sdks/python/apache_beam/ml/inference/base_test.py
    
    Co-authored-by: Brian Hulette <[email protected]>
    
    * time_ns
    
    Co-authored-by: Andy Ye <[email protected]>
    Co-authored-by: Brian Hulette <[email protected]>
---
 sdks/python/apache_beam/ml/inference/base.py      | 67 ++++++-----------------
 sdks/python/apache_beam/ml/inference/base_test.py | 29 +++++-----
 2 files changed, 33 insertions(+), 63 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 7c7ef50e234..ad8ea5868f7 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -29,9 +29,7 @@ no expectation that these interfaces will not change.
 """
 
 import logging
-import os
 import pickle
-import platform
 import sys
 import time
 from typing import Any
@@ -39,7 +37,6 @@ from typing import Generic
 from typing import Iterable
 from typing import List
 from typing import Mapping
-from typing import Optional
 from typing import TypeVar
 
 import apache_beam as beam
@@ -51,13 +48,20 @@ try:
 except ImportError:
   resource = None  # type: ignore[assignment]
 
-_MICROSECOND_TO_MILLISECOND = 1000
-_NANOSECOND_TO_MICROSECOND = 1000
-_SECOND_TO_MICROSECOND = 1_000_000
+_NANOSECOND_TO_MILLISECOND = 1_000_000
+_NANOSECOND_TO_MICROSECOND = 1_000
 
 T = TypeVar('T')
 
 
+def _to_milliseconds(time_ns: int) -> int:
+  return int(time_ns / _NANOSECOND_TO_MILLISECOND)
+
+
+def _to_microseconds(time_ns: int) -> int:
+  return int(time_ns / _NANOSECOND_TO_MICROSECOND)
+
+
 class InferenceRunner:
   """Implements running inferences for a framework."""
   def run_inference(self, batch: List[Any], model: Any) -> Iterable[Any]:
@@ -99,8 +103,7 @@ class RunInference(beam.PTransform):
       model_loader: An implementation of ModelLoader.
       clock: A clock implementing get_current_time_in_microseconds.
   """
-  def __init__(
-      self, model_loader: ModelLoader, clock: Optional["_Clock"] = None):
+  def __init__(self, model_loader: ModelLoader, clock=time):
     self._model_loader = model_loader
     self._clock = clock
 
@@ -169,28 +172,24 @@ class _MetricsCollector:
 
 class _RunInferenceDoFn(beam.DoFn):
   """A DoFn implementation generic to frameworks."""
-  def __init__(
-      self, model_loader: ModelLoader, clock: Optional["_Clock"] = None):
+  def __init__(self, model_loader: ModelLoader, clock):
     self._model_loader = model_loader
     self._inference_runner = model_loader.get_inference_runner()
     self._shared_model_handle = shared.Shared()
     self._metrics_collector = _MetricsCollector(
         self._inference_runner.get_metrics_namespace())
     self._clock = clock
-    if not clock:
-      self._clock = _ClockFactory.make_clock()
     self._model = None
 
   def _load_model(self):
     def load():
       """Function for constructing shared LoadedModel."""
       memory_before = _get_current_process_memory_in_bytes()
-      start_time = self._clock.get_current_time_in_microseconds()
+      start_time = _to_milliseconds(self._clock.time_ns())
       model = self._model_loader.load_model()
-      end_time = self._clock.get_current_time_in_microseconds()
+      end_time = _to_milliseconds(self._clock.time_ns())
       memory_after = _get_current_process_memory_in_bytes()
-      load_model_latency_ms = ((end_time - start_time) /
-                               _MICROSECOND_TO_MILLISECOND)
+      load_model_latency_ms = end_time - start_time
       model_byte_size = memory_after - memory_before
       self._metrics_collector.cache_load_model_metrics(
           load_model_latency_ms, model_byte_size)
@@ -213,13 +212,13 @@ class _RunInferenceDoFn(beam.DoFn):
       examples = batch
       keys = None
 
-    start_time = self._clock.get_current_time_in_microseconds()
+    start_time = _to_microseconds(self._clock.time_ns())
     result_generator = self._inference_runner.run_inference(
         examples, self._model)
     predictions = list(result_generator)
 
-    inference_latency = self._clock.get_current_time_in_microseconds(
-    ) - start_time
+    end_time = _to_microseconds(self._clock.time_ns())
+    inference_latency = end_time - start_time
     num_bytes = self._inference_runner.get_num_bytes(examples)
     num_elements = len(batch)
     self._metrics_collector.update(num_elements, num_bytes, inference_latency)
@@ -252,33 +251,3 @@ def _get_current_process_memory_in_bytes():
         'Resource module is not available for current platform, '
         'memory usage cannot be fetched.')
   return 0
-
-
-def _is_windows() -> bool:
-  return platform.system() == 'Windows' or os.name == 'nt'
-
-
-def _is_cygwin() -> bool:
-  return platform.system().startswith('CYGWIN_NT')
-
-
-class _Clock(object):
-  def get_current_time_in_microseconds(self) -> int:
-    return int(time.time() * _SECOND_TO_MICROSECOND)
-
-
-class _FineGrainedClock(_Clock):
-  def get_current_time_in_microseconds(self) -> int:
-    return int(
-        time.clock_gettime_ns(time.CLOCK_REALTIME) /  # type: ignore[attr-defined]
-        _NANOSECOND_TO_MICROSECOND)
-
-
-#TODO(BEAM-14255): Research simplifying the internal clock and just using time.
-class _ClockFactory(object):
-  @staticmethod
-  def make_clock() -> _Clock:
-    if (hasattr(time, 'clock_gettime_ns') and not _is_windows() and
-        not _is_cygwin()):
-      return _FineGrainedClock()
-    return _Clock()
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index 384ee9426d0..d4bf6518bda 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -37,34 +37,35 @@ class FakeModel:
 
 class FakeInferenceRunner(base.InferenceRunner):
   def __init__(self, clock=None):
-    self._mock_clock = clock
+    self._fake_clock = clock
 
   def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
-    if self._mock_clock:
-      self._mock_clock.current_time += 3000
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 3_000_000  # 3 milliseconds
     for example in batch:
       yield model.predict(example)
 
 
 class FakeModelLoader(base.ModelLoader):
   def __init__(self, clock=None):
-    self._mock_clock = clock
+    self._fake_clock = clock
 
   def load_model(self):
-    if self._mock_clock:
-      self._mock_clock.current_time += 50000
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 500_000_000  # 500ms
     return FakeModel()
 
   def get_inference_runner(self):
-    return FakeInferenceRunner(self._mock_clock)
+    return FakeInferenceRunner(self._fake_clock)
 
 
-class MockClock(base._Clock):
+class FakeClock:
   def __init__(self):
-    self.current_time = 10000
+    # Start at 10 seconds.
+    self.current_time_ns = 10_000_000_000
 
-  def get_current_time_in_microseconds(self) -> int:
-    return self.current_time
+  def time_ns(self) -> int:
+    return self.current_time_ns
 
 
 class ExtractInferences(beam.DoFn):
@@ -137,9 +138,9 @@ class RunInferenceBaseTest(unittest.TestCase):
     pipeline = TestPipeline()
     examples = [1, 5, 3, 10]
     pcoll = pipeline | 'start' >> beam.Create(examples)
-    mock_clock = MockClock()
+    fake_clock = FakeClock()
     _ = pcoll | base.RunInference(
-        FakeModelLoader(clock=mock_clock), clock=mock_clock)
+        FakeModelLoader(clock=fake_clock), clock=fake_clock)
     res = pipeline.run()
     res.wait_until_finish()
 
@@ -155,7 +156,7 @@ class RunInferenceBaseTest(unittest.TestCase):
             MetricsFilter().with_name('load_model_latency_milli_secs')))
     load_model_latency = metric_results['distributions'][0]
     self.assertEqual(load_model_latency.result.count, 1)
-    self.assertEqual(load_model_latency.result.mean, 50)
+    self.assertEqual(load_model_latency.result.mean, 500)
 
   def test_forwards_batch_args(self):
     examples = list(range(100))

Reply via email to