This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 76bc70d  [BEAM-7484] Metrics collection in BigQuery perf tests (#8766)
76bc70d is described below

commit 76bc70d49aa88789bcd230ad752fcbb6ec83d4a0
Author: Kamil Wasilewski <kamil.wasilew...@polidea.com>
AuthorDate: Thu Jul 18 01:48:15 2019 +0200

    [BEAM-7484] Metrics collection in BigQuery perf tests (#8766)
---
 .../apache_beam/io/gcp/bigquery_read_perf_test.py  | 12 +++++++++-
 .../apache_beam/io/gcp/bigquery_write_perf_test.py | 18 ++++++++++-----
 .../testing/load_tests/load_test_metrics_utils.py  | 26 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
index 2b6966f..3b98768 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
@@ -36,6 +36,9 @@ python setup.py nosetests \
     --staging_location=gs://...
     --temp_location=gs://...
     --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --publish_to_big_query=true
+    --metrics_dataset=gs://...
+    --metrics_table=...
     --input_dataset=...
     --input_table=...
     --input_options='{
@@ -54,6 +57,7 @@ import os
 import unittest
 
 from apache_beam import Map
+from apache_beam import ParDo
 from apache_beam.io import BigQueryDisposition
 from apache_beam.io import BigQuerySource
 from apache_beam.io import Read
@@ -61,6 +65,8 @@ from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
@@ -119,7 +125,7 @@ class BigQueryReadPerfTest(LoadTest):
      | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
      | 'Format' >> Map(format_record)
      | 'Write to BigQuery' >> WriteToBigQuery(
-         self.input_dataset + '.' + self.input_table,
+         dataset=self.input_dataset, table=self.input_table,
          schema=SCHEMA,
          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=BigQueryDisposition.WRITE_EMPTY))
@@ -129,6 +135,10 @@ class BigQueryReadPerfTest(LoadTest):
     self.result = (self.pipeline
                    | 'Read from BigQuery' >> Read(BigQuerySource(
                        dataset=self.input_dataset, table=self.input_table))
+                   | 'Count messages' >> ParDo(CountMessages(
+                       self.metrics_namespace))
+                   | 'Measure time' >> ParDo(MeasureTime(
+                       self.metrics_namespace))
                    | 'Count' >> Count.Globally())
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
index 126d96e..c8dd304 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
@@ -17,8 +17,8 @@
 """
 A pipeline that writes data from Synthetic Source to a BigQuery table.
 Besides of the standard options, there are options with special meaning:
-* output - BQ destination in the following format: 'dataset_id.table_id'.
-The table will be removed after test completion,
+* output_dataset - BQ dataset name.
+* output_table - BQ table name. The table will be removed after test completion,
 * input_options - options for Synthetic Source:
 num_records - number of rows to be inserted,
 value_size - the length of a single row,
@@ -33,6 +33,9 @@ python setup.py nosetests \
     --staging_location=gs://...
     --temp_location=gs://...
     --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --publish_to_big_query=true
+    --metrics_dataset=gs://...
+    --metrics_table=...
     --output_dataset=...
     --output_table=...
     --input_options='{
@@ -53,12 +56,15 @@ import os
 import unittest
 
 from apache_beam import Map
+from apache_beam import ParDo
 from apache_beam.io import BigQueryDisposition
 from apache_beam.io import Read
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.tests import utils
 from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 
 load_test_enabled = False
@@ -93,10 +99,12 @@ class BigQueryWritePerfTest(LoadTest):
 
     # pylint: disable=expression-not-assigned
     (self.pipeline
-     | 'ProduceRows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
      | 'Format' >> Map(format_record)
-     | 'WriteToBigQuery' >> WriteToBigQuery(
-         self.output_dataset + '.' + self.output_table,
+     | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+     | 'Write to BigQuery' >> WriteToBigQuery(
+         dataset=self.output_dataset, table=self.output_table,
          schema=SCHEMA,
          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=BigQueryDisposition.WRITE_TRUNCATE))
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index ffb6592..194bc67 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -249,3 +249,29 @@ class MeasureTime(beam.DoFn):
 
   def process(self, element):
     yield element
+
+
+class MeasureBytes(beam.DoFn):
+  LABEL = 'total_bytes'
+
+  def __init__(self, namespace, extractor=None):
+    self.namespace = namespace
+    self.counter = Metrics.counter(self.namespace, self.LABEL)
+    self.extractor = extractor if extractor else lambda x: (yield x)
+
+  def process(self, element, *args):
+    for value in self.extractor(element, *args):
+      self.counter.inc(len(value))
+    yield element
+
+
+class CountMessages(beam.DoFn):
+  LABEL = 'total_messages'
+
+  def __init__(self, namespace):
+    self.namespace = namespace
+    self.counter = Metrics.counter(self.namespace, self.LABEL)
+
+  def process(self, element):
+    self.counter.inc(1)
+    yield element