This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e26fe06  [BEAM-11791] Continuously running microbenchmark for 
FnApiRunner
     new ee4cf67  Merge pull request #13906 from pabloem/fnrunner-ubench
e26fe06 is described below

commit e26fe067994edda05b9d097361f39720bd333865
Author: Pablo Estrada <[email protected]>
AuthorDate: Fri Feb 5 12:02:16 2021 -0800

    [BEAM-11791] Continuously running microbenchmark for FnApiRunner
---
 .../job_LoadTests_FnApiRunner_Python.groovy        |  64 ++++++++
 .../Python_FnApiRunner_ubenchmarks.json            | 174 +++++++++++++++++++++
 .../apache_beam/testing/load_tests/load_test.py    |   3 +-
 .../testing/load_tests/load_test_metrics_utils.py  |  24 ++-
 .../testing/load_tests/microbenchmarks_test.py     |  85 ++++++++++
 .../tools/fn_api_runner_microbenchmark.py          |   7 +-
 6 files changed, 345 insertions(+), 12 deletions(-)

diff --git a/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy 
b/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy
new file mode 100644
index 0000000..69c42f8
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def loadTestConfigurations = { datasetName ->
+  [
+    [
+      title          : 'FnApiRunner Python load test - microbenchmark',
+      test           : 'apache_beam.testing.load_tests.microbenchmarks_test',
+      runner         : CommonTestProperties.Runner.DIRECT,
+      pipelineOptions: [
+        publish_to_big_query: true,
+        influx_measurement  : 'python_microbenchmarks',
+        project             : 'apache-beam-testing',
+        metrics_dataset     : datasetName,
+        metrics_table       : 'python_direct_microbenchmarks',
+        input_options        : '\'{}\'',
+      ]
+    ],
+  ]
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+    'beam_Python_LoadTests_FnApiRunner_Microbenchmark',
+    'Run Python Load Tests FnApiRunner Microbenchmark',
+    'Python Load Tests FnApiRunner Microbenchmark',
+    this
+    ) {
+      def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', 
CommonTestProperties.TriggeringContext.PR)
+      loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON,
+          loadTestConfigurations(datasetName), "MicroBenchmarks", "batch")
+    }
+
+
+// Run this job every 6 hours on a random minute.
+CronJobBuilder.cronJob('beam_Python_LoadTests_FnApiRunner_Microbenchmark', 'H 
*/6 * * *', this) {
+  additionalPipelineArgs = [
+    influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+  ]
+  def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', 
CommonTestProperties.TriggeringContext.POST_COMMIT)
+  loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON,
+      loadTestConfigurations(datasetName), "MicroBenchmarks", "batch")
+}
+
diff --git 
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json
 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json
