[ 
https://issues.apache.org/jira/browse/BEAM-5778?focusedWorklogId=171888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171888
 ]

ASF GitHub Bot logged work on BEAM-5778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Dec/18 12:55
            Start Date: 04/Dec/18 12:55
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #6943: [BEAM-5778] Add 
integrations of Metrics API to Big Query for SyntheticSources load tests in 
Python SDK
URL: https://github.com/apache/beam/pull/6943
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
index 6956f04531b5..8b3c026b6738 100644
--- a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -15,10 +15,25 @@
 # limitations under the License.
 #
 """
-To run test on DirectRunner
+This is CoGroupByKey load test with Synthetic Source. Besides of the standard
+input options there are additional options:
+* project (optional) - the gcp project in case of saving
+metrics in Big Query (in case of Dataflow Runner
+it is required to specify project of runner),
+* metrics_namespace (optional) - name of BigQuery table where metrics
+will be stored,
+in case of lack of any of both options metrics won't be saved
+* input_options - options for Synthetic Sources
+* co_input_options - options for  Synthetic Sources.
+
+Example test run on DirectRunner:
 
 python setup.py nosetests \
-    --test-pipeline-options="--input_options='{
+    --test-pipeline-options="
+      --project=big-query-project
+      --metrics_dataset=python_load_tests
+      --metrics_table=co_gbk
+      --input_options='{
         \"num_records\": 1000,
         \"key_size\": 5,
         \"value_size\":15,
@@ -32,7 +47,7 @@
         \"bundle_size_distribution_type\": \"const\",
         \"bundle_size_distribution_param\": 1,
         \"force_initial_num_bundles\":0}'" \
-    --tests apache_beam.testing.load_tests.co_group_by_it_test
+    --tests apache_beam.testing.load_tests.co_group_by_key_test
 
 To run test on other runner (ex. Dataflow):
 
@@ -43,6 +58,8 @@
         --staging_location=gs://...
         --temp_location=gs://...
         --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
+        --metrics_dataset=python_load_tests
+        --metrics_table=co_gbk
         --input_options='{
         \"num_records\": 1000,
         \"key_size\": 5,
@@ -59,7 +76,7 @@
         \"bundle_size_distribution_param\": 1,
         \"force_initial_num_bundles\":0
         }'" \
-    --tests apache_beam.testing.load_tests.co_group_by_it_test
+    --tests apache_beam.testing.load_tests.co_group_by_key_test
 
 """
 
@@ -71,13 +88,21 @@
 
 import apache_beam as beam
 from apache_beam.testing import synthetic_pipeline
-from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.test_pipeline import TestPipeline
 
+try:
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
+  from google.cloud import bigquery as bq
+except ImportError:
+  bq = None
+
 INPUT_TAG = 'pc1'
 CO_INPUT_TAG = 'pc2'
+RUNTIME_LABEL = 'runtime'
 
 
+@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
 class CoGroupByKeyTest(unittest.TestCase):
 
   def parseTestPipelineOptions(self, options):
@@ -98,10 +123,30 @@ def parseTestPipelineOptions(self, options):
 
   def setUp(self):
     self.pipeline = TestPipeline(is_integration_test=True)
-    self.inputOptions = json.loads(self.pipeline.get_option('input_options'))
-    self.coInputOptions = json.loads(
+    self.input_options = json.loads(self.pipeline.get_option('input_options'))
+    self.co_input_options = json.loads(
         self.pipeline.get_option('co_input_options'))
 
+    metrics_project_id = self.pipeline.get_option('project')
+    self.metrics_namespace = self.pipeline.get_option('metrics_table')
+    metrics_dataset = self.pipeline.get_option('metrics_dataset')
+    self.metrics_monitor = None
+    check = metrics_project_id and self.metrics_namespace and metrics_dataset\
+            is not None
+    if check:
+      measured_values = [{'name': RUNTIME_LABEL,
+                          'type': 'FLOAT',
+                          'mode': 'REQUIRED'}]
+      self.metrics_monitor = MetricsMonitor(
+          project_name=metrics_project_id,
+          table=self.metrics_namespace,
+          dataset=metrics_dataset,
+          schema_map=measured_values
+      )
+    else:
+      logging.error('One or more of parameters for collecting metrics '
+                    'are empty. Metrics will not be collected')
+
   class _Ungroup(beam.DoFn):
     def process(self, element):
       values = element[1]
@@ -117,30 +162,32 @@ def testCoGroupByKey(self):
       pc1 = (p
              | 'Read ' + INPUT_TAG >> beam.io.Read(
                  synthetic_pipeline.SyntheticSource(
-                     self.parseTestPipelineOptions(self.inputOptions)))
+                     self.parseTestPipelineOptions(self.input_options)))
              | 'Make ' + INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
