This is an automated email from the ASF dual-hosted git repository.
anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5446776d00f Support for custom MetricsFetcher in Perf tooling. (#28671)
5446776d00f is described below
commit 5446776d00f6d3970ff859c38e26343274413c57
Author: Anand Inguva <[email protected]>
AuthorDate: Thu Oct 5 11:50:02 2023 -0400
Support for custom MetricsFetcher in Perf tooling. (#28671)
* Support for custom BigQueryMetricsFetcher
* Read GITHUB repo and owner name from environment variables
* Add test_name, test_id
* Move client to the fetch method
* Update skip condition
* Run on self hosted runner
* Update readme
* Update README
* Pass test_name to the metrics_fetcher
* Fix linting issues
* Fix lint
* Fix formatting and lint issues
* fix lint
---
.github/workflows/run_perf_alert_tool.yml | 2 +-
.../python/apache_beam/testing/analyzers/README.md | 54 ++++++++---
.../testing/analyzers/github_issues_utils.py | 34 ++++---
.../apache_beam/testing/analyzers/perf_analysis.py | 55 ++++++-----
.../testing/analyzers/perf_analysis_test.py | 41 ++++----
.../testing/analyzers/perf_analysis_utils.py | 104 +++++++++++++--------
.../testing/analyzers/tests_config.yaml | 2 +-
.../testing/load_tests/load_test_metrics_utils.py | 11 ---
8 files changed, 183 insertions(+), 120 deletions(-)
diff --git a/.github/workflows/run_perf_alert_tool.yml b/.github/workflows/run_perf_alert_tool.yml
index 6946011f061..1bd8d525c2f 100644
--- a/.github/workflows/run_perf_alert_tool.yml
+++ b/.github/workflows/run_perf_alert_tool.yml
@@ -30,7 +30,7 @@ on:
jobs:
python_run_change_point_analysis:
name: Run Change Point Analysis.
- runs-on: ubuntu-latest
+ runs-on: [self-hosted, ubuntu-20.04, main]
permissions:
issues: write
steps:
diff --git a/sdks/python/apache_beam/testing/analyzers/README.md b/sdks/python/apache_beam/testing/analyzers/README.md
index 076f173f9d7..91b21076f88 100644
--- a/sdks/python/apache_beam/testing/analyzers/README.md
+++ b/sdks/python/apache_beam/testing/analyzers/README.md
@@ -35,16 +35,13 @@ update already created GitHub issue or ignore performance alert by not creating
## Config file structure
-The config file defines the structure to run change point analysis on a given test. To add a test to the config file,
+The yaml defines the structure to run change point analysis on a given test. To add a test config to the yaml file,
please follow the below structure.
-**NOTE**: The Change point analysis only supports reading the metric data from Big Query for now.
+**NOTE**: The Change point analysis only supports reading the metric data from `BigQuery` only.
```
-# the test_1 must be a unique id.
-test_1:
- test_description: Pytorch image classification on 50k images of size 224 x 224 with resnet 152
- test_target: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+test_1: # a unique id for each test config.
metrics_dataset: beam_run_inference
metrics_table: torch_inference_imagenet_results_resnet152
project: apache-beam-testing
@@ -55,11 +52,17 @@ test_1:
num_runs_in_change_point_window: 30 # optional parameter
```
-**NOTE**: `test_target` is optional. It is used for identifying the test that was causing the regression.
+#### Optional Parameters:
-**Note**: By default, the tool fetches metrics from BigQuery tables. `metrics_dataset`, `metrics_table`, `project` and `metric_name` should match with the values defined for performance/load tests.
-The above example uses this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
-to fill up the values required to fetch the data from source.
+These are the optional parameters that can be added to the test config in addition to the parameters mentioned above.
+
+- `test_target`: Identifies the test responsible for the regression.
+
+- `test_description`: Provides a brief overview of the test's function.
+
+- `test_name`: Denotes the name of the test as stored in the BigQuery table.
+
+**Note**: The tool, by default, pulls metrics from BigQuery tables. Ensure that the values for `metrics_dataset`, `metrics_table`, `project`, and `metric_name` align with those defined for performance/load tests. The provided example utilizes this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) to populate the necessary values for data retrieval.
### Different ways to avoid false positive change points
@@ -76,8 +79,35 @@ setting `num_runs_in_change_point_window=7` will achieve it.
## Register a test for performance alerts
-If a new test needs to be registered for the performance alerting tool, please add the required test parameters to the
-config file.
+If a new test needs to be registered for the performance alerting tool,
+
+- You can either add it to the config file that is already present.
+- You can define your own yaml file and call the [perf_analysis.run()](https://github.com/apache/beam/blob/a46bc12a256dcaa3ae2cc9e5d6fdcaa82b59738b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py#L152) method.
+
+
+## Integrating the Perf Alert Tool with a Custom BigQuery Schema
+
+By default, the Perf Alert Tool retrieves metrics from the `apache-beam-testing` BigQuery projects. All performance and load tests within Beam utilize a standard [schema](https://github.com/apache/beam/blob/a7e12db9b5977c4a7b13554605c0300389a3d6da/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L70) for metrics publication. The tool inherently recognizes and operates with this schema when extracting metrics from BigQuery tables.
+
+To fetch the data from a BigQuery dataset that is not a default setting of the Apache Beam's setting, One can inherit the `MetricsFetcher` class and implement the abstract method `fetch_metric_data`. This method should return a tuple of desired metric values and timestamps of the metric values of when it was published.
+
+```
+from apache_beam.testing.analyzers import perf_analysis
+config_file_path = <path_to_config_file>
+my_metric_fetcher = MyMetricsFetcher() # inherited from MetricsFetcher
+perf_analysis.run(config_file_path, my_metrics_fetcher)
+```
+
+``Note``: The metrics and timestamps should be sorted based on the timestamps values in ascending order.
+
+### Configuring GitHub Parameters
+
+Out of the box, the performance alert tool targets the `apache/beam` repository when raising issues. If you wish to utilize this tool for another repository, you'll need to pre-set a couple of environment variables:
+
+- `REPO_OWNER`: Represents the owner of the repository. (e.g., `apache`)
+- `REPO_NAME`: Specifies the repository name itself. (e.g., `beam`)
+
+Before initiating the tool, also ensure that the `GITHUB_TOKEN` is set to an authenticated GitHub token. This permits the tool to generate GitHub issues whenever performance alerts arise.
## Triage performance alert issues
diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
index e1f20baa50a..82758be8f18 100644
--- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
@@ -34,8 +34,8 @@ except KeyError as e:
'A Github Personal Access token is required '
'to create Github Issues.')
-_BEAM_GITHUB_REPO_OWNER = 'apache'
-_BEAM_GITHUB_REPO_NAME = 'beam'
+_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'apache')
+_GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam')
# Adding GitHub Rest API version to the header to maintain version stability.
# For more information, please look at
# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
# pylint: disable=line-too-long
@@ -77,10 +77,10 @@ def create_issue(
Tuple containing GitHub issue number and issue URL.
"""
url = "https://api.github.com/repos/{}/{}/issues".format(
- _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+ _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME)
data = {
- 'owner': _BEAM_GITHUB_REPO_OWNER,
- 'repo': _BEAM_GITHUB_REPO_NAME,
+ 'owner': _GITHUB_REPO_OWNER,
+ 'repo': _GITHUB_REPO_NAME,
'title': title,
'body': description,
'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
@@ -108,20 +108,20 @@ def comment_on_issue(issue_number: int,
issue, and the comment URL.
"""
url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
- _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+ _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
open_issue_response = requests.get(
url,
json.dumps({
- 'owner': _BEAM_GITHUB_REPO_OWNER,
- 'repo': _BEAM_GITHUB_REPO_NAME,
+ 'owner': _GITHUB_REPO_OWNER,
+ 'repo': _GITHUB_REPO_NAME,
'issue_number': issue_number
},
default=str),
headers=_HEADERS).json()
if open_issue_response['state'] == 'open':
data = {
- 'owner': _BEAM_GITHUB_REPO_OWNER,
- 'repo': _BEAM_GITHUB_REPO_NAME,
+ 'owner': _GITHUB_REPO_OWNER,
+ 'repo': _GITHUB_REPO_NAME,
'body': comment_description,
issue_number: issue_number,
}
@@ -134,13 +134,14 @@ def comment_on_issue(issue_number: int,
def add_awaiting_triage_label(issue_number: int):
url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
- _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+ _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
requests.post(
url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS)
def get_issue_description(
- test_name: str,
+ test_id: str,
+ test_name: Optional[str],
metric_name: str,
timestamps: List[pd.Timestamp],
metric_values: List,
@@ -167,10 +168,13 @@ def get_issue_description(
description = []
- description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_name, metric_name))
+ description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))
- description.append(("`Test description:` " +
- f'{test_description}') if test_description else '')
+ if test_name:
+ description.append(("`test_name:` " + f'{test_name}'))
+
+ if test_description:
+ description.append(("`Test description:` " + f'{test_description}'))
description.append('```')
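
A minimal sketch of the environment-variable lookup introduced in this file, assuming the same defaults as the diff (`apache` and `beam`). The helper name `issues_url` and its `environ` parameter are hypothetical and exist only to make the behaviour easy to try; the commit itself reads the variables once, at module import time, so `REPO_OWNER` and `REPO_NAME` would need to be set before `github_issues_utils` is imported.

```python
import os


def issues_url(environ=os.environ) -> str:
  """Builds the GitHub issues endpoint for the configured repository.

  `issues_url` is a hypothetical helper, not part of the commit. It mirrors
  the `os.environ.get(name, default)` pattern used for _GITHUB_REPO_OWNER
  and _GITHUB_REPO_NAME.
  """
  owner = environ.get('REPO_OWNER', 'apache')  # falls back to apache
  name = environ.get('REPO_NAME', 'beam')  # falls back to beam
  return 'https://api.github.com/repos/{}/{}/issues'.format(owner, name)


# With no overrides the default apache/beam repository is targeted.
default_url = issues_url({})
# Overrides redirect issue creation to another repository.
custom_url = issues_url({'REPO_OWNER': 'my-org', 'REPO_NAME': 'my-repo'})
```

Passing the mapping explicitly keeps the sketch testable without mutating the real process environment.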
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
index 7f1ffbb944e..c86ecb2c4e2 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -23,7 +23,6 @@
import argparse
import logging
import os
-import uuid
from datetime import datetime
from datetime import timezone
from typing import Any
@@ -33,9 +32,10 @@ from typing import Optional
import pandas as pd
from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
-from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
@@ -43,10 +43,10 @@ from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
-from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
-def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
+def run_change_point_analysis(
+ params, test_id, big_query_metrics_fetcher: MetricsFetcher):
"""
Args:
params: Dict containing parameters to run change point analysis.
@@ -56,14 +56,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
Returns:
bool indicating if a change point is observed and alerted on GitHub.
"""
- logging.info("Running change point analysis for test %s" % test_name)
+ logging.info("Running change point analysis for test ID %s" % test_id)
if not validate_config(params.keys()):
raise ValueError(
f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
- f"are specified for the {test_name}")
+ f"are specified for the {test_id}")
metric_name = params['metric_name']
+ # test_name will be used to query a single test from
+ # multiple tests in a single BQ table. Right now, the default
+ # assumption is that all the test have an individual BQ table
+ # but this might not be case for other tests(such as IO tests where
+ # a single BQ tables stores all the data)
+ test_name = params.get('test_name', None)
+
min_runs_between_change_points = (
constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
if 'min_runs_between_change_points' in params:
@@ -74,15 +81,18 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if 'num_runs_in_change_point_window' in params:
num_runs_in_change_point_window = params['num_runs_in_change_point_window']
- metric_values, timestamps = fetch_metric_data(
- params=params,
- big_query_metrics_fetcher=big_query_metrics_fetcher
+ metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
+ project=params['project'],
+ metrics_dataset=params['metrics_dataset'],
+ metrics_table=params['metrics_table'],
+ metric_name=params['metric_name'],
+ test_name=test_name
)
change_point_index = find_latest_change_point_index(
metric_values=metric_values)
if not change_point_index:
- logging.info("Change point is not detected for the test %s" % test_name)
+ logging.info("Change point is not detected for the test ID %s" % test_id)
return False
# since timestamps are ordered in ascending order and
# num_runs_in_change_point_window refers to the latest runs,
@@ -92,11 +102,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if not is_change_point_in_valid_window(num_runs_in_change_point_window,
latest_change_point_run):
logging.info(
- 'Performance regression/improvement found for the test: %s. '
+ 'Performance regression/improvement found for the test ID: %s. '
'on metric %s. Since the change point run %s '
'lies outside the num_runs_in_change_point_window distance: %s, '
'alert is not raised.' % (
- test_name,
+ test_id,
metric_name,
latest_change_point_run + 1,
num_runs_in_change_point_window))
@@ -106,8 +116,7 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
last_reported_issue_number = None
issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}'
existing_issue_data = get_existing_issues_data(
- table_name=issue_metadata_table_name,
- big_query_metrics_fetcher=big_query_metrics_fetcher)
+ table_name=issue_metadata_table_name)
if existing_issue_data is not None:
existing_issue_timestamps = existing_issue_data[
@@ -124,20 +133,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
min_runs_between_change_points=min_runs_between_change_points)
if is_alert:
issue_number, issue_url = create_performance_alert(
- metric_name, test_name, timestamps,
+ metric_name, test_id, timestamps,
metric_values, change_point_index,
params.get('labels', None),
last_reported_issue_number,
test_description = params.get('test_description', None),
+ test_name = test_name
)
issue_metadata = GitHubIssueMetaData(
issue_timestamp=pd.Timestamp(
datetime.now().replace(tzinfo=timezone.utc)),
# BQ doesn't allow '.' in table name
- test_name=test_name.replace('.', '_'),
+ test_id=test_id.replace('.', '_'),
+ test_name=test_name,
metric_name=metric_name,
- test_id=uuid.uuid4().hex,
change_point=metric_values[change_point_index],
issue_number=issue_number,
issue_url=issue_url,
@@ -149,7 +159,10 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
return is_alert
-def run(config_file_path: Optional[str] = None) -> None:
+def run(
+ big_query_metrics_fetcher: MetricsFetcher = BigQueryMetricsFetcher(),
+ config_file_path: Optional[str] = None,
+) -> None:
"""
run is the entry point to run change point analysis on test metric
data, which is read from config file, and if there is a performance
@@ -169,12 +182,10 @@ def run(config_file_path: Optional[str] = None) -> None:
tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
- big_query_metrics_fetcher = BigQueryMetricsFetcher()
-
- for test_name, params in tests_config.items():
+ for test_id, params in tests_config.items():
run_change_point_analysis(
params=params,
- test_name=test_name,
+ test_id=test_id,
big_query_metrics_fetcher=big_query_metrics_fetcher)
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
index 094cd9c47ec..9c7921300d9 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
@@ -32,6 +32,7 @@ try:
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers import github_issues_utils
+ from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
@@ -41,18 +42,18 @@ try:
from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
from apache_beam.testing.load_tests import load_test_metrics_utils
except ImportError as e:
- analysis = None # type: ignore
+ raise unittest.SkipTest('Missing dependencies to run perf analysis tests.')
# mock methods.
-def get_fake_data_with_no_change_point(**kwargs):
+def get_fake_data_with_no_change_point(*args, **kwargs):
num_samples = 20
metric_values = [1] * num_samples
timestamps = list(range(num_samples))
return metric_values, timestamps
-def get_fake_data_with_change_point(**kwargs):
+def get_fake_data_with_change_point(*args, **kwargs):
# change point will be at index 13.
num_samples = 20
metric_values = [0] * 12 + [3] + [4] * 7
@@ -69,10 +70,6 @@ def get_existing_issue_data(**kwargs):
}])
[email protected](
- analysis is None,
- 'Missing dependencies. '
- 'Test dependencies are missing for the Analyzer.')
class TestChangePointAnalysis(unittest.TestCase):
def setUp(self) -> None:
self.single_change_point_series = [0] * 10 + [1] * 10
@@ -151,18 +148,20 @@ class TestChangePointAnalysis(unittest.TestCase):
min_runs_between_change_points=min_runs_between_change_points)
self.assertFalse(is_alert)
- @mock.patch(
- 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+ @mock.patch.object(
+ BigQueryMetricsFetcher,
+ 'fetch_metric_data',
get_fake_data_with_no_change_point)
def test_no_alerts_when_no_change_points(self):
is_alert = analysis.run_change_point_analysis(
params=self.params,
- test_name=self.test_id,
- big_query_metrics_fetcher=None)
+ test_id=self.test_id,
+ big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertFalse(is_alert)
- @mock.patch(
- 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+ @mock.patch.object(
+ BigQueryMetricsFetcher,
+ 'fetch_metric_data',
get_fake_data_with_change_point)
@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
@@ -178,12 +177,13 @@ class TestChangePointAnalysis(unittest.TestCase):
def test_alert_on_data_with_change_point(self, *args):
is_alert = analysis.run_change_point_analysis(
params=self.params,
- test_name=self.test_id,
- big_query_metrics_fetcher=None)
+ test_id=self.test_id,
+ big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertTrue(is_alert)
- @mock.patch(
- 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+ @mock.patch.object(
+ BigQueryMetricsFetcher,
+ 'fetch_metric_data',
get_fake_data_with_change_point)
@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
@@ -198,8 +198,8 @@ class TestChangePointAnalysis(unittest.TestCase):
def test_alert_on_data_with_reported_change_point(self, *args):
is_alert = analysis.run_change_point_analysis(
params=self.params,
- test_name=self.test_id,
- big_query_metrics_fetcher=None)
+ test_id=self.test_id,
+ big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertFalse(is_alert)
def test_change_point_has_anomaly_marker_in_gh_description(self):
@@ -208,7 +208,8 @@ class TestChangePointAnalysis(unittest.TestCase):
change_point_index = find_latest_change_point_index(metric_values)
description = github_issues_utils.get_issue_description(
- test_name=self.test_id,
+ test_id=self.test_id,
+ test_name=self.params.get('test_name', None),
test_description=self.params['test_description'],
metric_name=self.params['metric_name'],
metric_values=metric_values,
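
A small sketch of why the fake data helpers in this test file gained `*args`. When `mock.patch.object` replaces a method on the class with a plain function, that function is bound like any other method, so the instance arrives as the first positional argument. The `Fetcher` class and `fake_fetch` function below are hypothetical stand-ins, not the Beam classes.

```python
from unittest import mock


class Fetcher:
  """Hypothetical stand-in for a fetcher whose real method needs a backend."""
  def fetch_metric_data(self, *, metric_name):
    raise RuntimeError('would query a real backend')


def fake_fetch(*args, **kwargs):
  # args[0] is the Fetcher instance because the function is patched onto
  # the class and therefore called as a bound method.
  num_samples = 5
  return [1] * num_samples, list(range(num_samples))


with mock.patch.object(Fetcher, 'fetch_metric_data', fake_fetch):
  values, timestamps = Fetcher().fetch_metric_data(metric_name='latency')

# Outside the context manager the original method is restored.
try:
  Fetcher().fetch_metric_data(metric_name='latency')
  restored = False
except RuntimeError:
  restored = True
```

A fake declared with only `**kwargs` would raise a `TypeError` here, since it could not accept the instance passed positionally.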
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
index 0a559fc4bee..f9604c490fc 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
@@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import abc
import logging
from dataclasses import asdict
from dataclasses import dataclass
from statistics import median
-from typing import Any
from typing import Dict
from typing import List
from typing import Optional
@@ -28,11 +28,11 @@ from typing import Union
import pandas as pd
import yaml
from google.api_core import exceptions
+from google.cloud import bigquery
from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.load_tests import load_test_metrics_utils
-from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
@@ -59,9 +59,7 @@ def is_change_point_in_valid_window(
return num_runs_in_change_point_window > latest_change_point_run
-def get_existing_issues_data(
- table_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
-) -> Optional[pd.DataFrame]:
+def get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]:
"""
Finds the most recent GitHub issue created for the test_name.
If no table found with name=test_name, return (None, None)
@@ -73,12 +71,14 @@ def get_existing_issues_data(
LIMIT 10
"""
try:
- df = big_query_metrics_fetcher.fetch(query=query)
+ client = bigquery.Client()
+ query_job = client.query(query=query)
+ existing_issue_data = query_job.result().to_dataframe()
except exceptions.NotFound:
# If no table found, that means this is first performance regression
# on the current test+metric.
return None
- return df
+ return existing_issue_data
def is_perf_alert(
@@ -123,33 +123,6 @@ def validate_config(keys):
return constants._PERF_TEST_KEYS.issubset(keys)
-def fetch_metric_data(
- params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher
-) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
- """
- Args:
- params: Dict containing keys required to fetch data from a data source.
- big_query_metrics_fetcher: A BigQuery metrics fetcher for fetch metrics.
- Returns:
- Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
- of metric_values and list of timestamps. Both are sorted in ascending
- order wrt timestamps.
- """
- query = f"""
- SELECT *
- FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']}
- WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}')
- ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
- LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
- """
- metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)
- metric_data.sort_values(
- by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
- return (
- metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
- metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())
-
-
def find_change_points(metric_values: List[Union[float, int]]):
return e_divisive(metric_values)
@@ -175,7 +148,7 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
def publish_issue_metadata_to_big_query(issue_metadata, table_name):
"""
- Published issue_metadata to BigQuery with table name=test_name.
+ Published issue_metadata to BigQuery with table name.
"""
bq_metrics_publisher = BigQueryMetricsPublisher(
project_name=constants._BQ_PROJECT_NAME,
@@ -190,18 +163,21 @@ def publish_issue_metadata_to_big_query(issue_metadata, table_name):
def create_performance_alert(
metric_name: str,
- test_name: str,
+ test_id: str,
timestamps: List[pd.Timestamp],
metric_values: List[Union[int, float]],
change_point_index: int,
labels: List[str],
existing_issue_number: Optional[int],
- test_description: Optional[str] = None) -> Tuple[int, str]:
+ test_description: Optional[str] = None,
+ test_name: Optional[str] = None,
+) -> Tuple[int, str]:
"""
Creates performance alert on GitHub issues and returns GitHub issue
number and issue URL.
"""
description = github_issues_utils.get_issue_description(
+ test_id=test_id,
test_name=test_name,
test_description=test_description,
metric_name=metric_name,
@@ -213,7 +189,7 @@ def create_performance_alert(
issue_number, issue_url = github_issues_utils.report_change_point_on_issues(
title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format(
- test_name, metric_name
+ test_id, metric_name
),
description=description,
labels=labels,
@@ -253,3 +229,55 @@ def filter_change_points_by_median_threshold(
if relative_change > threshold:
valid_change_points.append(idx)
return valid_change_points
+
+
+class MetricsFetcher(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def fetch_metric_data(
+ self,
+ *,
+ project,
+ metrics_dataset,
+ metrics_table,
+ metric_name,
+ test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+ """
+ Define SQL query and fetch the timestamp values and metric values
+ from BigQuery tables.
+ """
+ raise NotImplementedError
+
+
+class BigQueryMetricsFetcher(MetricsFetcher):
+ def fetch_metric_data(
+ self,
+ *,
+ project,
+ metrics_dataset,
+ metrics_table,
+ metric_name,
+ test_name=None,
+ ) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+ """
+ Args:
+ params: Dict containing keys required to fetch data from a data source.
+ Returns:
+ Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
+ of metric_values and list of timestamps. Both are sorted in ascending
+ order wrt timestamps.
+ """
+ query = f"""
+ SELECT *
+ FROM {project}.{metrics_dataset}.{metrics_table}
+ WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')
+ ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
+ LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
+ """
+ client = bigquery.Client()
+ query_job = client.query(query=query)
+ metric_data = query_job.result().to_dataframe()
+ metric_data.sort_values(
+ by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
+ return (
+ metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
+ metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())
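
A minimal sketch of a custom fetcher written against the `MetricsFetcher` interface added above. To run without Beam or pandas installed, the abstract base class here is a local stand-in that copies the keyword-only signature from the diff and uses `datetime` where the diff uses `pd.Timestamp`. `InMemoryMetricsFetcher`, its `rows` argument, and the sample values are hypothetical.

```python
import abc
from datetime import datetime
from typing import List, Tuple, Union


class MetricsFetcher(metaclass=abc.ABCMeta):
  """Local stand-in mirroring the interface in perf_analysis_utils.py."""
  @abc.abstractmethod
  def fetch_metric_data(
      self, *, project, metrics_dataset, metrics_table, metric_name,
      test_name=None) -> Tuple[List[Union[int, float]], List[datetime]]:
    raise NotImplementedError


class InMemoryMetricsFetcher(MetricsFetcher):
  """Hypothetical fetcher that serves rows from a list instead of BigQuery."""
  def __init__(self, rows):
    # Each row is (timestamp, test_name, metric_name, value).
    self._rows = rows

  def fetch_metric_data(
      self, *, project, metrics_dataset, metrics_table, metric_name,
      test_name=None):
    selected = [(ts, value) for ts, name, metric, value in self._rows
                if metric == metric_name and
                (test_name is None or name == test_name)]
    # Both returned lists must be in ascending timestamp order.
    selected.sort(key=lambda pair: pair[0])
    return [v for _, v in selected], [ts for ts, _ in selected]


rows = [
    (datetime(2023, 10, 3), 'test_a', 'latency', 30.0),
    (datetime(2023, 10, 1), 'test_a', 'latency', 10.0),
    (datetime(2023, 10, 2), 'test_b', 'latency', 99.0),
    (datetime(2023, 10, 2), 'test_a', 'latency', 20.0),
]
fetcher = InMemoryMetricsFetcher(rows)
values, timestamps = fetcher.fetch_metric_data(
    project='my-project',
    metrics_dataset='my_dataset',
    metrics_table='my_table',
    metric_name='latency',
    test_name='test_a')
```

With Beam installed, a fetcher like this would subclass the real `MetricsFetcher` and be handed to `perf_analysis.run`. Because the fetcher is the first parameter of `run` in this commit, passing `big_query_metrics_fetcher=` and `config_file_path=` by keyword avoids depending on argument order.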
diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
index f808f5e41d7..ec9cfe6f1ac 100644
--- a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
+++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
@@ -16,7 +16,7 @@
#
# for the unique key to define a test, please use the following format:
-# {test_name}-{metric_name}
+# {test_id}-{metric_name}
pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_micro_secs:
test_description:
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 92a5f68351f..01db2c114ef 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -38,7 +38,6 @@ from typing import Mapping
from typing import Optional
from typing import Union
-import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
@@ -650,13 +649,3 @@ class AssignTimestamps(beam.DoFn):
def process(self, element):
yield self.timestamp_val_fn(
element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
-
-
-class BigQueryMetricsFetcher:
- def __init__(self):
- self.client = bigquery.Client()
-
- def fetch(self, query) -> pd.DataFrame:
- query_job = self.client.query(query=query)
- result = query_job.result()
- return result.to_dataframe()