new file mode 100644
index 0000000..3b2f2eb
--- /dev/null
+++ 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json
@@ -0,0 +1,174 @@
+{
+  "annotations": {
+    "list": [
+      {
+        "builtIn": 1,
+        "datasource": "-- Grafana --",
+        "enable": true,
+        "hide": true,
+        "iconColor": "rgba(0, 211, 255, 1)",
+        "name": "Annotations & Alerts",
+        "type": "dashboard"
+      }
+    ]
+  },
+  "editable": true,
+  "gnetId": null,
+  "graphTooltip": 0,
+  "id": 16,
+  "links": [],
+  "panels": [
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "BeamInfluxDB",
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 9,
+        "w": 12,
+        "x": 0,
+        "y": 0
+      },
+      "hiddenSeries": false,
+      "id": 2,
+      "interval": "6h",
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 2,
+      "nullPointMode": "connected",
+      "options": {
+        "dataLinks": []
+      },
+      "percentage": false,
+      "pointradius": 2,
+      "points": true,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "alias": "runtime",
+          "groupBy": [
+            {
+              "params": [
+                "$__interval"
+              ],
+              "type": "time"
+            },
+            {
+              "params": [
+                "null"
+              ],
+              "type": "fill"
+            }
+          ],
+          "orderByTime": "ASC",
+          "policy": "default",
+          "query": "SELECT mean(\"value\") FROM 
\"python_direct_microbenchmarks\" WHERE metric = 
'fn_api_runner_microbenchmark_runtime_sec' AND $timeFilter GROUP BY 
time($__interval),  \"metric\"",
+          "rawQuery": true,
+          "refId": "A",
+          "resultFormat": "time_series",
+          "select": [
+            [
+              {
+                "params": [
+                  "value"
+                ],
+                "type": "field"
+              },
+              {
+                "params": [],
+                "type": "mean"
+              }
+            ]
+          ],
+          "tags": []
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Python SDK Microbenchmarks - FnApiRunner microbenchmark",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": true,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "s",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+  ],
+  "schemaVersion": 22,
+  "style": "dark",
+  "tags": ["performance tests"],
+  "templating": {
+    "list": []
+  },
+  "time": {
+    "from": "now-30d",
+    "to": "now"
+  },
+  "timepicker": {
+    "refresh_intervals": [
+      "5s",
+      "10s",
+      "30s",
+      "1m",
+      "5m",
+      "15m",
+      "30m",
+      "1h",
+      "2h",
+      "1d"
+    ]
+  },
+  "timezone": "",
+  "title": "FnApiRunner Micro-benchmarks",
+  "uid": "1cnwVDkGk",
+  "variables": {
+    "list": []
+  },
+  "version": 2
+}
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py 
b/sdks/python/apache_beam/testing/load_tests/load_test.py
index a15f819..6fb48cd 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -100,6 +100,7 @@ class LoadTest(object):
     options = self.pipeline.get_pipeline_options().view_as(LoadTestOptions)
     self.timeout_ms = options.timeout_ms
     self.input_options = options.input_options
+    self.extra_metrics = {}
 
     if metrics_namespace:
       self.metrics_namespace = metrics_namespace
@@ -150,7 +151,7 @@ class LoadTest(object):
         self.result = self.pipeline.run()
         # Defaults to waiting forever, unless timeout_ms has been set
         self.result.wait_until_finish(duration=self.timeout_ms)
-      self._metrics_monitor.publish_metrics(self.result)
+      self._metrics_monitor.publish_metrics(self.result, self.extra_metrics)
     finally:
       self.cleanup()
 
diff --git 
a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py 
b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index d816827..9475578 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -218,7 +218,8 @@ class MetricsReader(object):
           'InfluxDB')
     self.filters = filters
 
-  def publish_metrics(self, result):
+  def publish_metrics(self, result, extra_metrics: dict):
+    metric_id = uuid.uuid4().hex
     metrics = result.metrics().query(self.filters)
 
     # Metrics from pipeline result are stored in map with keys: 'gauges',
@@ -226,11 +227,20 @@ class MetricsReader(object):
     # Under each key there is list of objects of each metric type. It is
     # required to prepare metrics for publishing purposes. Expected is to have
     # a list of dictionaries matching the schema.
-    insert_dicts = self._prepare_all_metrics(metrics)
+    insert_dicts = self._prepare_all_metrics(metrics, metric_id)
+
+    insert_dicts += self._prepare_extra_metrics(extra_metrics, metric_id)
     if len(insert_dicts) > 0:
       for publisher in self.publishers:
         publisher.publish(insert_dicts)
 
