This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5446776d00f Support for custom MetricsFetcher in Perf tooling.  
(#28671)
5446776d00f is described below

commit 5446776d00f6d3970ff859c38e26343274413c57
Author: Anand Inguva <[email protected]>
AuthorDate: Thu Oct 5 11:50:02 2023 -0400

    Support for custom MetricsFetcher in Perf tooling.  (#28671)
    
    * Support for custom BigQueryMetricsFetcher
    
    * Read GITHUB repo and owner name from environment variables
    
    * Add test_name, test_id
    
    * Move client to the fetch method
    
    * Update skip condition
    
    * Run on self hosted runner
    
    * Update readme
    
    * Update README
    
    * Pass test_name to the metrics_fetcher
    
    * Fix linting issues
    
    * Fix lint
    
    * Fix formatting and lint issues
    
    * fix lint
---
 .github/workflows/run_perf_alert_tool.yml          |   2 +-
 .../python/apache_beam/testing/analyzers/README.md |  54 ++++++++---
 .../testing/analyzers/github_issues_utils.py       |  34 ++++---
 .../apache_beam/testing/analyzers/perf_analysis.py |  55 ++++++-----
 .../testing/analyzers/perf_analysis_test.py        |  41 ++++----
 .../testing/analyzers/perf_analysis_utils.py       | 104 +++++++++++++--------
 .../testing/analyzers/tests_config.yaml            |   2 +-
 .../testing/load_tests/load_test_metrics_utils.py  |  11 ---
 8 files changed, 183 insertions(+), 120 deletions(-)

diff --git a/.github/workflows/run_perf_alert_tool.yml 
b/.github/workflows/run_perf_alert_tool.yml
index 6946011f061..1bd8d525c2f 100644
--- a/.github/workflows/run_perf_alert_tool.yml
+++ b/.github/workflows/run_perf_alert_tool.yml
@@ -30,7 +30,7 @@ on:
 jobs:
   python_run_change_point_analysis:
     name: Run Change Point Analysis.
-    runs-on: ubuntu-latest
+    runs-on: [self-hosted, ubuntu-20.04, main]
     permissions:
       issues: write
     steps:
diff --git a/sdks/python/apache_beam/testing/analyzers/README.md 
b/sdks/python/apache_beam/testing/analyzers/README.md
index 076f173f9d7..91b21076f88 100644
--- a/sdks/python/apache_beam/testing/analyzers/README.md
+++ b/sdks/python/apache_beam/testing/analyzers/README.md
@@ -35,16 +35,13 @@ update already created GitHub issue or ignore performance 
alert by not creating
 
 ## Config file structure
 
-The config file defines the structure to run change point analysis on a given 
test. To add a test to the config file,
+The yaml defines the structure to run change point analysis on a given test. 
To add a test config to the yaml file,
 please follow the below structure.
 
-**NOTE**: The Change point analysis only supports reading the metric data from 
Big Query for now.
+**NOTE**: The Change point analysis only supports reading the metric data from 
`BigQuery` only.
 
 ```
-# the test_1 must be a unique id.
-test_1:
-  test_description: Pytorch image classification on 50k images of size 224 x 
224 with resnet 152
-  test_target: 
apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+test_1: # a unique id for each test config.
   metrics_dataset: beam_run_inference
   metrics_table: torch_inference_imagenet_results_resnet152
   project: apache-beam-testing
@@ -55,11 +52,17 @@ test_1:
   num_runs_in_change_point_window: 30 # optional parameter
 ```
 
-**NOTE**: `test_target` is optional. It is used for identifying the test that 
was causing the regression.
+#### Optional Parameters:
 
-**Note**: By default, the tool fetches metrics from BigQuery tables. 
`metrics_dataset`, `metrics_table`, `project` and `metric_name` should match 
with the values defined for performance/load tests.
-The above example uses this [test 
configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
-to fill up the values required to fetch the data from source.
+These are the optional parameters that can be added to the test config in 
addition to the parameters mentioned above.
+
+- `test_target`: Identifies the test responsible for the regression.
+
+- `test_description`: Provides a brief overview of the test's function.
+
+- `test_name`: Denotes the name of the test as stored in the BigQuery table.
+
+**Note**: The tool, by default, pulls metrics from BigQuery tables. Ensure 
that the values for `metrics_dataset`, `metrics_table`, `project`, and 
`metric_name` align with those defined for performance/load tests. The provided 
example utilizes this [test 
configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 to populate the necessary values for data retrieval.
 
 ### Different ways to avoid false positive change points
 
@@ -76,8 +79,35 @@ setting `num_runs_in_change_point_window=7` will achieve it.
 
 ## Register a test for performance alerts
 
-If a new test needs to be registered for the performance alerting tool, please 
add the required test parameters to the
-config file.
+If a new test needs to be registered for the performance alerting tool,
+
+- You can either add it to the config file that is already present.
+- You can define your own yaml file and call the 
[perf_analysis.run()](https://github.com/apache/beam/blob/a46bc12a256dcaa3ae2cc9e5d6fdcaa82b59738b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py#L152)
 method.
+
+
+## Integrating the Perf Alert Tool with a Custom BigQuery Schema
+
+By default, the Perf Alert Tool retrieves metrics from the 
`apache-beam-testing` BigQuery projects. All performance and load tests within 
Beam utilize a standard 
[schema](https://github.com/apache/beam/blob/a7e12db9b5977c4a7b13554605c0300389a3d6da/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L70)
 for metrics publication. The tool inherently recognizes and operates with this 
schema when extracting metrics from BigQuery tables.
+
+To fetch the data from a BigQuery dataset that is not a default setting of the 
Apache Beam's setting, One can inherit the `MetricsFetcher` class and implement 
the abstract method `fetch_metric_data`. This method should return a tuple of 
desired metric values and timestamps of the metric values of when it was 
published.
+
+```
+from apache_beam.testing.analyzers import perf_analysis
+config_file_path = <path_to_config_file>
+my_metric_fetcher = MyMetricsFetcher() # inherited from MetricsFetcher
+perf_analysis.run(config_file_path, my_metrics_fetcher)
+```
+
+``Note``: The metrics and timestamps should be sorted based on the timestamps 
values in ascending order.
+
+### Configuring GitHub Parameters
+
+Out of the box, the performance alert tool targets the `apache/beam` 
repository when raising issues. If you wish to utilize this tool for another 
repository, you'll need to pre-set a couple of environment variables:
+
+- `REPO_OWNER`: Represents the owner of the repository. (e.g., `apache`)
+- `REPO_NAME`: Specifies the repository name itself. (e.g., `beam`)
+
+Before initiating the tool, also ensure that the `GITHUB_TOKEN` is set to an 
authenticated GitHub token. This permits the tool to generate GitHub issues 
whenever performance alerts arise.
 
 ## Triage performance alert issues
 
diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py 
b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
index e1f20baa50a..82758be8f18 100644
--- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
@@ -34,8 +34,8 @@ except KeyError as e:
       'A Github Personal Access token is required '
       'to create Github Issues.')
 
-_BEAM_GITHUB_REPO_OWNER = 'apache'
-_BEAM_GITHUB_REPO_NAME = 'beam'
+_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'apache')
+_GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam')
 # Adding GitHub Rest API version to the header to maintain version stability.
 # For more information, please look at
 # 
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
 # pylint: disable=line-too-long
@@ -77,10 +77,10 @@ def create_issue(
     Tuple containing GitHub issue number and issue URL.
   """
   url = "https://api.github.com/repos/{}/{}/issues".format(
-      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+      _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME)
   data = {
-      'owner': _BEAM_GITHUB_REPO_OWNER,
-      'repo': _BEAM_GITHUB_REPO_NAME,
+      'owner': _GITHUB_REPO_OWNER,
+      'repo': _GITHUB_REPO_NAME,
       'title': title,
       'body': description,
       'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
@@ -108,20 +108,20 @@ def comment_on_issue(issue_number: int,
       issue, and the comment URL.
   """
   url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
-      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+      _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
   open_issue_response = requests.get(
       url,
       json.dumps({
-          'owner': _BEAM_GITHUB_REPO_OWNER,
-          'repo': _BEAM_GITHUB_REPO_NAME,
+          'owner': _GITHUB_REPO_OWNER,
+          'repo': _GITHUB_REPO_NAME,
           'issue_number': issue_number
       },
                  default=str),
       headers=_HEADERS).json()
   if open_issue_response['state'] == 'open':
     data = {
-        'owner': _BEAM_GITHUB_REPO_OWNER,
-        'repo': _BEAM_GITHUB_REPO_NAME,
+        'owner': _GITHUB_REPO_OWNER,
+        'repo': _GITHUB_REPO_NAME,
         'body': comment_description,
         issue_number: issue_number,
     }
@@ -134,13 +134,14 @@ def comment_on_issue(issue_number: int,
 
 def add_awaiting_triage_label(issue_number: int):
   url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
-      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+      _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
   requests.post(
       url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS)
 
 
 def get_issue_description(
-    test_name: str,
+    test_id: str,
+    test_name: Optional[str],
     metric_name: str,
     timestamps: List[pd.Timestamp],
     metric_values: List,
@@ -167,10 +168,13 @@ def get_issue_description(
 
   description = []
 
-  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_name, 
metric_name))
+  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))
 
-  description.append(("`Test description:` " +
-                      f'{test_description}') if test_description else '')
+  if test_name:
+    description.append(("`test_name:` " + f'{test_name}'))
+
+  if test_description:
+    description.append(("`Test description:` " + f'{test_description}'))
 
   description.append('```')
 
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py 
b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
index 7f1ffbb944e..c86ecb2c4e2 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -23,7 +23,6 @@
 import argparse
 import logging
 import os
-import uuid
 from datetime import datetime
 from datetime import timezone
 from typing import Any
@@ -33,9 +32,10 @@ from typing import Optional
 import pandas as pd
 
 from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import 
BigQueryMetricsFetcher
 from apache_beam.testing.analyzers.perf_analysis_utils import 
GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
 from apache_beam.testing.analyzers.perf_analysis_utils import 
create_performance_alert
-from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
 from apache_beam.testing.analyzers.perf_analysis_utils import 
find_latest_change_point_index
 from apache_beam.testing.analyzers.perf_analysis_utils import 
get_existing_issues_data
 from apache_beam.testing.analyzers.perf_analysis_utils import 
is_change_point_in_valid_window
@@ -43,10 +43,10 @@ from apache_beam.testing.analyzers.perf_analysis_utils 
import is_perf_alert
 from apache_beam.testing.analyzers.perf_analysis_utils import 
publish_issue_metadata_to_big_query
 from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
 from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
-from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
 
 
-def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
+def run_change_point_analysis(
+    params, test_id, big_query_metrics_fetcher: MetricsFetcher):
   """
   Args:
    params: Dict containing parameters to run change point analysis.
@@ -56,14 +56,21 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
   Returns:
      bool indicating if a change point is observed and alerted on GitHub.
   """
-  logging.info("Running change point analysis for test %s" % test_name)
+  logging.info("Running change point analysis for test ID %s" % test_id)
   if not validate_config(params.keys()):
     raise ValueError(
         f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
-        f"are specified for the {test_name}")
+        f"are specified for the {test_id}")
 
   metric_name = params['metric_name']
 
+  # test_name will be used to query a single test from
+  # multiple tests in a single BQ table. Right now, the default
+  # assumption is that all the test have an individual BQ table
+  # but this might not be case for other tests(such as IO tests where
+  # a single BQ tables stores all the data)
+  test_name = params.get('test_name', None)
+
   min_runs_between_change_points = (
       constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
   if 'min_runs_between_change_points' in params:
@@ -74,15 +81,18 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
   if 'num_runs_in_change_point_window' in params:
     num_runs_in_change_point_window = params['num_runs_in_change_point_window']
 
-  metric_values, timestamps = fetch_metric_data(
-    params=params,
-    big_query_metrics_fetcher=big_query_metrics_fetcher
+  metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
+    project=params['project'],
+    metrics_dataset=params['metrics_dataset'],
+    metrics_table=params['metrics_table'],
+    metric_name=params['metric_name'],
+    test_name=test_name
   )
 
   change_point_index = find_latest_change_point_index(
       metric_values=metric_values)
   if not change_point_index:
-    logging.info("Change point is not detected for the test %s" % test_name)
+    logging.info("Change point is not detected for the test ID %s" % test_id)
     return False
   # since timestamps are ordered in ascending order and
   # num_runs_in_change_point_window refers to the latest runs,
@@ -92,11 +102,11 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
   if not is_change_point_in_valid_window(num_runs_in_change_point_window,
                                          latest_change_point_run):
     logging.info(
-        'Performance regression/improvement found for the test: %s. '
+        'Performance regression/improvement found for the test ID: %s. '
         'on metric %s. Since the change point run %s '
         'lies outside the num_runs_in_change_point_window distance: %s, '
         'alert is not raised.' % (
-            test_name,
+            test_id,
             metric_name,
             latest_change_point_run + 1,
             num_runs_in_change_point_window))
@@ -106,8 +116,7 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
   last_reported_issue_number = None
   issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}'
   existing_issue_data = get_existing_issues_data(
-      table_name=issue_metadata_table_name,
-      big_query_metrics_fetcher=big_query_metrics_fetcher)
+      table_name=issue_metadata_table_name)
 
   if existing_issue_data is not None:
     existing_issue_timestamps = existing_issue_data[
@@ -124,20 +133,21 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
         min_runs_between_change_points=min_runs_between_change_points)
   if is_alert:
     issue_number, issue_url = create_performance_alert(
-    metric_name, test_name, timestamps,
+    metric_name, test_id, timestamps,
     metric_values, change_point_index,
     params.get('labels', None),
     last_reported_issue_number,
     test_description = params.get('test_description', None),
+    test_name = test_name
     )
 
     issue_metadata = GitHubIssueMetaData(
         issue_timestamp=pd.Timestamp(
             datetime.now().replace(tzinfo=timezone.utc)),
         # BQ doesn't allow '.' in table name
-        test_name=test_name.replace('.', '_'),
+        test_id=test_id.replace('.', '_'),
+        test_name=test_name,
         metric_name=metric_name,
-        test_id=uuid.uuid4().hex,
         change_point=metric_values[change_point_index],
         issue_number=issue_number,
         issue_url=issue_url,
@@ -149,7 +159,10 @@ def run_change_point_analysis(params, test_name, 
big_query_metrics_fetcher):
   return is_alert
 
 
-def run(config_file_path: Optional[str] = None) -> None:
+def run(
+    big_query_metrics_fetcher: MetricsFetcher = BigQueryMetricsFetcher(),
+    config_file_path: Optional[str] = None,
+) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
@@ -169,12 +182,10 @@ def run(config_file_path: Optional[str] = None) -> None:
 
   tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
 
-  big_query_metrics_fetcher = BigQueryMetricsFetcher()
-
-  for test_name, params in tests_config.items():
+  for test_id, params in tests_config.items():
     run_change_point_analysis(
         params=params,
-        test_name=test_name,
+        test_id=test_id,
         big_query_metrics_fetcher=big_query_metrics_fetcher)
 
 
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py 
b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
index 094cd9c47ec..9c7921300d9 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
@@ -32,6 +32,7 @@ try:
   from apache_beam.io.filesystems import FileSystems
   from apache_beam.testing.analyzers import constants
   from apache_beam.testing.analyzers import github_issues_utils
+  from apache_beam.testing.analyzers.perf_analysis_utils import 
BigQueryMetricsFetcher
   from apache_beam.testing.analyzers.perf_analysis_utils import 
is_change_point_in_valid_window
   from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
   from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
@@ -41,18 +42,18 @@ try:
   from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
   from apache_beam.testing.load_tests import load_test_metrics_utils
 except ImportError as e:
-  analysis = None  # type: ignore
+  raise unittest.SkipTest('Missing dependencies to run perf analysis tests.')
 
 
 # mock methods.
-def get_fake_data_with_no_change_point(**kwargs):
+def get_fake_data_with_no_change_point(*args, **kwargs):
   num_samples = 20
   metric_values = [1] * num_samples
   timestamps = list(range(num_samples))
   return metric_values, timestamps
 
 
-def get_fake_data_with_change_point(**kwargs):
+def get_fake_data_with_change_point(*args, **kwargs):
   # change point will be at index 13.
   num_samples = 20
   metric_values = [0] * 12 + [3] + [4] * 7
@@ -69,10 +70,6 @@ def get_existing_issue_data(**kwargs):
   }])
 
 
