jerrypeng closed pull request #3229: optimize py function stats usage
URL: https://github.com/apache/pulsar/pull/3229
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
is merged, the diff is reproduced below for the sake of provenance:
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py
b/pulsar-functions/instance/src/main/python/contextimpl.py
index a9b9b1a39d..2797658c14 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -58,7 +58,7 @@ def __init__(self, instance_config, logger, pulsar_client,
user_code, consumers,
else {}
self.metrics_labels = metrics_labels
- self.user_metrics_labels = dict()
+ self.user_metrics_map = dict()
self.user_metrics_summary = Summary("pulsar_function_user_metric",
'Pulsar Function user defined metric',
ContextImpl.user_metrics_label_names)
@@ -120,9 +120,12 @@ def get_secret(self, secret_key):
return self.secrets_provider.provide_secret(secret_key,
self.secrets_map[secret_key])
def record_metric(self, metric_name, metric_value):
- if metric_name not in self.user_metrics_labels:
- self.user_metrics_labels[metric_name] = self.metrics_labels +
[metric_name]
-
self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+ if metric_name not in self.user_metrics_map:
+ user_metrics_labels = self.metrics_labels + [metric_name]
+ self.user_metrics_map[metric_name] =
self.user_metrics_summary.labels(*user_metrics_labels)
+
+ self.user_metrics_map[metric_name].observe(metric_value)
+
def get_output_topic(self):
return self.instance_config.function_details.output
@@ -167,14 +170,14 @@ def get_and_reset_metrics(self):
def reset_metrics(self):
# TODO: Make it thread safe
- for labels in self.user_metrics_labels.values():
- self.user_metrics_summary.labels(*labels)._sum.set(0.0)
- self.user_metrics_summary.labels(*labels)._count.set(0.0)
+ for user_metric in self.user_metrics_map.values():
+ user_metric._sum.set(0.0)
+ user_metric._count.set(0.0)
def get_metrics(self):
metrics_map = {}
- for metric_name, metric_labels in self.user_metrics_labels.items():
- metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] =
self.user_metrics_summary.labels(*metric_labels)._sum.get()
- metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] =
self.user_metrics_summary.labels(*metric_labels)._count.get()
+ for metric_name, user_metric in self.user_metrics_map.items():
+ metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] =
user_metric._sum.get()
+ metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] =
user_metric._count.get()
return metrics_map
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py
b/pulsar-functions/instance/src/main/python/function_stats.py
index 3ea0316190..b9a09c4f93 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -89,67 +89,80 @@ def __init__(self, metrics_labels):
self.metrics_labels = metrics_labels;
self.process_start_time = None
+ # as optimization
+ self._stat_total_processed_successfully =
self.stat_total_processed_successfully.labels(*self.metrics_labels)
+ self._stat_total_sys_exceptions =
self.stat_total_sys_exceptions.labels(*self.metrics_labels)
+ self._stat_total_user_exceptions =
self.stat_total_user_exceptions.labels(*self.metrics_labels)
+ self._stat_process_latency_ms =
self.stat_process_latency_ms.labels(*self.metrics_labels)
+ self._stat_last_invocation =
self.stat_last_invocation.labels(*self.metrics_labels)
+ self._stat_total_received =
self.stat_total_received.labels(*self.metrics_labels)
+ self._stat_total_processed_successfully_1min =
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)
+ self._stat_total_sys_exceptions_1min =
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)
+ self._stat_total_user_exceptions_1min =
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)
+ self._stat_process_latency_ms_1min =
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)
+ self._stat_total_received_1min =
self.stat_total_received_1min.labels(*self.metrics_labels)
+
# start time for windowed metrics
util.FixedTimer(60, self.reset).start()
def get_total_received(self):
- return self.stat_total_received.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_received._value.get();
def get_total_processed_successfully(self):
- return
self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_processed_successfully._value.get();
def get_total_sys_exceptions(self):
- return
self.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_sys_exceptions._value.get();
def get_total_user_exceptions(self):
- return
self.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_user_exceptions._value.get();
def get_avg_process_latency(self):
- process_latency_ms_count =
self.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
- process_latency_ms_sum =
self.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+ process_latency_ms_count = self._stat_process_latency_ms._count.get()
+ process_latency_ms_sum = self._stat_process_latency_ms._sum.get()
return 0.0 \
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
def get_total_processed_successfully_1min(self):
- return
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_processed_successfully_1min._value.get()
def get_total_sys_exceptions_1min(self):
- return
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_sys_exceptions_1min._value.get()
def get_total_user_exceptions_1min(self):
- return
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_user_exceptions_1min._value.get()
def get_total_received_1min(self):
- return
self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_received_1min._value.get()
def get_avg_process_latency_1min(self):
- process_latency_ms_count =
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
- process_latency_ms_sum =
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.get()
+ process_latency_ms_count = self._stat_process_latency_ms_1min._count.get()
+ process_latency_ms_sum = self._stat_process_latency_ms_1min._sum.get()
return 0.0 \
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
def get_last_invocation(self):
- return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+ return self._stat_last_invocation._value.get()
def incr_total_processed_successfully(self):
- self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
-
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_processed_successfully.inc()
+ self._stat_total_processed_successfully_1min.inc()
def incr_total_sys_exceptions(self, exception):
- self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
- self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_sys_exceptions.inc()
+ self._stat_total_sys_exceptions_1min.inc()
self.add_sys_exception(exception)
def incr_total_user_exceptions(self, exception):
- self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
- self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_user_exceptions.inc()
+ self._stat_total_user_exceptions_1min.inc()
self.add_user_exception(exception)
def incr_total_received(self):
- self.stat_total_received.labels(*self.metrics_labels).inc()
- self.stat_total_received_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_received.inc()
+ self._stat_total_received_1min.inc()
def process_time_start(self):
self.process_start_time = time.time();
@@ -157,11 +170,11 @@ def process_time_start(self):
def process_time_end(self):
if self.process_start_time:
duration = (time.time() - self.process_start_time) * 1000.0
-
self.stat_process_latency_ms.labels(*self.metrics_labels).observe(duration)
-
self.stat_process_latency_ms_1min.labels(*self.metrics_labels).observe(duration)
+ self._stat_process_latency_ms.observe(duration)
+ self._stat_process_latency_ms_1min.observe(duration)
def set_last_invocation(self, time):
- self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
+ self._stat_last_invocation.set(time * 1000.0)
def add_user_exception(self, exception):
error = traceback.format_exc()
@@ -178,7 +191,7 @@ def add_user_exception(self, exception):
@limits(calls=5, period=60)
def report_user_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [exception.message,
str(ts)]
+ exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
self.user_exceptions.labels(*exception_metric_labels).set(1.0)
def add_sys_exception(self, exception):
@@ -196,15 +209,15 @@ def add_sys_exception(self, exception):
@limits(calls=5, period=60)
def report_system_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [exception.message,
str(ts)]
+ exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
self.system_exceptions.labels(*exception_metric_labels).set(1.0)
def reset(self):
self.latest_user_exception = []
self.latest_sys_exception = []
-
self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
-
self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
-
self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
-
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.set(0.0)
-
self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.set(0.0)
- self.stat_total_received_1min.labels(*self.metrics_labels)._value.set(0.0)
\ No newline at end of file
+ self._stat_total_processed_successfully_1min._value.set(0.0)
+ self._stat_total_user_exceptions_1min._value.set(0.0)
+ self._stat_total_sys_exceptions_1min._value.set(0.0)
+ self._stat_process_latency_ms_1min._sum.set(0.0)
+ self._stat_process_latency_ms_1min._count.set(0.0)
+ self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services