+             | 'Measure time: Start pc1' >> beam.ParDo(
+                 MeasureTime(self.metrics_namespace))
             )
 
       pc2 = (p
              | 'Read ' + CO_INPUT_TAG >> beam.io.Read(
                  synthetic_pipeline.SyntheticSource(
-                     self.parseTestPipelineOptions(self.coInputOptions)))
+                     self.parseTestPipelineOptions(self.co_input_options)))
              | 'Make ' + CO_INPUT_TAG + ' iterable' >> beam.Map(
                  lambda x: (x, x))
+             | 'Measure time: Start pc2' >> beam.ParDo(
+                 MeasureTime(self.metrics_namespace))
             )
       # pylint: disable=expression-not-assigned
       ({INPUT_TAG: pc1, CO_INPUT_TAG: pc2}
        | 'CoGroupByKey: ' >> beam.CoGroupByKey()
        | 'Consume Joined Collections' >> beam.ParDo(self._Ungroup())
-       | 'Measure time' >> beam.ParDo(MeasureTime())
+       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
       )
 
       result = p.run()
       result.wait_until_finish()
-      metrics = result.metrics().query()
-
-      for dist in metrics['distributions']:
-        logging.info("Distribution: %s", dist)
+      if self.metrics_monitor is not None:
+        self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/combine_test.py b/sdks/python/apache_beam/testing/load_tests/combine_test.py
index 187a04657109..8214d619e3d4 100644
--- a/sdks/python/apache_beam/testing/load_tests/combine_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -15,10 +15,23 @@
 # limitations under the License.
 #
 """
-To run test on DirectRunner
+This is Combine load test with Synthetic Source. Besides of the standard
+input options there are additional options:
+* project (optional) - the gcp project in case of saving
+metrics in Big Query (in case of Dataflow Runner
+it is required to specify project of runner),
+* metrics_namespace (optional) - name of BigQuery table where metrics
+will be stored,
+in case of lack of any of both options metrics won't be saved
+* input_options - options for Synthetic Sources.
+
+Example test run on DirectRunner:
 
 python setup.py nosetests \
     --test-pipeline-options="
+    --project=big-query-project
+    --metrics_dataset=python_load_tests
+    --metrics_table=combine
     --input_options='{
     \"num_records\": 300,
     \"key_size\": 5,
@@ -38,6 +51,8 @@
         --staging_location=gs://...
         --temp_location=gs://...
         --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
+        --metrics_dataset=python_load_tests
+        --metrics_table=combine
         --input_options='{
         \"num_records\": 1000,
         \"key_size\": 5,
@@ -58,30 +73,57 @@
 
 import apache_beam as beam
 from apache_beam.testing import synthetic_pipeline
-from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.test_pipeline import TestPipeline
 
+try:
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
+  from google.cloud import bigquery as bq
+except ImportError:
+  bq = None
+
+RUNTIME_LABEL = 'runtime'
+
 
+@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
 class CombineTest(unittest.TestCase):
   def parseTestPipelineOptions(self):
     return {
-        'numRecords': self.inputOptions.get('num_records'),
-        'keySizeBytes': self.inputOptions.get('key_size'),
-        'valueSizeBytes': self.inputOptions.get('value_size'),
+        'numRecords': self.input_options.get('num_records'),
+        'keySizeBytes': self.input_options.get('key_size'),
+        'valueSizeBytes': self.input_options.get('value_size'),
         'bundleSizeDistribution': {
-            'type': self.inputOptions.get(
+            'type': self.input_options.get(
                 'bundle_size_distribution_type', 'const'
             ),
-            'param': self.inputOptions.get('bundle_size_distribution_param', 0)
+            'param': self.input_options.get('bundle_size_distribution_param', 0)
         },
-        'forceNumInitialBundles': self.inputOptions.get(
+        'forceNumInitialBundles': self.input_options.get(
             'force_initial_num_bundles', 0
         )
     }
 
   def setUp(self):
     self.pipeline = TestPipeline(is_integration_test=True)
-    self.inputOptions = json.loads(self.pipeline.get_option('input_options'))
+    self.input_options = json.loads(self.pipeline.get_option('input_options'))
+
+    metrics_project_id = self.pipeline.get_option('project')
+    self.metrics_namespace = self.pipeline.get_option('metrics_table')
+    metrics_dataset = self.pipeline.get_option('metrics_dataset')
+    self.metrics_monitor = None
+    check = metrics_project_id and self.metrics_namespace and metrics_dataset \
+            is not None
+    if check:
+      schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
+      self.metrics_monitor = MetricsMonitor(
+          project_name=metrics_project_id,
+          table=self.metrics_namespace,
+          dataset=metrics_dataset,
+          schema_map=schema
+      )
+    else:
+      logging.error('One or more of parameters for collecting metrics '
+                    'are empty. Metrics will not be collected')
 
   class _GetElement(beam.DoFn):
     def process(self, element):
@@ -93,17 +135,18 @@ def testCombineGlobally(self):
       (p
        | beam.io.Read(synthetic_pipeline.SyntheticSource(
            self.parseTestPipelineOptions()))
-       | 'Measure time' >> beam.ParDo(MeasureTime())
+       | 'Measure time: Start' >> beam.ParDo(
+           MeasureTime(self.metrics_namespace))
        | 'Combine with Top' >> beam.CombineGlobally(
            beam.combiners.TopCombineFn(1000))
        | 'Consume' >> beam.ParDo(self._GetElement())
+       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
       )
 
       result = p.run()
       result.wait_until_finish()
-      metrics = result.metrics().query()
-      for dist in metrics['distributions']:
-        logging.info("Distribution: %s", dist)
+      if self.metrics_monitor is not None:
+        self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
index d019a122e7b5..4382fb621039 100644
--- a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -15,10 +15,24 @@
 # limitations under the License.
 #
 """
