This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4d22202fde9 [BEAM-14255] Drop clock abstraction (#17671)
4d22202fde9 is described below
commit 4d22202fde91d5bc0b73369de39e4f4baa49739e
Author: Ryan Thompson <[email protected]>
AuthorDate: Tue May 31 11:43:45 2022 -0400
[BEAM-14255] Drop clock abstraction (#17671)
* removed unused imports
* Update sdks/python/apache_beam/ml/inference/base.py
Co-authored-by: Andy Ye <[email protected]>
* use default clock arguments
* simplified numbers in tests
* Update sdks/python/apache_beam/ml/inference/base_test.py
Co-authored-by: Brian Hulette <[email protected]>
* time_ns
Co-authored-by: Andy Ye <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
---
sdks/python/apache_beam/ml/inference/base.py | 67 ++++++-----------------
sdks/python/apache_beam/ml/inference/base_test.py | 29 +++++-----
2 files changed, 33 insertions(+), 63 deletions(-)
diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 7c7ef50e234..ad8ea5868f7 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -29,9 +29,7 @@ no expectation that these interfaces will not change.
"""
import logging
-import os
import pickle
-import platform
import sys
import time
from typing import Any
@@ -39,7 +37,6 @@ from typing import Generic
from typing import Iterable
from typing import List
from typing import Mapping
-from typing import Optional
from typing import TypeVar
import apache_beam as beam
@@ -51,13 +48,20 @@ try:
except ImportError:
resource = None # type: ignore[assignment]
-_MICROSECOND_TO_MILLISECOND = 1000
-_NANOSECOND_TO_MICROSECOND = 1000
-_SECOND_TO_MICROSECOND = 1_000_000
+_NANOSECOND_TO_MILLISECOND = 1_000_000
+_NANOSECOND_TO_MICROSECOND = 1_000
T = TypeVar('T')
+def _to_milliseconds(time_ns: int) -> int:
+ return int(time_ns / _NANOSECOND_TO_MILLISECOND)
+
+
+def _to_microseconds(time_ns: int) -> int:
+ return int(time_ns / _NANOSECOND_TO_MICROSECOND)
+
+
class InferenceRunner:
"""Implements running inferences for a framework."""
def run_inference(self, batch: List[Any], model: Any) -> Iterable[Any]:
@@ -99,8 +103,7 @@ class RunInference(beam.PTransform):
model_loader: An implementation of ModelLoader.
clock: A clock implementing get_current_time_in_microseconds.
"""
- def __init__(
- self, model_loader: ModelLoader, clock: Optional["_Clock"] = None):
+ def __init__(self, model_loader: ModelLoader, clock=time):
self._model_loader = model_loader
self._clock = clock
@@ -169,28 +172,24 @@ class _MetricsCollector:
class _RunInferenceDoFn(beam.DoFn):
"""A DoFn implementation generic to frameworks."""
- def __init__(
- self, model_loader: ModelLoader, clock: Optional["_Clock"] = None):
+ def __init__(self, model_loader: ModelLoader, clock):
self._model_loader = model_loader
self._inference_runner = model_loader.get_inference_runner()
self._shared_model_handle = shared.Shared()
self._metrics_collector = _MetricsCollector(
self._inference_runner.get_metrics_namespace())
self._clock = clock
- if not clock:
- self._clock = _ClockFactory.make_clock()
self._model = None
def _load_model(self):
def load():
"""Function for constructing shared LoadedModel."""
memory_before = _get_current_process_memory_in_bytes()
- start_time = self._clock.get_current_time_in_microseconds()
+ start_time = _to_milliseconds(self._clock.time_ns())
model = self._model_loader.load_model()
- end_time = self._clock.get_current_time_in_microseconds()
+ end_time = _to_milliseconds(self._clock.time_ns())
memory_after = _get_current_process_memory_in_bytes()
- load_model_latency_ms = ((end_time - start_time) /
- _MICROSECOND_TO_MILLISECOND)
+ load_model_latency_ms = end_time - start_time
model_byte_size = memory_after - memory_before
self._metrics_collector.cache_load_model_metrics(
load_model_latency_ms, model_byte_size)
@@ -213,13 +212,13 @@ class _RunInferenceDoFn(beam.DoFn):
examples = batch
keys = None
- start_time = self._clock.get_current_time_in_microseconds()
+ start_time = _to_microseconds(self._clock.time_ns())
result_generator = self._inference_runner.run_inference(
examples, self._model)
predictions = list(result_generator)
- inference_latency = self._clock.get_current_time_in_microseconds(
- ) - start_time
+ end_time = _to_microseconds(self._clock.time_ns())
+ inference_latency = end_time - start_time
num_bytes = self._inference_runner.get_num_bytes(examples)
num_elements = len(batch)
self._metrics_collector.update(num_elements, num_bytes, inference_latency)
@@ -252,33 +251,3 @@ def _get_current_process_memory_in_bytes():
'Resource module is not available for current platform, '
'memory usage cannot be fetched.')
return 0
-
-
-def _is_windows() -> bool:
- return platform.system() == 'Windows' or os.name == 'nt'
-
-
-def _is_cygwin() -> bool:
- return platform.system().startswith('CYGWIN_NT')
-
-
-class _Clock(object):
- def get_current_time_in_microseconds(self) -> int:
- return int(time.time() * _SECOND_TO_MICROSECOND)
-
-
-class _FineGrainedClock(_Clock):
- def get_current_time_in_microseconds(self) -> int:
- return int(
- time.clock_gettime_ns(time.CLOCK_REALTIME) / # type: ignore[attr-defined]
- _NANOSECOND_TO_MICROSECOND)
-
-
-#TODO(BEAM-14255): Research simplifying the internal clock and just using time.
-class _ClockFactory(object):
- @staticmethod
- def make_clock() -> _Clock:
- if (hasattr(time, 'clock_gettime_ns') and not _is_windows() and
- not _is_cygwin()):
- return _FineGrainedClock()
- return _Clock()
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index 384ee9426d0..d4bf6518bda 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -37,34 +37,35 @@ class FakeModel:
class FakeInferenceRunner(base.InferenceRunner):
def __init__(self, clock=None):
- self._mock_clock = clock
+ self._fake_clock = clock
def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
- if self._mock_clock:
- self._mock_clock.current_time += 3000
+ if self._fake_clock:
+ self._fake_clock.current_time_ns += 3_000_000 # 3 milliseconds
for example in batch:
yield model.predict(example)
class FakeModelLoader(base.ModelLoader):
def __init__(self, clock=None):
- self._mock_clock = clock
+ self._fake_clock = clock
def load_model(self):
- if self._mock_clock:
- self._mock_clock.current_time += 50000
+ if self._fake_clock:
+ self._fake_clock.current_time_ns += 500_000_000 # 500ms
return FakeModel()
def get_inference_runner(self):
- return FakeInferenceRunner(self._mock_clock)
+ return FakeInferenceRunner(self._fake_clock)
-class MockClock(base._Clock):
+class FakeClock:
def __init__(self):
- self.current_time = 10000
+ # Start at 10 seconds.
+ self.current_time_ns = 10_000_000_000
- def get_current_time_in_microseconds(self) -> int:
- return self.current_time
+ def time_ns(self) -> int:
+ return self.current_time_ns
class ExtractInferences(beam.DoFn):
@@ -137,9 +138,9 @@ class RunInferenceBaseTest(unittest.TestCase):
pipeline = TestPipeline()
examples = [1, 5, 3, 10]
pcoll = pipeline | 'start' >> beam.Create(examples)
- mock_clock = MockClock()
+ fake_clock = FakeClock()
_ = pcoll | base.RunInference(
- FakeModelLoader(clock=mock_clock), clock=mock_clock)
+ FakeModelLoader(clock=fake_clock), clock=fake_clock)
res = pipeline.run()
res.wait_until_finish()
@@ -155,7 +156,7 @@ class RunInferenceBaseTest(unittest.TestCase):
MetricsFilter().with_name('load_model_latency_milli_secs')))
load_model_latency = metric_results['distributions'][0]
self.assertEqual(load_model_latency.result.count, 1)
- self.assertEqual(load_model_latency.result.mean, 50)
+ self.assertEqual(load_model_latency.result.mean, 500)
def test_forwards_batch_args(self):
examples = list(range(100))