[email protected](
-    analysis is None,
-    'Missing dependencies. '
-    'Test dependencies are missing for the Analyzer.')
 class TestChangePointAnalysis(unittest.TestCase):
   def setUp(self) -> None:
     self.single_change_point_series = [0] * 10 + [1] * 10
@@ -151,18 +148,20 @@ class TestChangePointAnalysis(unittest.TestCase):
         min_runs_between_change_points=min_runs_between_change_points)
     self.assertFalse(is_alert)
 
-  @mock.patch(
-      'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+  @mock.patch.object(
+      BigQueryMetricsFetcher,
+      'fetch_metric_data',
       get_fake_data_with_no_change_point)
   def test_no_alerts_when_no_change_points(self):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
-        big_query_metrics_fetcher=None)
+        test_id=self.test_id,
+        big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertFalse(is_alert)
 
-  @mock.patch(
-      'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+  @mock.patch.object(
+      BigQueryMetricsFetcher,
+      'fetch_metric_data',
       get_fake_data_with_change_point)
   @mock.patch(
       'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
@@ -178,12 +177,13 @@ class TestChangePointAnalysis(unittest.TestCase):
   def test_alert_on_data_with_change_point(self, *args):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
-        big_query_metrics_fetcher=None)
+        test_id=self.test_id,
+        big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertTrue(is_alert)
 