-To run test on DirectRunner
+This is GroupByKey load test with Synthetic Source. Besides of the standard
+input options there are additional options:
+* project (optional) - the gcp project in case of saving
+metrics in Big Query (in case of Dataflow Runner
+it is required to specify project of runner),
+* metrics_namespace (optional) - name of BigQuery table where metrics
+will be stored,
+in case of lack of any of both options metrics won't be saved
+* input_options - options for Synthetic Sources.
+
+Example test run on DirectRunner:
 
 python setup.py nosetests \
-    --test-pipeline-options="--input_options='{
+    --test-pipeline-options="
+    --project=big-query-project
+    --metrics_dataset=python_load_tests
+    --metrics_table=gbk
+    --input_options='{
     \"num_records\": 300,
     \"key_size\": 5,
     \"value_size\":15,
@@ -26,7 +40,7 @@
     \"bundle_size_distribution_param\": 1,
     \"force_initial_num_bundles\": 0
     }'" \
-    --tests apache_beam.testing.load_tests.group_by_it_test
+    --tests apache_beam.testing.load_tests.group_by_key_test
 
 To run test on other runner (ex. Dataflow):
 
@@ -37,6 +51,8 @@
         --staging_location=gs://...
         --temp_location=gs://...
         --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
+        --metrics_dataset=python_load_tests
+        --metrics_table=gbk
         --input_options='{
         \"num_records\": 1000,
         \"key_size\": 5,
