This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6c25bc0 Add start_times to MonitoringInfos and populate them in the python SDK
new eca935b Merge pull request #13429 from Add start_times to MonitoringInfos and populate them in the python SDK
6c25bc0 is described below
commit 6c25bc082e809246989ca8a459c36c5b4d247f20
Author: Alex Amato <[email protected]>
AuthorDate: Wed Nov 25 12:23:45 2020 -0800
Add start_times to MonitoringInfos and populate them in the python SDK
---
model/pipeline/src/main/proto/metrics.proto | 13 +++++++++++
sdks/python/apache_beam/metrics/cells.pxd | 2 ++
sdks/python/apache_beam/metrics/cells.py | 15 ++++++++++---
sdks/python/apache_beam/metrics/cells_test.py | 25 ++++++++++++++++++++++
.../apache_beam/runners/worker/sdk_worker_test.py | 2 ++
5 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index 86114a8..39ef551 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -31,6 +31,7 @@ option java_outer_classname = "MetricsApi";
import "beam_runner_api.proto";
import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
// A specification for describing a well known MonitoringInfo.
//
@@ -401,6 +402,18 @@ message MonitoringInfo {
// as Stackdriver will be able to aggregate the metrics using a subset of the
// provided labels
map<string, string> labels = 4;
+
+ // This indicates the start of the time range over which this value was
+ // measured.
+ // This is needed by some external metric aggregation services
+ // to indicate when the reporter of the metric first began collecting the
+ // cumulative value for the timeseries.
+ // If the SDK Harness restarts, it should reset the start_time, and reset
+ // the collection of cumulative metrics (i.e. start to count again from 0).
+ // HarnessMonitoringInfos should set this start_time once, when the
+ // MonitoringInfo is first reported.
+ // ProcessBundle MonitoringInfos should set a start_time for each bundle.
+ google.protobuf.Timestamp start_time = 5;
}
// A set of well known URNs that specify the encoding and aggregation method.
diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd
index 0204da8..0eaa890 100644
--- a/sdks/python/apache_beam/metrics/cells.pxd
+++ b/sdks/python/apache_beam/metrics/cells.pxd
@@ -17,11 +17,13 @@
cimport cython
cimport libc.stdint
+from cpython.datetime cimport datetime
cdef class MetricCell(object):
cdef object _lock
cpdef bint update(self, value) except -1
+ cdef datetime _start_time
cdef class CounterCell(MetricCell):
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index a7b7938..34ce2a4 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -29,6 +29,7 @@ from __future__ import division
import threading
import time
from builtins import object
+from datetime import datetime
from typing import Any
from typing import Optional
from typing import SupportsInt
@@ -63,6 +64,7 @@ class MetricCell(object):
"""
def __init__(self):
self._lock = threading.Lock()
+ self._start_time = None
def update(self, value):
raise NotImplementedError
@@ -71,6 +73,13 @@ class MetricCell(object):
raise NotImplementedError
def to_runner_api_monitoring_info(self, name, transform_id):
+ if not self._start_time:
+ self._start_time = datetime.utcnow()
+ mi = self.to_runner_api_monitoring_info_impl(name, transform_id)
+ mi.start_time.FromDatetime(self._start_time)
+ return mi
+
+ def to_runner_api_monitoring_info_impl(self, name, transform_id):
raise NotImplementedError
def reset(self):
@@ -136,7 +145,7 @@ class CounterCell(MetricCell):
with self._lock:
return self.value
- def to_runner_api_monitoring_info(self, name, transform_id):
+ def to_runner_api_monitoring_info_impl(self, name, transform_id):
from apache_beam.metrics import monitoring_infos
if not name.urn:
# User counter case.
@@ -201,7 +210,7 @@ class DistributionCell(MetricCell):
with self._lock:
return self.data.get_cumulative()
- def to_runner_api_monitoring_info(self, name, transform_id):
+ def to_runner_api_monitoring_info_impl(self, name, transform_id):
from apache_beam.metrics import monitoring_infos
return monitoring_infos.int64_user_distribution(
name.namespace,
@@ -251,7 +260,7 @@ class GaugeCell(MetricCell):
with self._lock:
return self.data.get_cumulative()
- def to_runner_api_monitoring_info(self, name, transform_id):
+ def to_runner_api_monitoring_info_impl(self, name, transform_id):
from apache_beam.metrics import monitoring_infos
return monitoring_infos.int64_user_gauge(
name.namespace,
diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
index e9aae5c..a120d15 100644
--- a/sdks/python/apache_beam/metrics/cells_test.py
+++ b/sdks/python/apache_beam/metrics/cells_test.py
@@ -28,6 +28,7 @@ from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import GaugeData
+from apache_beam.metrics.metricbase import MetricName
class TestCounterCell(unittest.TestCase):
@@ -69,6 +70,14 @@ class TestCounterCell(unittest.TestCase):
c.inc()
self.assertEqual(c.get_cumulative(), -8)
+ def test_start_time_set(self):
+ c = CounterCell()
+ c.inc(2)
+
+ name = MetricName('namespace', 'name1')
+ mi = c.to_runner_api_monitoring_info(name, 'transform_id')
+ self.assertGreater(mi.start_time.seconds, 0)
+
class TestDistributionCell(unittest.TestCase):
@classmethod
@@ -119,6 +128,14 @@ class TestDistributionCell(unittest.TestCase):
d.update(3.3)
self.assertEqual(d.get_cumulative(), DistributionData(9, 3, 3, 3))
+ def test_start_time_set(self):
+ d = DistributionCell()
+ d.update(3.1)
+
+ name = MetricName('namespace', 'name1')
+ mi = d.to_runner_api_monitoring_info(name, 'transform_id')
+ self.assertGreater(mi.start_time.seconds, 0)
+
class TestGaugeCell(unittest.TestCase):
def test_basic_operations(self):
@@ -146,6 +163,14 @@ class TestGaugeCell(unittest.TestCase):
result = g2.combine(g1)
self.assertEqual(result.data.value, 1)
+ def test_start_time_set(self):
+ g1 = GaugeCell()
+ g1.set(3)
+
+ name = MetricName('namespace', 'name1')
+ mi = g1.to_runner_api_monitoring_info(name, 'transform_id')
+ self.assertGreater(mi.start_time.seconds, 0)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c1cf641..7fa290f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -263,6 +263,8 @@ class SdkWorkerTest(unittest.TestCase):
responses['monitoring_infos_metadata'].monitoring_infos.monitoring_info)
found = False
for mi in short_id_to_mi.values():
+ # Clear the timestamp before comparing
+ mi.ClearField("start_time")
if mi == expected_monitoring_info:
found = True
self.assertTrue(found, str(responses['monitoring_infos_metadata']))