[
https://issues.apache.org/jira/browse/BEAM-5778?focusedWorklogId=163852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163852
]
ASF GitHub Bot logged work on BEAM-5778:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Nov/18 09:56
Start Date: 08/Nov/18 09:56
Worklog Time Spent: 10m
Work Description: kkucharc commented on a change in pull request #6943:
[BEAM-5778] Add integrations of Metrics API to Big Query for SyntheticSources
load tests in Python SDK
URL: https://github.com/apache/beam/pull/6943#discussion_r231824300
##########
File path:
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
##########
@@ -21,16 +21,121 @@
 from __future__ import absolute_import
+import logging
 import time
+
+from google.cloud import bigquery
+from google.cloud.bigquery.schema import SchemaField
+
 import apache_beam as beam
 from apache_beam.metrics import Metrics
+
+START_TIME_LABEL = 'runtime_start'
+END_TIME_LABEL = 'runtime_end'
+RUNTIME_LABEL = 'runtime'
+SUBMIT_TIMESTAMP_LABEL = 'submit_timestamp'
+LOAD_TEST_DATASET_NAME = 'python_load_tests'
+
+
+def _get_schema_field(schema_field):
+  return SchemaField(
+      name=schema_field['name'],
+      field_type=schema_field['type'],
+      mode=schema_field['mode'])
+
+
+class BigQueryMetricsCollector(object):
+  def __init__(self, project_name, namespace, schema_map):
+    self._namespace = namespace
+    bq_client = bigquery.Client(project=project_name)
+    bq_dataset = bq_client.dataset(LOAD_TEST_DATASET_NAME)
+    if not bq_dataset.exists():
+      raise ValueError(
+          'Dataset {} does not exist in your project. '
+          'You have to create the dataset first.'
+          .format(LOAD_TEST_DATASET_NAME))
+
+    schemas = [{'name': SUBMIT_TIMESTAMP_LABEL,
+                'type': 'TIMESTAMP',
+                'mode': 'REQUIRED'}] + schema_map
+
+    self._schema_names = [schema['name'] for schema in schemas]
+    bq_schemas = [_get_schema_field(schema) for schema in schemas]
+
+    self._bq_table = bq_dataset.table(namespace, bq_schemas)
+    if not self._bq_table.exists():
+      self._bq_table.create()
+
+  def save_metrics(self, result):
+    metrics = result.metrics().query()
+
+    counters = metrics['counters']
+    counters_list = []
+    if counters:
+      counters_list = self._prepare_counter_metrics(counters)
+
+    distributions = metrics['distributions']
+    dist_list = []
+    if distributions:
+      dist_list = self._prepare_runtime_metrics(distributions)
+
+    timestamp = {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()}
+
+    insert_list = [timestamp] + dist_list + counters_list
+    rows_tuple = tuple(self._match_inserts_by_schema(insert_list))
+
+    self.insert_metrics(rows_tuple)
+
+  def _prepare_counter_metrics(self, counters):
+    counters_list = []
+    for counter in counters:
+      logging.info("Counter: %s", counter)
+      counter_committed = counter.committed
+      counter_label = str(counter.key.metric.name)
+      counters_list.append(
+          {'label': counter_label, 'value': counter_committed})
+
+    return counters_list
+
+  # Prepares the start and end time distributions to report the test runtime.
+  def _prepare_runtime_metrics(self, distributions):
+    dist_pivot = {}
+    for dist in distributions:
+      logging.info("Distribution: %s", dist)
+      dist_pivot.update(self._get_start_end_time(dist))
+
+    runtime_in_s = float(
+        dist_pivot[END_TIME_LABEL] - dist_pivot[START_TIME_LABEL])
+    return [{'label': RUNTIME_LABEL, 'value': runtime_in_s}]
+
+  # Yields values in schema order so they can form a row tuple.
+  def _match_inserts_by_schema(self, insert_list):
+    for name in self._schema_names:
+      yield self._get_element_by_schema(name, insert_list)
+
+  def _get_element_by_schema(self, schema_name, insert_list):
+    for metric in insert_list:
+      if metric['label'] == schema_name:
+        return metric['value']
+    return None
+
+  def insert_metrics(self, rows_tuple):
+    # insert_data returns a list of per-row insert errors; empty on success.
+    errors = self._bq_table.insert_data(rows=[rows_tuple])
+    if errors and errors[0]['errors']:
+      for err in errors[0]['errors']:
+        raise ValueError(err['message'])
+
+  def _get_start_end_time(self, distribution):
+    if distribution.key.metric.name == START_TIME_LABEL:
+      return {distribution.key.metric.name: distribution.committed.min}
+    elif distribution.key.metric.name == END_TIME_LABEL:
+      return {distribution.key.metric.name: distribution.committed.max}
+    # Distributions other than the start/end markers contribute nothing.
+    return {}
+
+
 class MeasureTime(beam.DoFn):
-  def __init__(self):
-    self.runtime_start = Metrics.distribution('pardo', 'runtime.start')
-    self.runtime_end = Metrics.distribution('pardo', 'runtime.end')
+  def __init__(self, namespace):
+    self.namespace = namespace
+    self.runtime_start = Metrics.distribution(self.namespace, START_TIME_LABEL)
Review comment:
Ok
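
For context, here is a minimal sketch of how the collector and the MeasureTime
DoFn from this patch might be wired into a load test. It assumes MeasureTime
passes elements through while updating its start/end distributions (its
process method is cut off in the diff above), and the project id, namespace,
and schema entry are hypothetical placeholders, not values from the patch:

    import apache_beam as beam
    from apache_beam.testing.load_tests.load_test_metrics_utils import (
        BigQueryMetricsCollector, MeasureTime, RUNTIME_LABEL)

    # Hypothetical schema_map; each entry uses the {'name', 'type', 'mode'}
    # shape consumed by _get_schema_field.
    schema_map = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]

    # The 'python_load_tests' dataset must already exist in the project;
    # the table named after the namespace is created on demand.
    collector = BigQueryMetricsCollector(
        project_name='my-gcp-project',       # hypothetical project id
        namespace='synthetic_sources_test',  # used as the BigQuery table name
        schema_map=schema_map)

    pipeline = beam.Pipeline()
    _ = (pipeline
         | beam.Create(list(range(100)))
         | beam.ParDo(MeasureTime('synthetic_sources_test')))
    result = pipeline.run()
    result.wait_until_finish()

    # Queries counters and distributions from the finished run and inserts
    # a single row (submit timestamp, runtime, counters) into the table.
    collector.save_metrics(result)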
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 163852)
Time Spent: 3h 10m (was: 3h)
> Add integrations of Metrics API to Big Query for SyntheticSources load tests
> in Python SDK
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-5778
> URL: https://issues.apache.org/jira/browse/BEAM-5778
> Project: Beam
> Issue Type: Improvement
> Components: testing
> Reporter: Kasia Kucharczyk
> Assignee: Kasia Kucharczyk
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Right now the Metrics API collects basic metrics from load tests of
> SyntheticSources (Python SDK). These metrics should be stored in BigQuery so
> that they can be presented on performance dashboards.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)