@@ -45,7 +61,7 @@
         \"bundle_size_distribution_param\": 1,
         \"force_initial_num_bundles\": 0
         }'" \
-    --tests apache_beam.testing.load_tests.group_by_it_test
+    --tests apache_beam.testing.load_tests.group_by_key_test
 
 """
 
@@ -57,30 +73,57 @@
 
 import apache_beam as beam
 from apache_beam.testing import synthetic_pipeline
-from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.test_pipeline import TestPipeline
 
+try:
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
+  from google.cloud import bigquery as bq
+except ImportError:
+  bq = None
+
+RUNTIME_LABEL = 'runtime'
 
+
+@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
 class GroupByKeyTest(unittest.TestCase):
   def parseTestPipelineOptions(self):
     return {
-        'numRecords': self.inputOptions.get('num_records'),
-        'keySizeBytes': self.inputOptions.get('key_size'),
-        'valueSizeBytes': self.inputOptions.get('value_size'),
+        'numRecords': self.input_options.get('num_records'),
+        'keySizeBytes': self.input_options.get('key_size'),
+        'valueSizeBytes': self.input_options.get('value_size'),
         'bundleSizeDistribution': {
-            'type': self.inputOptions.get(
+            'type': self.input_options.get(
                 'bundle_size_distribution_type', 'const'
             ),
-            'param': self.inputOptions.get('bundle_size_distribution_param', 0)
+            'param': self.input_options.get('bundle_size_distribution_param', 0)
         },
-        'forceNumInitialBundles': self.inputOptions.get(
+        'forceNumInitialBundles': self.input_options.get(
             'force_initial_num_bundles', 0
         )
     }
 
   def setUp(self):
     self.pipeline = TestPipeline(is_integration_test=True)
-    self.inputOptions = json.loads(self.pipeline.get_option('input_options'))
+    self.input_options = json.loads(self.pipeline.get_option('input_options'))
+
+    metrics_project_id = self.pipeline.get_option('project')
+    self.metrics_namespace = self.pipeline.get_option('metrics_table')
+    metrics_dataset = self.pipeline.get_option('metrics_dataset')
+    self.metrics_monitor = None
+    check = metrics_project_id and self.metrics_namespace and metrics_dataset \
+            is not None
+    if check:
+      schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
+      self.metrics_monitor = MetricsMonitor(
+          project_name=metrics_project_id,
+          table=self.metrics_namespace,
+          dataset=metrics_dataset,
+          schema_map=schema
+      )
+    else:
+      logging.error('One or more of parameters for collecting metrics '
+                    'are empty. Metrics will not be collected')
 
   def testGroupByKey(self):
     with self.pipeline as p:
@@ -88,17 +131,18 @@ def testGroupByKey(self):
       (p
        | beam.io.Read(synthetic_pipeline.SyntheticSource(
            self.parseTestPipelineOptions()))
-       | 'Measure time' >> beam.ParDo(MeasureTime())
+       | 'Measure time: Start' >> beam.ParDo(
+           MeasureTime(self.metrics_namespace))
        | 'GroupByKey' >> beam.GroupByKey()
        | 'Ungroup' >> beam.FlatMap(
            lambda elm: [(elm[0], v) for v in elm[1]])
+       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
       )
 
       result = p.run()
       result.wait_until_finish()
-      metrics = result.metrics().query()
-      for dist in metrics['distributions']:
-        logging.info("Distribution: %s", dist)
+      if self.metrics_monitor is not None:
+        self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 393be210d715..a1f867eaaf52 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -21,22 +21,177 @@
 
 from __future__ import absolute_import
 
+import logging
 import time
 
 import apache_beam as beam
 from apache_beam.metrics import Metrics
 
+try:
+  from google.cloud import bigquery
+  from google.cloud.bigquery.schema import SchemaField
+  from google.cloud.exceptions import NotFound
+except ImportError:
+  bigquery = None
+  SchemaField = None
+  NotFound = None
+
+RUNTIME_LABEL = 'runtime'
+SUBMIT_TIMESTAMP_LABEL = 'submit_timestamp'
+
+
+def _get_schema_field(schema_field):
+  return SchemaField(
+      name=schema_field['name'],
+      field_type=schema_field['type'],
+      mode=schema_field['mode'])
+
+
+class BigQueryClient(object):
+  def __init__(self, project_name, table, dataset, schema_map):
+    self._namespace = table
+
+    self._bq_client = bigquery.Client(project=project_name)
+
+    schema = self._parse_schema(schema_map)
+    self._schema_names = self._get_schema_names(schema)
+    schema = self._prepare_schema(schema)
+
+    self._get_or_create_table(schema, dataset)
+
+  def match_and_save(self, result_list):
+    rows_tuple = tuple(self._match_inserts_by_schema(result_list))
+    self._insert_data(rows_tuple)
+
+  def _match_inserts_by_schema(self, insert_list):
+    for name in self._schema_names:
+      yield self._get_element_by_schema(name, insert_list)
+
+  def _get_element_by_schema(self, schema_name, insert_list):
+    for metric in insert_list:
+      if metric['label'] == schema_name:
+        return metric['value']
+    return None
+
+  def _insert_data(self, rows_tuple):
+    errors = self._bq_client.insert_rows(self._bq_table, rows=[rows_tuple])
+    if len(errors) > 0:
+      for err in errors:
+        logging.error(err['message'])
+        raise ValueError('Unable save rows in BigQuery.')
+
+  def _get_dataset(self, dataset_name):
+    bq_dataset_ref = self._bq_client.dataset(dataset_name)
+    try:
+      bq_dataset = self._bq_client.get_dataset(bq_dataset_ref)
+    except NotFound:
+      raise ValueError(
+          'Dataset {} does not exist in your project. '
+          'You have to create table first.'
+          .format(dataset_name))
+    return bq_dataset
+
+  def _get_or_create_table(self, bq_schemas, dataset):
+    if self._namespace == '':
+      raise ValueError('Namespace cannot be empty.')
+
+    dataset = self._get_dataset(dataset)
+    table_ref = dataset.table(self._namespace)
+
+    try:
+      self._bq_table = self._bq_client.get_table(table_ref)
+    except NotFound:
+      table = bigquery.Table(table_ref, schema=bq_schemas)
+      self._bq_table = self._bq_client.create_table(table)
+
+  def _parse_schema(self, schema_map):
+    return [{'name': SUBMIT_TIMESTAMP_LABEL,
+             'type': 'TIMESTAMP',
+             'mode': 'REQUIRED'}] + schema_map
+
+  def _prepare_schema(self, schemas):
+    return [_get_schema_field(schema) for schema in schemas]
+
+  def _get_schema_names(self, schemas):
+    return [schema['name'] for schema in schemas]
+
+
+class MetricsMonitor(object):
+  def __init__(self, project_name, table, dataset, schema_map):
+    if project_name is not None:
+      self.bq = BigQueryClient(project_name, table, dataset, schema_map)
+
+  def send_metrics(self, result):
+    metrics = result.metrics().query()
+    counters = metrics['counters']
+    counters_list = []
+    if len(counters) > 0:
+      counters_list = self._prepare_counter_metrics(counters)
+
+    distributions = metrics['distributions']
+    dist_list = []
+    if len(distributions) > 0:
+      dist_list = self._prepare_runtime_metrics(distributions)
+
+    timestamp = {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()}
+
+    insert_list = [timestamp] + dist_list + counters_list
+    self.bq.match_and_save(insert_list)
+
+  def _prepare_counter_metrics(self, counters):
+    for counter in counters:
+      logging.info("Counter:  %s", counter)
+    counters_list = []
+    for counter in counters:
+      counter_commited = counter.committed
+      counter_label = str(counter.key.metric.name)
+      counters_list.append(
+          {'label': counter_label, 'value': counter_commited})
+
+    return counters_list
+
+  def _prepare_runtime_metrics(self, distributions):
+    min_values = []
+    max_values = []
+    for dist in distributions:
+      logging.info("Distribution: %s", dist)
+      min_values.append(dist.committed.min)
+      max_values.append(dist.committed.max)
+    # finding real start
+    min_value = min(min_values)
+    # finding real end
+    max_value = max(max_values)
+
+    runtime_in_s = max_value - min_value
+    logging.info("Runtime: %s", runtime_in_s)
+    runtime_in_s = float(runtime_in_s)
+    return [{'label': RUNTIME_LABEL, 'value': runtime_in_s}]
+
 
 class MeasureTime(beam.DoFn):
-  def __init__(self):
-    self.runtime_start = Metrics.distribution('pardo', 'runtime.start')
-    self.runtime_end = Metrics.distribution('pardo', 'runtime.end')
+  def __init__(self, namespace):
+    self.namespace = namespace
+    self.runtime = Metrics.distribution(self.namespace, RUNTIME_LABEL)
 
   def start_bundle(self):
-    self.runtime_start.update(time.time())
+    self.runtime.update(time.time())
 
   def finish_bundle(self):
-    self.runtime_end.update(time.time())
+    self.runtime.update(time.time())
 
   def process(self, element):
     yield element
+
+
+def count_bytes(counter_name):
+  def layer(f):
+    def repl(*args):
+      namespace = args[2]
+      counter = Metrics.counter(namespace, counter_name)
+      element = args[1]
+      _, value = element
+      for i in range(len(value)):
+        counter.inc(i)
+      return f(*args)
+    return repl
+  return layer
diff --git a/sdks/python/apache_beam/testing/load_tests/pardo_test.py b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
index a1c753ef27d4..0f2b2c1b64ba 100644
--- a/sdks/python/apache_beam/testing/load_tests/pardo_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
@@ -15,11 +15,28 @@
 # limitations under the License.
 #
 """
