This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 76bc70d  [BEAM-7484] Metrics collection in BigQuery perf tests (#8766)
76bc70d is described below

commit 76bc70d49aa88789bcd230ad752fcbb6ec83d4a0
Author: Kamil Wasilewski <kamil.wasilew...@polidea.com>
AuthorDate: Thu Jul 18 01:48:15 2019 +0200

    [BEAM-7484] Metrics collection in BigQuery perf tests (#8766)
---
 .../apache_beam/io/gcp/bigquery_read_perf_test.py  | 12 +++++++++-
 .../apache_beam/io/gcp/bigquery_write_perf_test.py | 18 ++++++++++-----
 .../testing/load_tests/load_test_metrics_utils.py  | 26 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
index 2b6966f..3b98768 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_perf_test.py
@@ -36,6 +36,9 @@ python setup.py nosetests \
     --staging_location=gs://...
     --temp_location=gs://...
     --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --publish_to_big_query=true
+    --metrics_dataset=gs://...
+    --metrics_table=...
     --input_dataset=...
     --input_table=...
     --input_options='{
@@ -54,6 +57,7 @@ import os
 import unittest
 
 from apache_beam import Map
+from apache_beam import ParDo
 from apache_beam.io import BigQueryDisposition
 from apache_beam.io import BigQuerySource
 from apache_beam.io import Read
@@ -61,6 +65,8 @@ from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
@@ -119,7 +125,7 @@ class BigQueryReadPerfTest(LoadTest):
      | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
      | 'Format' >> Map(format_record)
      | 'Write to BigQuery' >> WriteToBigQuery(
-         self.input_dataset + '.' + self.input_table,
+         dataset=self.input_dataset, table=self.input_table,
          schema=SCHEMA,
          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=BigQueryDisposition.WRITE_EMPTY))
@@ -129,6 +135,10 @@ class BigQueryReadPerfTest(LoadTest):
     self.result = (self.pipeline
                    | 'Read from BigQuery' >> Read(BigQuerySource(
                        dataset=self.input_dataset, table=self.input_table))
+                   | 'Count messages' >> ParDo(CountMessages(
+                       self.metrics_namespace))
+                   | 'Measure time' >> ParDo(MeasureTime(
+                       self.metrics_namespace))
                    | 'Count' >> Count.Globally())
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
index 126d96e..c8dd304 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_perf_test.py
@@ -17,8 +17,8 @@
 """
 A pipeline that writes data from Synthetic Source to a BigQuery table.
 Besides of the standard options, there are options with special meaning:
-* output - BQ destination in the following format: 'dataset_id.table_id'.
-The table will be removed after test completion,
+* output_dataset - BQ dataset name.
+* output_table - BQ table name. The table will be removed after test completion,
 * input_options - options for Synthetic Source:
 num_records - number of rows to be inserted,
 value_size - the length of a single row,
@@ -33,6 +33,9 @@ python setup.py nosetests \
     --staging_location=gs://...
     --temp_location=gs://...
     --sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
+    --publish_to_big_query=true
+    --metrics_dataset=gs://...
+    --metrics_table=...
     --output_dataset=...
     --output_table=...
     --input_options='{
@@ -53,12 +56,15 @@ import os
 import unittest
 
 from apache_beam import Map
+from apache_beam import ParDo
 from apache_beam.io import BigQueryDisposition
 from apache_beam.io import Read
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.tests import utils
 from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 
 load_test_enabled = False
@@ -93,10 +99,12 @@ class BigQueryWritePerfTest(LoadTest):
 
     # pylint: disable=expression-not-assigned
     (self.pipeline
-     | 'ProduceRows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Produce rows' >> Read(SyntheticSource(self.parseTestPipelineOptions()))
+     | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
      | 'Format' >> Map(format_record)
-     | 'WriteToBigQuery' >> WriteToBigQuery(
-         self.output_dataset + '.' + self.output_table,
+     | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+     | 'Write to BigQuery' >> WriteToBigQuery(
+         dataset=self.output_dataset, table=self.output_table,
          schema=SCHEMA,
          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=BigQueryDisposition.WRITE_TRUNCATE))
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index ffb6592..194bc67 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -249,3 +249,29 @@ class MeasureTime(beam.DoFn):
 
   def process(self, element):
     yield element
+
+
+class MeasureBytes(beam.DoFn):
+  LABEL = 'total_bytes'
+
+  def __init__(self, namespace, extractor=None):
+    self.namespace = namespace
+    self.counter = Metrics.counter(self.namespace, self.LABEL)
+    self.extractor = extractor if extractor else lambda x: (yield x)
+
+  def process(self, element, *args):
+    for value in self.extractor(element, *args):
+      self.counter.inc(len(value))
+    yield element
+
+
+class CountMessages(beam.DoFn):
+  LABEL = 'total_messages'
+
+  def __init__(self, namespace):
+    self.namespace = namespace
+    self.counter = Metrics.counter(self.namespace, self.LABEL)
+
+  def process(self, element):
+    self.counter.inc(1)
+    yield element
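
For readers skimming the patch, here is a minimal sketch (not part of the commit) of how the new CountMessages and MeasureTime DoFns plug into a pipeline; the namespace string and input elements below are illustrative assumptions, not values from the perf tests:

    import apache_beam as beam

    from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
    from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime

    metrics_namespace = 'bigquery_perf_test'  # assumed example namespace

    with beam.Pipeline() as p:
      # pylint: disable=expression-not-assigned
      (p
       | 'Create' >> beam.Create(['row-1', 'row-2', 'row-3'])
       # Increments the 'total_messages' counter once per element.
       | 'Count messages' >> beam.ParDo(CountMessages(metrics_namespace))
       # Passes elements through while recording timing metrics
       # under the same namespace.
       | 'Measure time' >> beam.ParDo(MeasureTime(metrics_namespace)))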
