This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e26fe06 [BEAM-11791] Continuously running microbenchmark for FnApiRunner
new ee4cf67 Merge pull request #13906 from pabloem/fnrunner-ubench
e26fe06 is described below
commit e26fe067994edda05b9d097361f39720bd333865
Author: Pablo Estrada <[email protected]>
AuthorDate: Fri Feb 5 12:02:16 2021 -0800
[BEAM-11791] Continuously running microbenchmark for FnApiRunner
---
.../job_LoadTests_FnApiRunner_Python.groovy | 64 ++++++++
.../Python_FnApiRunner_ubenchmarks.json | 174 +++++++++++++++++++++
.../apache_beam/testing/load_tests/load_test.py | 3 +-
.../testing/load_tests/load_test_metrics_utils.py | 24 ++-
.../testing/load_tests/microbenchmarks_test.py | 85 ++++++++++
.../tools/fn_api_runner_microbenchmark.py | 7 +-
6 files changed, 345 insertions(+), 12 deletions(-)
diff --git a/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy b/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy
new file mode 100644
index 0000000..69c42f8
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_FnApiRunner_Python.groovy
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def loadTestConfigurations = { datasetName ->
+ [
+ [
+ title : 'FnApiRunner Python load test - microbenchmark',
+ test : 'apache_beam.testing.load_tests.microbenchmarks_test',
+ runner : CommonTestProperties.Runner.DIRECT,
+ pipelineOptions: [
+ publish_to_big_query: true,
+ influx_measurement : 'python_microbenchmarks',
+ project : 'apache-beam-testing',
+ metrics_dataset : datasetName,
+ metrics_table : 'python_direct_microbenchmarks',
+ input_options : '\'{}\'',
+ ]
+ ],
+ ]
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+ 'beam_Python_LoadTests_FnApiRunner_Microbenchmark',
+ 'Run Python Load Tests FnApiRunner Microbenchmark',
+ 'Python Load Tests FnApiRunner Microbenchmark',
+ this
+ ) {
+ def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', CommonTestProperties.TriggeringContext.PR)
+ loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON,
+ loadTestConfigurations(datasetName), "MicroBenchmarks", "batch")
+ }
+
+
+// Run this job every 6 hours on a random minute.
+CronJobBuilder.cronJob('beam_Python_LoadTests_FnApiRunner_Microbenchmark', 'H */6 * * *', this) {
+ additionalPipelineArgs = [
+ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ ]
+ def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', CommonTestProperties.TriggeringContext.POST_COMMIT)
+ loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON,
+ loadTestConfigurations(datasetName), "MicroBenchmarks", "batch")
+}
+
diff --git a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json
new file mode 100644
index 0000000..3b2f2eb
--- /dev/null
+++ b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_FnApiRunner_ubenchmarks.json
@@ -0,0 +1,174 @@
+{
+ "annotations": {
+ "list": [
+ {
+ "builtIn": 1,
+ "datasource": "-- Grafana --",
+ "enable": true,
+ "hide": true,
+ "iconColor": "rgba(0, 211, 255, 1)",
+ "name": "Annotations & Alerts",
+ "type": "dashboard"
+ }
+ ]
+ },
+ "editable": true,
+ "gnetId": null,
+ "graphTooltip": 0,
+ "id": 16,
+ "links": [],
+ "panels": [
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "BeamInfluxDB",
+ "fill": 1,
+ "fillGradient": 0,
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 0
+ },
+ "hiddenSeries": false,
+ "id": 2,
+ "interval": "6h",
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 2,
+ "nullPointMode": "connected",
+ "options": {
+ "dataLinks": []
+ },
+ "percentage": false,
+ "pointradius": 2,
+ "points": true,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "runtime",
+ "groupBy": [
+ {
+ "params": [
+ "$__interval"
+ ],
+ "type": "time"
+ },
+ {
+ "params": [
+ "null"
+ ],
+ "type": "fill"
+ }
+ ],
+ "orderByTime": "ASC",
+ "policy": "default",
+ "query": "SELECT mean(\"value\") FROM \"python_direct_microbenchmarks\" WHERE metric = 'fn_api_runner_microbenchmark_runtime_sec' AND $timeFilter GROUP BY time($__interval), \"metric\"",
+ "rawQuery": true,
+ "refId": "A",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "value"
+ ],
+ "type": "field"
+ },
+ {
+ "params": [],
+ "type": "mean"
+ }
+ ]
+ ],
+ "tags": []
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Python SDK Microbenchmarks - FnApiRunner microbenchmark",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": true,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "s",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ ],
+ "schemaVersion": 22,
+ "style": "dark",
+ "tags": ["performance tests"],
+ "templating": {
+ "list": []
+ },
+ "time": {
+ "from": "now-30d",
+ "to": "now"
+ },
+ "timepicker": {
+ "refresh_intervals": [
+ "5s",
+ "10s",
+ "30s",
+ "1m",
+ "5m",
+ "15m",
+ "30m",
+ "1h",
+ "2h",
+ "1d"
+ ]
+ },
+ "timezone": "",
+ "title": "FnApiRunner Micro-benchmarks",
+ "uid": "1cnwVDkGk",
+ "variables": {
+ "list": []
+ },
+ "version": 2
+}
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py b/sdks/python/apache_beam/testing/load_tests/load_test.py
index a15f819..6fb48cd 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -100,6 +100,7 @@ class LoadTest(object):
options = self.pipeline.get_pipeline_options().view_as(LoadTestOptions)
self.timeout_ms = options.timeout_ms
self.input_options = options.input_options
+ self.extra_metrics = {}
if metrics_namespace:
self.metrics_namespace = metrics_namespace
@@ -150,7 +151,7 @@ class LoadTest(object):
self.result = self.pipeline.run()
# Defaults to waiting forever, unless timeout_ms has been set
self.result.wait_until_finish(duration=self.timeout_ms)
- self._metrics_monitor.publish_metrics(self.result)
+ self._metrics_monitor.publish_metrics(self.result, self.extra_metrics)
finally:
self.cleanup()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index d816827..9475578 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -218,7 +218,8 @@ class MetricsReader(object):
'InfluxDB')
self.filters = filters
- def publish_metrics(self, result):
+ def publish_metrics(self, result, extra_metrics: dict):
+ metric_id = uuid.uuid4().hex
metrics = result.metrics().query(self.filters)
# Metrics from pipeline result are stored in map with keys: 'gauges',
@@ -226,11 +227,20 @@ class MetricsReader(object):
# Under each key there is list of objects of each metric type. It is
# required to prepare metrics for publishing purposes. Expected is to have
# a list of dictionaries matching the schema.
- insert_dicts = self._prepare_all_metrics(metrics)
+ insert_dicts = self._prepare_all_metrics(metrics, metric_id)
+
+ insert_dicts += self._prepare_extra_metrics(extra_metrics, metric_id)
if len(insert_dicts) > 0:
for publisher in self.publishers:
publisher.publish(insert_dicts)
+ def _prepare_extra_metrics(self, extra_metrics: dict, metric_id: str):
+ ts = time.time()
+ return [
+ Metric(ts, metric_id, v, label=k).as_dict() for k,
+ v in extra_metrics.items()
+ ]
+
def publish_values(self, labeled_values):
"""The method to publish simple labeled values.
@@ -246,8 +256,7 @@ class MetricsReader(object):
for publisher in self.publishers:
publisher.publish(metric_dicts)
- def _prepare_all_metrics(self, metrics):
- metric_id = uuid.uuid4().hex
+ def _prepare_all_metrics(self, metrics, metric_id):
insert_rows = self._get_counters(metrics['counters'], metric_id)
insert_rows += self._get_distributions(metrics['distributions'], metric_id)
@@ -398,11 +407,10 @@ class BigQueryMetricsPublisher(object):
outputs = self.bq.save(results)
if len(outputs) > 0:
for output in outputs:
- errors = output['errors']
- for err in errors:
- _LOGGER.error(err['message'])
+ if output['errors']:
+ _LOGGER.error(output)
raise ValueError(
- 'Unable save rows in BigQuery: {}'.format(err['message']))
+ 'Unable save rows in BigQuery: {}'.format(output['errors']))
class BigQueryClient(object):
diff --git a/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
new file mode 100644
index 0000000..78874a1
--- /dev/null
+++ b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This is a load test that runs a set of basic microbenchmarks for the Python SDK
+and the DirectRunner.
+
+This test does not need any additional options passed to run, besides the
+dataset information.
+
+Example test run:
+
+python -m apache_beam.testing.load_tests.microbenchmarks_test \
+ --test-pipeline-options="
+ --project=big-query-project
+ --input_options='{}'
+ --region=...
+ --publish_to_big_query=true
+ --metrics_dataset=python_load_tests
+ --metrics_table=microbenchmarks"
+
+or:
+
+./gradlew -PloadTest.args="
+ --publish_to_big_query=true
+ --project=...
+ --region=...
+ --input_options='{}'
+ --metrics_dataset=python_load_tests
+ --metrics_table=microbenchmarks
+ --runner=DirectRunner" \
+-PloadTest.mainClass=apache_beam.testing.load_tests.microbenchmarks_test \
+-Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import time
+
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.tools import fn_api_runner_microbenchmark
+from apache_beam.transforms.util import _BatchSizeEstimator
+
+
+class MicroBenchmarksLoadTest(LoadTest):
+ def __init__(self):
+ super(MicroBenchmarksLoadTest, self).__init__()
+
+ def test(self):
+ self.extra_metrics.update(self._run_fn_api_runner_microbenchmark())
+
+ def _run_fn_api_runner_microbenchmark(self):
+ start = time.perf_counter()
+ result = fn_api_runner_microbenchmark.run_benchmark(verbose=False)
+ sizes = list(result[0].values())[0]
+ costs = list(result[1].values())[0]
+ a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
+
+ return {
+ 'fn_api_runner_microbenchmark_runtime_sec': time.perf_counter() - start,
+ 'fn_api_runner_microbenchmark_fixed_cost_ms': a * 1000,
+ 'fn_api_runner_microbenchmark_per_element_cost_ms': b * 1000,
+ }
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ MicroBenchmarksLoadTest().run()
diff --git a/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py b/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
index 0b9eb6a..b0fc46c 100644
--- a/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/fn_api_runner_microbenchmark.py
@@ -77,7 +77,7 @@ from apache_beam.transforms.userstate import on_timer
NUM_PARALLEL_STAGES = 7
-NUM_SERIAL_STAGES = 5
+NUM_SERIAL_STAGES = 7
class BagInStateOutputAfterTimer(beam.DoFn):
@@ -128,12 +128,13 @@ def run_single_pipeline(size):
return _pipeline_runner
-def run_benchmark(starting_point, num_runs, num_elements_step, verbose):
+def run_benchmark(
+ starting_point=1, num_runs=10, num_elements_step=100, verbose=True):
suite = [
utils.LinearRegressionBenchmarkConfig(
run_single_pipeline, starting_point, num_elements_step, num_runs)
]
- utils.run_benchmarks(suite, verbose=verbose)
+ return utils.run_benchmarks(suite, verbose=verbose)
if __name__ == '__main__':