-To run test on DirectRunner
+This is ParDo load test with Synthetic Source. Besides of the standard
+input options there are additional options:
+* number_of_counter_operations - number of pardo operations
+* project (optional) - the gcp project in case of saving
+metrics in Big Query (in case of Dataflow Runner
+it is required to specify project of runner),
+* metrics_namespace (optional) - name of BigQuery table where metrics
+will be stored,
+in case of lack of any of both options metrics won't be saved
+* output (optional) - destination to save output, in case of no option
+output won't be written
+* input_options - options for Synthetic Sources.
+
+Example test run on DirectRunner:
 
 python setup.py nosetests \
     --test-pipeline-options="
     --number_of_counter_operations=1000
+    --output=gs://...
+    --project=big-query-project
+    --metrics_dataset=python_load_tests
+    --metrics_table=pardo
     --input_options='{
     \"num_records\": 300,
     \"key_size\": 5,
@@ -28,7 +45,7 @@
     \"bundle_size_distribution_param\": 1,
     \"force_initial_num_bundles\": 0
     }'" \
-    --tests apache_beam.testing.load_tests.par_do_test
+    --tests apache_beam.testing.load_tests.pardo_test
 
 To run test on other runner (ex. Dataflow):
 
@@ -39,8 +56,10 @@
         --staging_location=gs://...
         --temp_location=gs://...
         --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