+  def _prepare_extra_metrics(self, extra_metrics: dict, metric_id: str):
+    ts = time.time()
+    return [
+        Metric(ts, metric_id, v, label=k).as_dict() for k,
+        v in extra_metrics.items()
+    ]
+
   def publish_values(self, labeled_values):
     """The method to publish simple labeled values.
 
@@ -246,8 +256,7 @@ class MetricsReader(object):
     for publisher in self.publishers:
       publisher.publish(metric_dicts)
 
-  def _prepare_all_metrics(self, metrics):
-    metric_id = uuid.uuid4().hex
+  def _prepare_all_metrics(self, metrics, metric_id):
 
     insert_rows = self._get_counters(metrics['counters'], metric_id)
     insert_rows += self._get_distributions(metrics['distributions'], metric_id)
@@ -398,11 +407,10 @@ class BigQueryMetricsPublisher(object):
     outputs = self.bq.save(results)
     if len(outputs) > 0:
       for output in outputs:
-        errors = output['errors']
-        for err in errors:
-          _LOGGER.error(err['message'])
+        if output['errors']:
+          _LOGGER.error(output)
           raise ValueError(
-              'Unable save rows in BigQuery: {}'.format(err['message']))
+              'Unable save rows in BigQuery: {}'.format(output['errors']))
 
 
 class BigQueryClient(object):
diff --git a/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py 
b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
new file mode 100644
index 0000000..78874a1
--- /dev/null
+++ b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This is a load test that runs a set of basic microbenchmarks for the Python SDK
+and the DirectRunner.
+
+This test does not need any additional options passed to run, besides the
+dataset information.
+
+Example test run:
+
+python -m apache_beam.testing.load_tests.microbenchmarks_test \
+    --test-pipeline-options="
+    --project=big-query-project
+    --input_options='{}'
+    --region=...
+    --publish_to_big_query=true
+    --metrics_dataset=python_load_tests
+    --metrics_table=microbenchmarks"
+
+or:
+
+./gradlew -PloadTest.args="
+    --publish_to_big_query=true
+    --project=...
+    --region=...
+    --input_options='{}'
+    --metrics_dataset=python_load_tests
+    --metrics_table=microbenchmarks
+    --runner=DirectRunner" \
+-PloadTest.mainClass=apache_beam.testing.load_tests.microbenchmarks_test \
+-Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import time
+
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.tools import fn_api_runner_microbenchmark
+from apache_beam.transforms.util import _BatchSizeEstimator
+
+
+class MicroBenchmarksLoadTest(LoadTest):
+  def __init__(self):
+    super(MicroBenchmarksLoadTest, self).__init__()
+
+  def test(self):
+    self.extra_metrics.update(self._run_fn_api_runner_microbenchmark())
+
+  def _run_fn_api_runner_microbenchmark(self):
+    start = time.perf_counter()
+    result = fn_api_runner_microbenchmark.run_benchmark(verbose=False)
+    sizes = list(result[0].values())[0]
+    costs = list(result[1].values())[0]
+    a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
+
+    return {
+        'fn_api_runner_microbenchmark_runtime_sec': time.perf_counter() - 
start,
+        'fn_api_runner_microbenchmark_fixed_cost_ms': a * 1000,
+        'fn_api_runner_microbenchmark_per_element_cost_ms': b * 1000,
+    }
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  MicroBenchmarksLoadTest().run()
diff --git a/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py 
b/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
index 0b9eb6a..b0fc46c 100644
--- a/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
@@ -77,7 +77,7 @@ from apache_beam.transforms.userstate import on_timer
 
 NUM_PARALLEL_STAGES = 7
 
-NUM_SERIAL_STAGES = 5
+NUM_SERIAL_STAGES = 7
 
 
 class BagInStateOutputAfterTimer(beam.DoFn):
@@ -128,12 +128,13 @@ def run_single_pipeline(size):
   return _pipeline_runner
 
 
-def run_benchmark(starting_point, num_runs, num_elements_step, verbose):
+def run_benchmark(
+    starting_point=1, num_runs=10, num_elements_step=100, verbose=True):
   suite = [
       utils.LinearRegressionBenchmarkConfig(
           run_single_pipeline, starting_point, num_elements_step, num_runs)
   ]
-  utils.run_benchmarks(suite, verbose=verbose)
+  return utils.run_benchmarks(suite, verbose=verbose)
 
 
 if __name__ == '__main__':

Reply via email to