-  @mock.patch(
-      'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
+  @mock.patch.object(
+      BigQueryMetricsFetcher,
+      'fetch_metric_data',
       get_fake_data_with_change_point)
   @mock.patch(
       'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
@@ -198,8 +198,8 @@ class TestChangePointAnalysis(unittest.TestCase):
   def test_alert_on_data_with_reported_change_point(self, *args):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
-        big_query_metrics_fetcher=None)
+        test_id=self.test_id,
+        big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertFalse(is_alert)
 
   def test_change_point_has_anomaly_marker_in_gh_description(self):
@@ -208,7 +208,8 @@ class TestChangePointAnalysis(unittest.TestCase):
     change_point_index = find_latest_change_point_index(metric_values)
 
     description = github_issues_utils.get_issue_description(
-        test_name=self.test_id,
+        test_id=self.test_id,
+        test_name=self.params.get('test_name', None),
         test_description=self.params['test_description'],
         metric_name=self.params['metric_name'],
         metric_values=metric_values,
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py 
b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
index 0a559fc4bee..f9604c490fc 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
@@ -14,11 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import abc
 import logging
 from dataclasses import asdict
 from dataclasses import dataclass
 from statistics import median
-from typing import Any
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -28,11 +28,11 @@ from typing import Union
 import pandas as pd
 import yaml
 from google.api_core import exceptions
+from google.cloud import bigquery
 
 from apache_beam.testing.analyzers import constants
 from apache_beam.testing.analyzers import github_issues_utils
 from apache_beam.testing.load_tests import load_test_metrics_utils
-from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
 from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
 from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
 
@@ -59,9 +59,7 @@ def is_change_point_in_valid_window(
   return num_runs_in_change_point_window > latest_change_point_run
 
 
-def get_existing_issues_data(
-    table_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
-) -> Optional[pd.DataFrame]:
+def get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]:
   """
   Finds the most recent GitHub issue created for the test_name.
   If no table found with name=test_name, return (None, None)
@@ -73,12 +71,14 @@ def get_existing_issues_data(
   LIMIT 10
   """
   try:
-    df = big_query_metrics_fetcher.fetch(query=query)
+    client = bigquery.Client()
+    query_job = client.query(query=query)
+    existing_issue_data = query_job.result().to_dataframe()
   except exceptions.NotFound:
     # If no table found, that means this is first performance regression
     # on the current test+metric.
     return None
-  return df
+  return existing_issue_data
 
 
 def is_perf_alert(
@@ -123,33 +123,6 @@ def validate_config(keys):
   return constants._PERF_TEST_KEYS.issubset(keys)
 
 
-def fetch_metric_data(
-    params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher
-) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
-  """
-  Args:
-   params: Dict containing keys required to fetch data from a data source.
-   big_query_metrics_fetcher: A BigQuery metrics fetcher for fetch metrics.
-  Returns:
-    Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
-    of metric_values and list of timestamps. Both are sorted in ascending
-    order wrt timestamps.
-  """
-  query = f"""
-      SELECT *
-      FROM 
{params['project']}.{params['metrics_dataset']}.{params['metrics_table']}
-      WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), 
'{params['metric_name']}')
-      ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
-      LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
-    """
-  metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)
-  metric_data.sort_values(
-      by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
-  return (
-      metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
-      metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())
-
-
 def find_change_points(metric_values: List[Union[float, int]]):
   return e_divisive(metric_values)
 
@@ -175,7 +148,7 @@ def find_latest_change_point_index(metric_values: 
List[Union[float, int]]):
 
 def publish_issue_metadata_to_big_query(issue_metadata, table_name):
   """
-  Published issue_metadata to BigQuery with table name=test_name.
+  Published issue_metadata to BigQuery with table name.
   """
   bq_metrics_publisher = BigQueryMetricsPublisher(
       project_name=constants._BQ_PROJECT_NAME,
@@ -190,18 +163,21 @@ def publish_issue_metadata_to_big_query(issue_metadata, 
table_name):
 
 def create_performance_alert(
     metric_name: str,
-    test_name: str,
+    test_id: str,
     timestamps: List[pd.Timestamp],
     metric_values: List[Union[int, float]],
     change_point_index: int,
     labels: List[str],
     existing_issue_number: Optional[int],
-    test_description: Optional[str] = None) -> Tuple[int, str]:
+    test_description: Optional[str] = None,
+    test_name: Optional[str] = None,
+) -> Tuple[int, str]:
   """
   Creates performance alert on GitHub issues and returns GitHub issue
   number and issue URL.
   """
   description = github_issues_utils.get_issue_description(
+      test_id=test_id,
       test_name=test_name,
       test_description=test_description,
       metric_name=metric_name,
@@ -213,7 +189,7 @@ def create_performance_alert(
 
   issue_number, issue_url = github_issues_utils.report_change_point_on_issues(
         title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format(
-          test_name, metric_name
+          test_id, metric_name
         ),
         description=description,
         labels=labels,
@@ -253,3 +229,55 @@ def filter_change_points_by_median_threshold(
     if relative_change > threshold:
       valid_change_points.append(idx)
   return valid_change_points
+
+
+class MetricsFetcher(metaclass=abc.ABCMeta):
+  @abc.abstractmethod
+  def fetch_metric_data(
+      self,
+      *,
+      project,
+      metrics_dataset,
+      metrics_table,
+      metric_name,
+      test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+    """
+    Define SQL query and fetch the timestamp values and metric values
+    from BigQuery tables.
+    """
+    raise NotImplementedError
+
+
+class BigQueryMetricsFetcher(MetricsFetcher):
+  def fetch_metric_data(
+      self,
+      *,
+      project,
+      metrics_dataset,
+      metrics_table,
+      metric_name,
+      test_name=None,
+  ) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+    """
+    Args:
+    params: Dict containing keys required to fetch data from a data source.
+    Returns:
+      Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
+      of metric_values and list of timestamps. Both are sorted in ascending
+      order wrt timestamps.
+    """
+    query = f"""
+          SELECT *
+          FROM {project}.{metrics_dataset}.{metrics_table}
+          WHERE 
CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')
+          ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
+          LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
+        """
+    client = bigquery.Client()
+    query_job = client.query(query=query)
+    metric_data = query_job.result().to_dataframe()
+    metric_data.sort_values(
+        by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
+    return (
+        metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
+        metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())
diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml 
b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
index f808f5e41d7..ec9cfe6f1ac 100644
--- a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
+++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml
@@ -16,7 +16,7 @@
 #
 
 # for the unique key to define a test, please use the following format:
-# {test_name}-{metric_name}
+# {test_id}-{metric_name}
 
 
pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_micro_secs:
   test_description:
diff --git 
a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py 
b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 92a5f68351f..01db2c114ef 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -38,7 +38,6 @@ from typing import Mapping
 from typing import Optional
 from typing import Union
 
-import pandas as pd
 import requests
 from requests.auth import HTTPBasicAuth
 
@@ -650,13 +649,3 @@ class AssignTimestamps(beam.DoFn):
   def process(self, element):
     yield self.timestamp_val_fn(
         element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
-
-
-class BigQueryMetricsFetcher:
-  def __init__(self):
-    self.client = bigquery.Client()
-
-  def fetch(self, query) -> pd.DataFrame:
-    query_job = self.client.query(query=query)
-    result = query_job.result()
-    return result.to_dataframe()

Reply via email to