-        --output=gc
+        --output=gs://...
         --number_of_counter_operations=1000
+        --metrics_dataset=python_load_tests
+        --metrics_table=pardo
         --input_options='{
         \"num_records\": 1000,
         \"key_size\": 5,
@@ -49,7 +68,7 @@
         \"bundle_size_distribution_param\": 1,
         \"force_initial_num_bundles\": 0
         }'" \
-    --tests apache_beam.testing.load_tests.par_do_test
+    --tests apache_beam.testing.load_tests.pardo_test
 
 """
 
@@ -57,65 +76,78 @@
 
 import json
 import logging
-import time
 import unittest
 
 import apache_beam as beam
-from apache_beam.metrics import Metrics
 from apache_beam.testing import synthetic_pipeline
-from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.test_pipeline import TestPipeline
 
+try:
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+  from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
+  from google.cloud import bigquery as bq
+except ImportError:
+  bq = None
 
+COUNTER_LABEL = "total_bytes_count"
+RUNTIME_LABEL = 'runtime'
+
+
+@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
 class ParDoTest(unittest.TestCase):
   def parseTestPipelineOptions(self):
-    return {'numRecords': self.inputOptions.get('num_records'),
-            'keySizeBytes': self.inputOptions.get('key_size'),
-            'valueSizeBytes': self.inputOptions.get('value_size'),
+    return {'numRecords': self.input_options.get('num_records'),
+            'keySizeBytes': self.input_options.get('key_size'),
+            'valueSizeBytes': self.input_options.get('value_size'),
             'bundleSizeDistribution': {
-                'type': self.inputOptions.get(
+                'type': self.input_options.get(
                     'bundle_size_distribution_type', 'const'
                 ),
-                'param': self.inputOptions.get(
+                'param': self.input_options.get(
                     'bundle_size_distribution_param', 0
                 )
             },
-            'forceNumInitialBundles': self.inputOptions.get(
+            'forceNumInitialBundles': self.input_options.get(
                 'force_initial_num_bundles', 0
             )
            }
 
   def setUp(self):
     self.pipeline = TestPipeline(is_integration_test=True)
+
     self.output = self.pipeline.get_option('output')
     self.iterations = self.pipeline.get_option('number_of_counter_operations')
-    self.inputOptions = json.loads(self.pipeline.get_option('input_options'))
-
-  class _MeasureTime(beam.DoFn):
-    def __init__(self):
-      self.runtime_start = Metrics.distribution('pardo', 'runtime.start')
-      self.runtime_end = Metrics.distribution('pardo', 'runtime.end')
-
-    def start_bundle(self):
-      self.runtime_start.update(time.time())
-
-    def finish_bundle(self):
-      self.runtime_end.update(time.time())
+    self.input_options = json.loads(self.pipeline.get_option('input_options'))
+
+    metrics_project_id = self.pipeline.get_option('project')
+    self.metrics_namespace = self.pipeline.get_option('metrics_table')
+    metrics_dataset = self.pipeline.get_option('metrics_dataset')
+    self.metrics_monitor = None
+    if metrics_project_id and self.metrics_namespace is not None:
+      measured_values = [
+          {'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'},
+          {'name': COUNTER_LABEL, 'type': 'INTEGER', 'mode': 'REQUIRED'}
+      ]
+      self.metrics_monitor = MetricsMonitor(
+          project_name=metrics_project_id,
+          table=self.metrics_namespace,
+          dataset=metrics_dataset,
+          schema_map=measured_values
+      )
+    else:
+      logging.error('One or more of parameters for collecting metrics '
+                    'are empty. Metrics will not be collected')
 
-    def process(self, element):
-      yield element
+  def testParDo(self):
 
-  class _GetElement(beam.DoFn):
-    def __init__(self):
-      self.counter = Metrics.counter('pardo', 'total_bytes.count')
+    class _GetElement(beam.DoFn):
+      from apache_beam.testing.load_tests.load_test_metrics_utils import count_bytes
 
-    def process(self, element):
-      _, value = element
-      for i in range(len(value)):
-        self.counter.inc(i)
-      yield element
+      @count_bytes(COUNTER_LABEL)
+      def process(self, element, namespace, is_returning):
+        if is_returning:
+          yield element
 
-  def testParDo(self):
     if self.iterations is None:
       num_runs = 1
     else:
@@ -127,28 +159,32 @@ def testParDo(self):
                 synthetic_pipeline.SyntheticSource(
                     self.parseTestPipelineOptions()
                 ))
-            | 'Measure time' >> beam.ParDo(MeasureTime())
+            | 'Measure time: Start' >> beam.ParDo(
+                MeasureTime(self.metrics_namespace))
            )
 
       for i in range(num_runs):
-        label = 'Step: %d' % i
+        is_returning = (i == (num_runs-1))
         pc = (pc
-              | label >> beam.ParDo(self._GetElement()))
+              | 'Step: %d' % i >> beam.ParDo(
+                  _GetElement(), self.metrics_namespace, is_returning)
+             )
 
       if self.output is not None:
-        # pylint: disable=expression-not-assigned
-        (pc
-         | "Write" >> beam.io.WriteToText(self.output)
-        )
+        pc = (pc
+              | "Write" >> beam.io.WriteToText(self.output)
+             )
+
+      # pylint: disable=expression-not-assigned
+      (pc
+       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+      )
 
       result = p.run()
       result.wait_until_finish()
-      metrics = result.metrics().query()
-      for counter in metrics['counters']:
-        logging.info("Counter: %s", counter)
 
-      for dist in metrics['distributions']:
-        logging.info("Distribution: %s", dist)
+      if self.metrics_monitor is not None:
+        self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171888)
    Time Spent: 3h 50m  (was: 3h 40m)

> Add integrations of Metrics API to Big Query for SyntheticSources load tests 
> in Python SDK
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5778
>                 URL: https://issues.apache.org/jira/browse/BEAM-5778
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Kasia Kucharczyk
>            Assignee: Kasia Kucharczyk
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Right now Metrics API collects basic metrics of load tests of 
> SyntheticSources (Python SDK). It should be collected in BigQuery for 
> presenting it on performance dashboards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to