tvalentyn commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1034172621


##########
sdks/python/apache_beam/testing/analyzers/README.md:
##########
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<h1>Performance alerts for Beam Python performance and load tests</h1>
+
+
+<h2> Alerts </h2>
+
+Performance regressions or improvements detected with the [Change Point 
Analysis](https://en.wikipedia.org/wiki/Change_detection) using 
[edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+analyzer are automatically filed as Beam GitHub issues with a label 
`perf-alert`.
+
+The GitHub issue description will contain the information on the affected test 
and metric by providing the metric values for N consecutive runs with timestamps
+before and after the observed change point. Observed change point is pointed 
as `Anomaly` in the issue description. 
+
+Example: [sample perf alert GitHub 
issue](https://github.com/AnandInguva/beam/issues/83).
+
+If a performance alert is created on a test, a GitHub issue will be created 
and the GitHub issue metadata such as GitHub issue
+URL, issue number along with the change point value and timestamp are exported 
to BigQuery. This data will be used to analyze the next change point observed 
on the same test to
+update already created GitHub issue or ignore performance alert by not 
creating GitHub issue to avoid duplicate issue creation.
+
+<h2> Config file structure </h2>
+The config file defines the structure to run change point analysis on a given 
test. To add a test to the config file, 
+please follow the below structure.
+
+**NOTE**: The Change point analysis only supports reading the metric data from 
Big Query for now.
+
+```
+# the test_1 must be a unique id.
+test_1:
+  test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+  source: big_query
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - perf-alert
+    - run-inference
+  min_runs_between_change_points: 5
+  num_runs_in_change_point_window: 7
+```
+
+**Note**: If the source is **BigQuery**, the metrics_dataset, metrics_table, 
project and metric_name should match with the values defined for 
performance/load tests.
+The above example uses this [test 
configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+to fill up the values required to fetch the data from source.
+
+<h3>Different ways to avoid false positive change points</h3>
+
+**min_runs_between_change_points**: As the metric data moves across the runs, 
the change point analysis can place the 
+change point in a slightly different place. These change points refer to the 
same regression and are just noise.
+When we find a new change point, we will search up to the 
`min_runs_between_change_points` in both directions from the 
+current change point. If an existing change point is found within the 
distance, then the current change point will be
+suppressed. The units for the `min_runs_between_change_points` are number of 
runs. 

Review Comment:
   > The units for the `min_runs_between_change_points` are number of runs. 
   >  The units for `num_runs_in_change_point_window` is number of runs.
   
   This is now obvious from the parameter names, so we don't need to mention it.
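
   For readers following the quoted README text, the two parameters can be illustrated with a minimal sketch. It assumes runs are indexed oldest-first and that a change point counts as "in the window" when it falls among the last N runs. The function names are hypothetical, and this is not the PR's implementation.

```python
from typing import List


def is_in_recent_window(
    change_point_index: int,
    num_runs: int,
    num_runs_in_change_point_window: int) -> bool:
  # Assumption: index 0 is the oldest run, num_runs - 1 the most recent.
  runs_since_change_point = num_runs - 1 - change_point_index
  return runs_since_change_point < num_runs_in_change_point_window


def is_duplicate_change_point(
    change_point_index: int,
    existing_change_point_indexes: List[int],
    min_runs_between_change_points: int) -> bool:
  # A new change point is suppressed when an already reported one lies
  # within min_runs_between_change_points runs in either direction.
  return any(
      abs(change_point_index - existing) <= min_runs_between_change_points
      for existing in existing_change_point_indexes)
```

   Both thresholds are plain counts of runs, which is why the names alone carry the units.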



##########
sdks/python/apache_beam/testing/analyzers/README.md:
##########
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<h1>Performance alerts for Beam Python performance and load tests</h1>
+
+
+<h2> Alerts </h2>
+
+Performance regressions or improvements detected with the [Change Point 
Analysis](https://en.wikipedia.org/wiki/Change_detection) using 
[edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+analyzer are automatically filed as Beam GitHub issues with a label 
`perf-alert`.
+
+The GitHub issue description will contain the information on the affected test 
and metric by providing the metric values for N consecutive runs with timestamps
+before and after the observed change point. Observed change point is pointed 
as `Anomaly` in the issue description. 
+
+Example: [sample perf alert GitHub 
issue](https://github.com/AnandInguva/beam/issues/83).
+
+If a performance alert is created on a test, a GitHub issue will be created 
and the GitHub issue metadata such as GitHub issue
+URL, issue number along with the change point value and timestamp are exported 
to BigQuery. This data will be used to analyze the next change point observed 
on the same test to
+update already created GitHub issue or ignore performance alert by not 
creating GitHub issue to avoid duplicate issue creation.
+
+<h2> Config file structure </h2>
+The config file defines the structure to run change point analysis on a given 
test. To add a test to the config file, 
+please follow the below structure.
+
+**NOTE**: The Change point analysis only supports reading the metric data from 
Big Query for now.
+
+```
+# the test_1 must be a unique id.
+test_1:
+  test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+  source: big_query
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - perf-alert
+    - run-inference
+  min_runs_between_change_points: 5
+  num_runs_in_change_point_window: 7
+```
+
+**Note**: If the source is **BigQuery**, the metrics_dataset, metrics_table, 
project and metric_name should match with the values defined for 
performance/load tests.
+The above example uses this [test 
configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+to fill up the values required to fetch the data from source.
+
+<h3>Different ways to avoid false positive change points</h3>
+
+**min_runs_between_change_points**: As the metric data moves across the runs, 
the change point analysis can place the 
+change point in a slightly different place. These change points refer to the 
same regression and are just noise.
+When we find a new change point, we will search up to the 
`min_runs_between_change_points` in both directions from the 
+current change point. If an existing change point is found within the 
distance, then the current change point will be
+suppressed. The units for the `min_runs_between_change_points` are number of 
runs. 
+
+**num_runs_in_change_point_window**: This defines how many runs to consider 
from the most recent run to be in change point window.
+Sometimes, the change point found might be way back in time and could be 
irrelevant. For a test, if a change point needs to be 
+reported only when it was observed in the last 7 runs from the current run,
+setting `num_runs_in_change_point_window=7` will achieve it. The units for 
`num_runs_in_change_point_window` is number of runs. 
+
+
+<h2> Register a test for performance alerts. </h2>
+
+If a new test needs to be registered for the performance alerting tool, please 
add the required test parameters to the 
+config file. 
+
+**Note**: Please add the label `perf-alert` along with the other labels that 
are relevant to the test. This will help keeping

Review Comment:
   I would configure the tool to apply `perf-alert` as a default label, so that 
users don't have to add it themselves, and treat any configured labels as additions on top of that.



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -123,9 +128,18 @@ def comment_on_issue(issue_number: int,
   return False, None
 
 
+def add_label_to_issue(issue_number: int, labels: List[str] = None):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  data = {}
+  if labels:
+    data['labels'] = labels
+    requests.post(url, json.dumps(data), headers=_HEADERS)

Review Comment:
   ```suggestion
       requests.post(url, json.dumps({'labels': labels}), headers=_HEADERS)
   ```



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -123,9 +128,18 @@ def comment_on_issue(issue_number: int,
   return False, None
 
 
+def add_label_to_issue(issue_number: int, labels: List[str] = None):

Review Comment:
   ```suggestion
   def add_labels(issue_number: int, labels: List[str] = None):
   ```
   
   Also, to confirm: does this function only add labels, or does it set labels 
(removing any labels that were set previously and applying only the provided ones)?
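
   For context: in the GitHub REST API, a `POST` to the issue labels endpoint adds labels to the existing ones, while a `PUT` to the same endpoint replaces the whole label set. The sketch below only builds the request pieces without sending anything. The helper name and its `replace_existing` flag are hypothetical.

```python
import json
from typing import List, Tuple


def build_labels_request(
    owner: str,
    repo: str,
    issue_number: int,
    labels: List[str],
    replace_existing: bool = False) -> Tuple[str, str, str]:
  # POST appends to the issue's labels; PUT replaces the whole label set.
  method = 'PUT' if replace_existing else 'POST'
  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
      owner, repo, issue_number)
  return method, url, json.dumps({'labels': labels})
```

   Since the code under review uses `requests.post`, it would fall under the "add" case.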



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -156,78 +147,44 @@ def edivisive_means(self,
 
 
 def is_change_point_in_valid_window(
-    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
   # If the change point is more than N runs behind the most recent run,
   # Ignore the change point and don't raise an alert for it.
-  if change_point_to_recent_run_window >= change_point_index:
-    return True
-  return False
+  return num_runs_in_change_point_window >= change_point_index
 
 
-def has_sibling_change_point(
-    change_point_index: int,
-    change_point_sibling_distance: int,
-    metric_values: List,
+def find_existing_issue(
     metric_name: str,
     test_name: str,
-    change_point_timestamp: float,
+    change_point_timestamp: pd.Timestamp,
+    sibling_change_point_min_timestamp: pd.Timestamp,
+    sibling_change_point_max_timestamp: pd.Timestamp,
 ) -> Optional[Tuple[bool, Optional[int]]]:
   """
-  Finds the sibling change point index. If not,
-  returns the original change point index.
-
-  Sibling change point is a neighbor of latest
-  change point, within the distance of change_point_sibling_distance.
-  For sibling change point, a GitHub issue is already created.
+  Finds the most recent GitHub issue created for change points for this
+  test+metric in sibling change point min and max timestamps window.
+  Returns a boolean and an issue ID whether the issue needs to be updated.
+  """
+  query_template = f"""
+  SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
+  WHERE {METRIC_NAME} = '{metric_name}'
+  ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 1
   """
-
-  # Search backward from the current change point
-  sibling_indexes_to_search = []
-  for i in range(change_point_index - 1, -1, -1):
-    if change_point_index - i <= change_point_sibling_distance:
-      sibling_indexes_to_search.append(i)
-  # Search forward from the current change point
-  for i in range(change_point_index + 1, len(metric_values)):
-    if i - change_point_index <= change_point_sibling_distance:
-      sibling_indexes_to_search.append(i)
-  # Look for change points within change_point_sibling_distance.
-  # Return the first change point found.
-  query_template = """
-  SELECT * FROM {project}.{dataset}.{table}
-  WHERE {metric_name_id} = '{metric_name}'
-  ORDER BY {timestamp} DESC
-  LIMIT 10
-  """.format(
-      project=_BQ_PROJECT_NAME,
-      dataset=_BQ_DATASET,
-      metric_name_id=METRIC_NAME,
-      metric_name=metric_name,
-      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
-      table=test_name)
   try:
-    df = FetchMetrics.fetch_from_bq(query_template=query_template)
-  except google.api_core.exceptions.NotFound:
+    df = BigQueryMetricsFetcher().get_metrics(
+        query_template=query_template,
+        limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)

Review Comment:
   >         limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)
   
   Added by mistake. Move this to the other BQ call instead.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -240,149 +197,156 @@ def read_test_config(config_file_path: str) -> Dict:
   return config
 
 
-def run(args) -> None:
+def run(config_file_path: str = None) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
   regression observed for a test, an alert will be filed with GitHub Issues.
 
-  The config file is provide as command line argument. If no config file was
-  provided on cmd line, the default config file will be used.
-
-  For each test is config yaml file, if the source is the big_query,
-  the expected data required to run the change point analysis are
-  test_name, metrics_dataset, metrics_table, project, metric_name.
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
 
   """
-  config_file_path = args.config_file_path
   if config_file_path is None:
     config_file_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
 
   tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
 
-  # change_point_sibling_distance, change_point_to_recent_run_window can be
-  # defined in the config file for each test whihc are used
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
   # to avoid filing GitHub issues for duplicate change points. Please take
   # a look at the README for more information on the parameters defined in the
   # config file.
-  for _, params in tests_config.items():
-    metric_name = params['metric_name']
-    # replace . with _ in test_name. This test name would be used later
-    # as a BQ table name and the BQ table doesn't accept . in the name.
-    test_name = params['test_name'].replace('.', '_')
-    if params['source'] == 'big_query':
-      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
-          project_name=params['project'],
-          dataset=params['metrics_dataset'],
-          table=params['metrics_table'],
-          metric_name=metric_name)
-    else:
-      # (TODO): Implement fetching metric_data from InfluxDB.
-      params = None
-    assert params is not None
-
-    labels = params['labels']
-    change_point_sibling_distance = params['change_point_sibling_distance']
-    change_point_to_recent_run_window = params[
-        'change_point_to_recent_run_window']
-
-    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
-    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
-
-    # run change point analysis on the metric_values using edivisive means
-    cp_analyzer = ChangePointAnalysis(
-        metric_name=metric_name, data=metric_values)
-
-    change_points_idx = cp_analyzer.edivisive_means()
-    # No change point found. Continue on to the next test.
-    if not change_points_idx:
-      continue
-
-    # always consider the latest change points
-    change_points_idx.sort(reverse=True)
-    change_point_index = change_points_idx[0]
-    change_point_timestamp = timestamps[change_point_index]
-
-    # check if the change point lies in the valid window.
-    # window - Number of runs between the
-    # change_point_to_recent_run_window run and the most recent run.
-    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
-                                           change_point_index):
-      # change point lies outside the window from the recent run.
-      # Ignore this change point.
-      logging.info(
-          'Performance regression found for the test: %s. '
-          'but not creating an alert since the Change Point '
-          'lies outside the '
-          'change_point_to_recent_run_window distance' % test_name)
-      continue
-
-    # check for sibling change point. Sibling change point is a change
-    # point that lies in the distance of change_point_sibling_distance
-    # in both directions from the current change point index.
-    # Here, distance can be interpreted as number of runs between two change
-    # points. The idea here is that sibling change point will also point to
-    # the same performance regression.
-
-    create_alert, last_created_issue_number = (
-      has_sibling_change_point(
-        change_point_index=change_point_index,
-        change_point_sibling_distance=change_point_sibling_distance,
-        metric_values=metric_values,
-        metric_name=metric_name,
-        test_name=test_name,
-        change_point_timestamp=change_point_timestamp
-      )
-    )
-
-    logging.info(
-        "Create performance alert for the "
-        "test %s: %s" % (test_name, create_alert))
-
-    if create_alert:
-      # get the issue description for the creating GH issue or
-      # to comment on open GH issue.
-      issue_description = get_issue_description(
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':

Review Comment:
   I would remove the `source` param entirely, along with all the logic around it, until we 
actually decide to implement another source.
   
   Unrelated: could we switch Grafana to a BigQuery source (see 
https://grafana.com/grafana/plugins/doitintl-bigquery-datasource/) and stop 
using InfluxDB altogether? I'm not sure whether InfluxDB has any other uses, but we 
could ask on dev@ later. 



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -240,149 +197,156 @@ def read_test_config(config_file_path: str) -> Dict:
   return config
 
 
-def run(args) -> None:
+def run(config_file_path: str = None) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
   regression observed for a test, an alert will be filed with GitHub Issues.
 
-  The config file is provide as command line argument. If no config file was
-  provided on cmd line, the default config file will be used.
-
-  For each test is config yaml file, if the source is the big_query,
-  the expected data required to run the change point analysis are
-  test_name, metrics_dataset, metrics_table, project, metric_name.
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
 
   """
-  config_file_path = args.config_file_path
   if config_file_path is None:
     config_file_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
 
   tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
 
-  # change_point_sibling_distance, change_point_to_recent_run_window can be
-  # defined in the config file for each test whihc are used
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
   # to avoid filing GitHub issues for duplicate change points. Please take
   # a look at the README for more information on the parameters defined in the
   # config file.
-  for _, params in tests_config.items():
-    metric_name = params['metric_name']
-    # replace . with _ in test_name. This test name would be used later
-    # as a BQ table name and the BQ table doesn't accept . in the name.
-    test_name = params['test_name'].replace('.', '_')
-    if params['source'] == 'big_query':
-      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
-          project_name=params['project'],
-          dataset=params['metrics_dataset'],
-          table=params['metrics_table'],
-          metric_name=metric_name)
-    else:
-      # (TODO): Implement fetching metric_data from InfluxDB.
-      params = None
-    assert params is not None
-
-    labels = params['labels']
-    change_point_sibling_distance = params['change_point_sibling_distance']
-    change_point_to_recent_run_window = params[
-        'change_point_to_recent_run_window']
-
-    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
-    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
-
-    # run change point analysis on the metric_values using edivisive means
-    cp_analyzer = ChangePointAnalysis(
-        metric_name=metric_name, data=metric_values)
-
-    change_points_idx = cp_analyzer.edivisive_means()
-    # No change point found. Continue on to the next test.
-    if not change_points_idx:
-      continue
-
-    # always consider the latest change points
-    change_points_idx.sort(reverse=True)
-    change_point_index = change_points_idx[0]
-    change_point_timestamp = timestamps[change_point_index]
-
-    # check if the change point lies in the valid window.
-    # window - Number of runs between the
-    # change_point_to_recent_run_window run and the most recent run.
-    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
-                                           change_point_index):
-      # change point lies outside the window from the recent run.
-      # Ignore this change point.
-      logging.info(
-          'Performance regression found for the test: %s. '
-          'but not creating an alert since the Change Point '
-          'lies outside the '
-          'change_point_to_recent_run_window distance' % test_name)
-      continue
-
-    # check for sibling change point. Sibling change point is a change
-    # point that lies in the distance of change_point_sibling_distance
-    # in both directions from the current change point index.
-    # Here, distance can be interpreted as number of runs between two change
-    # points. The idea here is that sibling change point will also point to
-    # the same performance regression.
-
-    create_alert, last_created_issue_number = (
-      has_sibling_change_point(
-        change_point_index=change_point_index,
-        change_point_sibling_distance=change_point_sibling_distance,
-        metric_values=metric_values,
-        metric_name=metric_name,
-        test_name=test_name,
-        change_point_timestamp=change_point_timestamp
-      )
-    )
-
-    logging.info(
-        "Create performance alert for the "
-        "test %s: %s" % (test_name, create_alert))
-
-    if create_alert:
-      # get the issue description for the creating GH issue or
-      # to comment on open GH issue.
-      issue_description = get_issue_description(
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':
+        metric_data: pd.DataFrame = BigQueryMetricsFetcher().get_metrics(
+            project_name=params['project'],
+            dataset=params['metrics_dataset'],
+            table=params['metrics_table'],
+            metric_name=metric_name)
+      else:
+        # (TODO): Implement fetching metric_data from InfluxDB.
+        raise ValueError(
+            'For change point analysis, only big_query is '
+            'accepted as source.')
+
+      labels = params['labels']
+      min_runs_between_change_points = params['min_runs_between_change_points']
+      num_runs_in_change_point_window = params[
+          'num_runs_in_change_point_window']
+
+      metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+      timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+      cp_analyzer = ChangePointAnalysis(
+          metric_name=metric_name, data=metric_values)
+
+      change_points_idx = cp_analyzer.edivisive_means()
+      if not change_points_idx:
+        continue
+
+      # Consider the latest change points to observe the latest perf alerts.
+      change_points_idx.sort(reverse=True)
+      change_point_index = change_points_idx[0]
+      change_point_timestamp = timestamps[change_point_index]
+
+      # check if the change point lies in the valid window.

Review Comment:
   This comment just repeats the code. Please remove it.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -240,149 +197,156 @@ def read_test_config(config_file_path: str) -> Dict:
   return config
 
 
-def run(args) -> None:
+def run(config_file_path: str = None) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
   regression observed for a test, an alert will be filed with GitHub Issues.
 
-  The config file is provide as command line argument. If no config file was
-  provided on cmd line, the default config file will be used.
-
-  For each test is config yaml file, if the source is the big_query,
-  the expected data required to run the change point analysis are
-  test_name, metrics_dataset, metrics_table, project, metric_name.
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
 
   """
-  config_file_path = args.config_file_path
   if config_file_path is None:
     config_file_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
 
   tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
 
-  # change_point_sibling_distance, change_point_to_recent_run_window can be
-  # defined in the config file for each test whihc are used
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
   # to avoid filing GitHub issues for duplicate change points. Please take
   # a look at the README for more information on the parameters defined in the
   # config file.
-  for _, params in tests_config.items():
-    metric_name = params['metric_name']
-    # replace . with _ in test_name. This test name would be used later
-    # as a BQ table name and the BQ table doesn't accept . in the name.
-    test_name = params['test_name'].replace('.', '_')
-    if params['source'] == 'big_query':
-      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
-          project_name=params['project'],
-          dataset=params['metrics_dataset'],
-          table=params['metrics_table'],
-          metric_name=metric_name)
-    else:
-      # (TODO): Implement fetching metric_data from InfluxDB.
-      params = None
-    assert params is not None
-
-    labels = params['labels']
-    change_point_sibling_distance = params['change_point_sibling_distance']
-    change_point_to_recent_run_window = params[
-        'change_point_to_recent_run_window']
-
-    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
-    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
-
-    # run change point analysis on the metric_values using edivisive means
-    cp_analyzer = ChangePointAnalysis(
-        metric_name=metric_name, data=metric_values)
-
-    change_points_idx = cp_analyzer.edivisive_means()
-    # No change point found. Continue on to the next test.
-    if not change_points_idx:
-      continue
-
-    # always consider the latest change points
-    change_points_idx.sort(reverse=True)
-    change_point_index = change_points_idx[0]
-    change_point_timestamp = timestamps[change_point_index]
-
-    # check if the change point lies in the valid window.
-    # window - Number of runs between the
-    # change_point_to_recent_run_window run and the most recent run.
-    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
-                                           change_point_index):
-      # change point lies outside the window from the recent run.
-      # Ignore this change point.
-      logging.info(
-          'Performance regression found for the test: %s. '
-          'but not creating an alert since the Change Point '
-          'lies outside the '
-          'change_point_to_recent_run_window distance' % test_name)
-      continue
-
-    # check for sibling change point. Sibling change point is a change
-    # point that lies in the distance of change_point_sibling_distance
-    # in both directions from the current change point index.
-    # Here, distance can be interpreted as number of runs between two change
-    # points. The idea here is that sibling change point will also point to
-    # the same performance regression.
-
-    create_alert, last_created_issue_number = (
-      has_sibling_change_point(
-        change_point_index=change_point_index,
-        change_point_sibling_distance=change_point_sibling_distance,
-        metric_values=metric_values,
-        metric_name=metric_name,
-        test_name=test_name,
-        change_point_timestamp=change_point_timestamp
-      )
-    )
-
-    logging.info(
-        "Create performance alert for the "
-        "test %s: %s" % (test_name, create_alert))
-
-    if create_alert:
-      # get the issue description for the creating GH issue or
-      # to comment on open GH issue.
-      issue_description = get_issue_description(
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':
+        metric_data: pd.DataFrame = BigQueryMetricsFetcher().get_metrics(
+            project_name=params['project'],
+            dataset=params['metrics_dataset'],
+            table=params['metrics_table'],
+            metric_name=metric_name)
+      else:
+        # (TODO): Implement fetching metric_data from InfluxDB.
+        raise ValueError(
+            'For change point analysis, only big_query is'
+            'accepted as source.')
+
+      labels = params['labels']
+      min_runs_between_change_points = params['min_runs_between_change_points']
+      num_runs_in_change_point_window = params[
+          'num_runs_in_change_point_window']
+
+      metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+      timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+      cp_analyzer = ChangePointAnalysis(
+          metric_name=metric_name, data=metric_values)
+
+      change_points_idx = cp_analyzer.edivisive_means()
+      if not change_points_idx:
+        continue
+
+      # Consider the latest change points to observe the latest perf alerts.
+      change_points_idx.sort(reverse=True)

Review Comment:
   Does `edivisive_means` return change points in random/unspecified order, so 
that we need this sort?
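
If the order turns out to be unspecified, one way to avoid depending on it is to take the maximum index instead of sorting in place. A minimal sketch; `latest_change_point` is a hypothetical helper, not part of the PR, and whether the largest index corresponds to the most recent run depends on how the input data is ordered.

```python
from typing import List, Optional


def latest_change_point(change_points_idx: List[int]) -> Optional[int]:
  """Returns the largest change point index, or None if there are none.

  Hypothetical helper: max() gives the same result as sorting in descending
  order and taking element 0, whatever order the analyzer returns.
  """
  if not change_points_idx:
    return None
  return max(change_points_idx)


# Made-up indices in no particular order.
indices = [42, 7, 88, 15]
assert latest_change_point(indices) == sorted(indices, reverse=True)[0]
```

This also leaves the list returned by the analyzer unmodified.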
   



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import 
create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import 
get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'

Review Comment:
   Please prefix internal constants, functions, and methods with `_`.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import 
create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import 
get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+     For more information, please look at
+     https://pypi.org/project/signal-processing-algorithms/
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def find_existing_issue(
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: pd.Timestamp,
+    sibling_change_point_min_timestamp: pd.Timestamp,
+    sibling_change_point_max_timestamp: pd.Timestamp,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the most recent GitHub issue created for change points for this
+  test+metric in sibling change point min and max timestamps window.
+  Returns a boolean and an issue ID whether the issue needs to be updated.
+  """
+  query_template = f"""
+  SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
+  WHERE {METRIC_NAME} = '{metric_name}'
+  ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 1
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(
+        query_template=query_template,
+        limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return True, None
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  if (sibling_change_point_min_timestamp <= change_point_timestamp <=
+      sibling_change_point_max_timestamp):
+    return False, None
+  return True, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(config_file_path: str = None) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
+
+  """
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':
+        metric_data: pd.DataFrame = BigQueryMetricsFetcher().get_metrics(

Review Comment:
   What is the order of elements returned by this query? 
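
BigQuery does not guarantee row order unless the query has an `ORDER BY`, so any index-based logic downstream depends on the answer. A minimal sketch of making the order explicit on the client side; the `(timestamp, value)` tuples and `sort_runs_oldest_first` are illustrative assumptions and stand in for the DataFrame that the fetcher returns in the PR.

```python
from datetime import datetime
from typing import List, Tuple


def sort_runs_oldest_first(
    runs: List[Tuple[datetime, float]]) -> List[Tuple[datetime, float]]:
  """Orders (timestamp, value) pairs by timestamp, oldest first."""
  return sorted(runs, key=lambda run: run[0])


# Made-up runs, deliberately out of order.
runs = [
    (datetime(2022, 11, 3), 12.5),
    (datetime(2022, 11, 1), 10.0),
    (datetime(2022, 11, 2), 10.2),
]
ordered = sort_runs_oldest_first(runs)
metric_values = [value for _, value in ordered]
timestamps = [timestamp for timestamp, _ in ordered]
```

With the order fixed, an index into `metric_values` has a well-defined meaning relative to the most recent run.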



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -14,44 +14,54 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.

Review Comment:
   > 
   ```suggestion
   # regressions for benchmark/load/performance tests.
   ```



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -14,44 +14,54 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
 import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
 import logging
 import os
-import time
 import uuid
+from datetime import datetime
+from datetime import timezone
 from typing import Any
 from typing import Dict
 from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import Union
 
-import google.api_core.exceptions
 import numpy as np
 import pandas as pd
 import yaml
+from google.api_core import exceptions
 
 from apache_beam.testing.analyzers.github_issues_utils import 
create_or_comment_issue
 from apache_beam.testing.analyzers.github_issues_utils import 
get_issue_description
 from apache_beam.testing.load_tests import load_test_metrics_utils
 from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
-from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
 from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
 
 _BQ_PROJECT_NAME = 'apache-beam-testing'
 _BQ_DATASET = 'beam_perf_storage'
 
 UNIQUE_ID = 'test_id'
 ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
-CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
 CHANGE_POINT_LABEL = 'change_point'
 TEST_NAME = 'test_name'
 METRIC_NAME = 'metric_name'
 ISSUE_NUMBER = 'issue_number'
 ISSUE_URL = 'issue_url'
 # number of results to display on the issue description
 # from change point index in both directions.
-NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100

Review Comment:
   Is this the same as `num_runs_in_change_point_window`?
   
   WDYT about the names `_NUM_DATA_POINTS_TO_ANALYZE` or 
`_NUM_RECENT_RUNS_TO_ANALYZE` for both of those params?



##########
sdks/python/apache_beam/testing/analyzers/README.md:
##########
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<h1>Performance alerts for Beam Python performance and load tests</h1>
+
+
+<h2> Alerts </h2>
+
+Performance regressions or improvements detected with the [Change Point 
Analysis](https://en.wikipedia.org/wiki/Change_detection) using 
[edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+analyzer are automatically filed as Beam GitHub issues with a label 
`perf-alert`.
+
+The GitHub issue description will contain the information on the affected test 
and metric by providing the metric values for N consecutive runs with timestamps
+before and after the observed change point. Observed change point is pointed 
as `Anomaly` in the issue description. 
+
+Example: [sample perf alert GitHub 
issue](https://github.com/AnandInguva/beam/issues/83).
+
+If a performance alert is created on a test, a GitHub issue will be created 
and the GitHub issue metadata such as GitHub issue
+URL, issue number along with the change point value and timestamp are exported 
to BigQuery. This data will be used to analyze the next change point observed 
on the same test to
+update already created GitHub issue or ignore performance alert by not 
creating GitHub issue to avoid duplicate issue creation.
+
+<h2> Config file structure </h2>
+The config file defines the structure to run change point analysis on a given 
test. To add a test to the config file, 
+please follow the below structure.
+
+**NOTE**: The Change point analysis only supports reading the metric data from 
Big Query for now.
+
+```
+# the test_1 must be a unique id.
+test_1:
+  test_name: 
apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+  source: big_query
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - perf-alert
+    - run-inference
+  min_runs_between_change_points: 5

Review Comment:
   I think reasonable default values should obviate the need to configure some 
of the parameters.
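
A rough sketch of what defaults could look like, so that a test entry only sets these keys when it needs non-default behavior. The default values and the `with_defaults` helper are hypothetical placeholders, not values chosen in the PR.

```python
from typing import Any, Dict

# Hypothetical defaults; the numbers are placeholders for illustration.
_DEFAULT_PARAMS = {
    'min_runs_between_change_points': 5,
    'num_runs_in_change_point_window': 20,
}


def with_defaults(params: Dict[str, Any]) -> Dict[str, Any]:
  """Returns a copy of params with missing optional keys filled in."""
  # Keys present in params take precedence over the defaults.
  return {**_DEFAULT_PARAMS, **params}


params = with_defaults({
    'metric_name': 'mean_load_model_latency_milli_secs',
    'min_runs_between_change_points': 3,
})
```

The README would then only need to document these keys as optional overrides.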



##########
sdks/python/apache_beam/testing/analyzers/README.md:
##########
@@ -0,0 +1,85 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<h1>Performance alerts for Beam Python performance and load tests</h1>
+
+
+<h2> Alerts </h2>
+
+Performance regressions or improvements detected with the [Change Point 
Analysis](https://en.wikipedia.org/wiki/Change_detection) using 
[edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
 
+analyzer are automatically filed as Beam GitHub issues with a label 
`perf-alert`.
+
+The GitHub issue description will contain the information on the affected test 
and metric by providing the metric values for N consecutive runs with timestamps
+before and after the observed change point. Observed change point is pointed 
as `Anomaly` in the issue description. 
+
+Example: [sample perf alert GitHub 
issue](https://github.com/AnandInguva/beam/issues/83).
+
+If a performance alert is created on a test, a GitHub issue will be created 
and the GitHub issue metadata such as GitHub issue
+URL, issue number along with the change point value and timestamp are exported 
to BigQuery. This data will be used to analyze the next change point observed 
on the same test to
+update already created GitHub issue or ignore performance alert by not 
creating GitHub issue to avoid duplicate issue creation.
+
+<h2> Config file structure </h2>
+The config file defines the structure to run change point analysis on a given 
test. To add a test to the config file, 
+please follow the below structure.
+
+**NOTE**: The Change point analysis only supports reading the metric data from 
Big Query for now.
+
+```
+# the test_1 must be a unique id.

Review Comment:
   Can we validate at runtime that the necessary parameters are unique? Should 
`metrics_dataset` + `metrics_table` be unique as well?
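
A possible shape for such a check, assuming the combination of `metrics_dataset`, `metrics_table`, and `metric_name` is what should be unique across test ids. That choice of key and the `validate_unique_tests` name are assumptions for illustration.

```python
from collections import Counter
from typing import Any, Dict


def validate_unique_tests(tests_config: Dict[str, Dict[str, Any]]) -> None:
  """Raises ValueError if two test ids share dataset, table, and metric."""
  counts = Counter(
      (p['metrics_dataset'], p['metrics_table'], p['metric_name'])
      for p in tests_config.values())
  duplicates = [key for key, count in counts.items() if count > 1]
  if duplicates:
    raise ValueError('Duplicate test definitions: %s' % duplicates)


# Made-up config with two distinct metrics on the same table.
config = {
    'test_1': {
        'metrics_dataset': 'beam_run_inference',
        'metrics_table': 'torch_inference_imagenet_results_resnet152',
        'metric_name': 'mean_load_model_latency_milli_secs',
    },
    'test_2': {
        'metrics_dataset': 'beam_run_inference',
        'metrics_table': 'torch_inference_imagenet_results_resnet152',
        'metric_name': 'mean_inference_batch_latency_micro_secs',
    },
}
validate_unique_tests(config)  # Passes: the metric names differ.
```

Calling this right after the config is read would surface a copy-paste mistake before any issues are filed.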



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -21,6 +21,7 @@
 from typing import Optional
 from typing import Tuple
 
+import pandas as pd

Review Comment:
   PTAL at: 
   
   
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
   
   We can set a header to prevent issues due to changes in the REST API.
   
   Also, it would be good to check that when filing an issue fails, the tool 
produces a signal that will be noticed, for example, the script and the 
corresponding cron job fail.
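
Both points could look roughly like this. The `X-GitHub-Api-Version` header and the `2022-11-28` version come from the linked blog post; `build_headers` and `parse_issue_response` are hypothetical helpers, and the network call itself is left out so the sketch stays self-contained.

```python
from typing import Any, Dict, Optional, Tuple


def build_headers(token: Optional[str]) -> Dict[str, str]:
  """Builds request headers that pin the GitHub REST API version."""
  if not token:
    # Fail early instead of sending an unauthenticated request.
    raise ValueError('GITHUB_TOKEN is required to file GitHub issues.')
  return {
      'Authorization': 'token {}'.format(token),
      'Accept': 'application/vnd.github+json',
      'X-GitHub-Api-Version': '2022-11-28',
  }


def parse_issue_response(
    status_code: int, body: Dict[str, Any]) -> Tuple[int, str]:
  """Returns (issue number, issue URL), or raises if the request failed."""
  if not 200 <= status_code < 300:
    # Raising here makes the script, and the job that runs it, fail visibly.
    raise RuntimeError(
        'Filing the GitHub issue failed with status %s: %s' %
        (status_code, body.get('message')))
  return body['number'], body['html_url']
```

An uncaught exception gives the script a non-zero exit status, which is the kind of signal a scheduled job can surface.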



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+  If an issue is already created and is open,
+  then comment on the issue instead of creating a duplicate issue.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        issue_number: issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), 
headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display, len(metric_values))
+  for i in range(change_point_index, upper_bound):
+    indices_to_display.append(i)
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, change_point_index):
+    indices_to_display.append(i)
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for i in indices_to_display:

Review Comment:
   `for i[ndex_to_display] in range(lower_bound, upper_bound):`...
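
Spelled out, the two append loops plus the sort collapse into a single range. A sketch, assuming `0 <= change_point_index < num_values`; the `indices_to_display` function name is illustrative only.

```python
from typing import List


def indices_to_display(
    change_point_index: int,
    num_values: int,
    max_results_to_display: int = 5) -> List[int]:
  """Indices around the change point, clamped to the available data."""
  lower_bound = max(0, change_point_index - max_results_to_display)
  upper_bound = min(change_point_index + max_results_to_display, num_values)
  # Already in ascending order, so no separate sort is needed.
  return list(range(lower_bound, upper_bound))
```

The description loop can then iterate over `range(lower_bound, upper_bound)` directly without building an intermediate list.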



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -240,149 +197,156 @@ def read_test_config(config_file_path: str) -> Dict:
   return config
 
 
-def run(args) -> None:
+def run(config_file_path: str = None) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
   regression observed for a test, an alert will filed with GitHub Issues.
 
-  The config file is provide as command line argument. If no config file was
-  provided on cmd line, the default config file will be used.
-
-  For each test is config yaml file, if the source is the big_query,
-  the expected data required to run the change point analysis are
-  test_name, metrics_dataset, metrics_table, project, metric_name.
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
 
   """
-  config_file_path = args.config_file_path
   if config_file_path is None:
     config_file_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
 
   tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
 
-  # change_point_sibling_distance, change_point_to_recent_run_window can be
-  # defined in the config file for each test whihc are used
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
   # to avoid filing GitHub issues for duplicate change points. Please take
   # a look at the README for more information on the parameters defined in the
   # config file.
-  for _, params in tests_config.items():
-    metric_name = params['metric_name']
-    # replace . with _ in test_name. This test name would be used later
-    # as a BQ table name and the BQ table doesn't accept . in the name.
-    test_name = params['test_name'].replace('.', '_')
-    if params['source'] == 'big_query':
-      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
-          project_name=params['project'],
-          dataset=params['metrics_dataset'],
-          table=params['metrics_table'],
-          metric_name=metric_name)
-    else:
-      # (TODO): Implement fetching metric_data from InfluxDB.
-      params = None
-    assert params is not None
-
-    labels = params['labels']
-    change_point_sibling_distance = params['change_point_sibling_distance']
-    change_point_to_recent_run_window = params[
-        'change_point_to_recent_run_window']
-
-    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
-    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
-
-    # run change point analysis on the metric_values using edivisive means
-    cp_analyzer = ChangePointAnalysis(
-        metric_name=metric_name, data=metric_values)
-
-    change_points_idx = cp_analyzer.edivisive_means()
-    # No change point found. Continue on to the next test.
-    if not change_points_idx:
-      continue
-
-    # always consider the latest change points
-    change_points_idx.sort(reverse=True)
-    change_point_index = change_points_idx[0]
-    change_point_timestamp = timestamps[change_point_index]
-
-    # check if the change point lies in the valid window.
-    # window - Number of runs between the
-    # change_point_to_recent_run_window run and the most recent run.
-    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
-                                           change_point_index):
-      # change point lies outside the window from the recent run.
-      # Ignore this change point.
-      logging.info(
-          'Performance regression found for the test: %s. '
-          'but not creating an alert since the Change Point '
-          'lies outside the '
-          'change_point_to_recent_run_window distance' % test_name)
-      continue
-
-    # check for sibling change point. Sibling change point is a change
-    # point that lies in the distance of change_point_sibling_distance
-    # in both directions from the current change point index.
-    # Here, distance can be interpreted as number of runs between two change
-    # points. The idea here is that sibling change point will also point to
-    # the same performance regression.
-
-    create_alert, last_created_issue_number = (
-      has_sibling_change_point(
-        change_point_index=change_point_index,
-        change_point_sibling_distance=change_point_sibling_distance,
-        metric_values=metric_values,
-        metric_name=metric_name,
-        test_name=test_name,
-        change_point_timestamp=change_point_timestamp
-      )
-    )
-
-    logging.info(
-        "Create performance alert for the "
-        "test %s: %s" % (test_name, create_alert))
-
-    if create_alert:
-      # get the issue description for the creating GH issue or
-      # to comment on open GH issue.
-      issue_description = get_issue_description(
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':
+        metric_data: pd.DataFrame = BigQueryMetricsFetcher().get_metrics(
+            project_name=params['project'],
+            dataset=params['metrics_dataset'],
+            table=params['metrics_table'],
+            metric_name=metric_name)
+      else:
+        # (TODO): Implement fetching metric_data from InfluxDB.
+        raise ValueError(
+            'For change point analysis, only big_query is'
+            'accepted as source.')
+
+      labels = params['labels']
+      min_runs_between_change_points = params['min_runs_between_change_points']
+      num_runs_in_change_point_window = params[
+          'num_runs_in_change_point_window']
+
+      metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]

Review Comment:
   It may be a good idea to check whether the metric data is empty and fail if 
it is. An empty result likely means that there is a typo in the config or that 
the test never ran.
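
   A minimal sketch of the check I have in mind. 
`check_metric_values_not_empty` is a hypothetical helper name, not something in 
the PR; it only relies on `len()`, so it should work for a list as well as for 
the pandas column read from `metric_data`.

```python
from typing import Sized


def check_metric_values_not_empty(
    metric_values: Sized, test_name: str) -> None:
  """Fails fast when no metric values were fetched for a test.

  Hypothetical helper, shown only to illustrate the suggested check.
  """
  if len(metric_values) == 0:
    raise ValueError(
        'No metric values found for the test: %s. Check the config for '
        'typos and verify that the test has run.' % test_name)
```

   Calling it right after `metric_values` is read would surface a bad config 
entry immediately, before any change point analysis runs on empty data.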



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],

Review Comment:
   This type hint is way too complex for what we are passing for our purposes.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -240,149 +197,156 @@ def read_test_config(config_file_path: str) -> Dict:
   return config
 
 
-def run(args) -> None:
+def run(config_file_path: str = None) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
   regression observed for a test, an alert will filed with GitHub Issues.
 
-  The config file is provide as command line argument. If no config file was
-  provided on cmd line, the default config file will be used.
-
-  For each test is config yaml file, if the source is the big_query,
-  the expected data required to run the change point analysis are
-  test_name, metrics_dataset, metrics_table, project, metric_name.
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
 
   """
-  config_file_path = args.config_file_path
   if config_file_path is None:
     config_file_path = os.path.join(
         os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
 
   tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
 
-  # change_point_sibling_distance, change_point_to_recent_run_window can be
-  # defined in the config file for each test whihc are used
+  # min_runs_between_change_points, num_runs_in_change_point_window can be
+  # defined in the config file for each test which are used
   # to avoid filing GitHub issues for duplicate change points. Please take
   # a look at the README for more information on the parameters defined in the
   # config file.
-  for _, params in tests_config.items():
-    metric_name = params['metric_name']
-    # replace . with _ in test_name. This test name would be used later
-    # as a BQ table name and the BQ table doesn't accept . in the name.
-    test_name = params['test_name'].replace('.', '_')
-    if params['source'] == 'big_query':
-      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
-          project_name=params['project'],
-          dataset=params['metrics_dataset'],
-          table=params['metrics_table'],
-          metric_name=metric_name)
-    else:
-      # (TODO): Implement fetching metric_data from InfluxDB.
-      params = None
-    assert params is not None
-
-    labels = params['labels']
-    change_point_sibling_distance = params['change_point_sibling_distance']
-    change_point_to_recent_run_window = params[
-        'change_point_to_recent_run_window']
-
-    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
-    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
-
-    # run change point analysis on the metric_values using edivisive means
-    cp_analyzer = ChangePointAnalysis(
-        metric_name=metric_name, data=metric_values)
-
-    change_points_idx = cp_analyzer.edivisive_means()
-    # No change point found. Continue on to the next test.
-    if not change_points_idx:
-      continue
-
-    # always consider the latest change points
-    change_points_idx.sort(reverse=True)
-    change_point_index = change_points_idx[0]
-    change_point_timestamp = timestamps[change_point_index]
-
-    # check if the change point lies in the valid window.
-    # window - Number of runs between the
-    # change_point_to_recent_run_window run and the most recent run.
-    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
-                                           change_point_index):
-      # change point lies outside the window from the recent run.
-      # Ignore this change point.
-      logging.info(
-          'Performance regression found for the test: %s. '
-          'but not creating an alert since the Change Point '
-          'lies outside the '
-          'change_point_to_recent_run_window distance' % test_name)
-      continue
-
-    # check for sibling change point. Sibling change point is a change
-    # point that lies in the distance of change_point_sibling_distance
-    # in both directions from the current change point index.
-    # Here, distance can be interpreted as number of runs between two change
-    # points. The idea here is that sibling change point will also point to
-    # the same performance regression.
-
-    create_alert, last_created_issue_number = (
-      has_sibling_change_point(
-        change_point_index=change_point_index,
-        change_point_sibling_distance=change_point_sibling_distance,
-        metric_values=metric_values,
-        metric_name=metric_name,
-        test_name=test_name,
-        change_point_timestamp=change_point_timestamp
-      )
-    )
-
-    logging.info(
-        "Create performance alert for the "
-        "test %s: %s" % (test_name, create_alert))
-
-    if create_alert:
-      # get the issue description for the creating GH issue or
-      # to comment on open GH issue.
-      issue_description = get_issue_description(
+  for test_id, params in tests_config.items():
+    try:
+      metric_name = params['metric_name']
+      # replace . with _ in test_name. This test name would be used later
+      # as a BQ table name and the BQ table doesn't accept . in the name.
+      test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+      if params['source'] == 'big_query':
+        metric_data: pd.DataFrame = BigQueryMetricsFetcher().get_metrics(
+            project_name=params['project'],
+            dataset=params['metrics_dataset'],
+            table=params['metrics_table'],
+            metric_name=metric_name)
+      else:
+        # (TODO): Implement fetching metric_data from InfluxDB.
+        raise ValueError(
+            'For change point analysis, only big_query is'
+            'accepted as source.')
+
+      labels = params['labels']
+      min_runs_between_change_points = params['min_runs_between_change_points']
+      num_runs_in_change_point_window = params[
+          'num_runs_in_change_point_window']
+
+      metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+      timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+      cp_analyzer = ChangePointAnalysis(
+          metric_name=metric_name, data=metric_values)
+
+      change_points_idx = cp_analyzer.edivisive_means()
+      if not change_points_idx:
+        continue
+
+      # Consider the latest change points to observe the latest perf alerts.
+      change_points_idx.sort(reverse=True)
+      change_point_index = change_points_idx[0]
+      change_point_timestamp = timestamps[change_point_index]
+
+      # check if the change point lies in the valid window.
+      # window - Number of runs between the
+      # num_runs_in_change_point_window run and the most recent run.
+      if not is_change_point_in_valid_window(num_runs_in_change_point_window,
+                                             change_point_index):
+        logging.info(
+            'Performance regression found for the test: %s. '
+            'but not creating an alert since the Change Point '
+            'lies outside the '
+            'num_runs_in_change_point_window distance.' % test_name)
+        continue
+
+      # Look for an existing GitHub issue related to the current change point.
+      # It can be interpreted sibling change point.
+      # Sibling change point is a change point that lies in the distance of
+      # min_runs_between_change_points in both directions from the current
+      # change point index.
+      # Here, distance can be interpreted as number of runs between two change
+      # points. The idea here is that sibling change point will also point to
+      # the same performance regression.
+      min_timestamp_index = min(

Review Comment:
   Are the `min` and `max` variables mixed up? Which timestamp will be 
newer/larger?
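
   To illustrate the concern, here is a rough sketch under the assumption that 
`timestamps` is sorted newest-first (which the 
`num_runs_in_change_point_window >= change_point_index` check seems to imply). 
In that case a larger index means an older, i.e. smaller, timestamp. The 
function and variable names below are hypothetical.

```python
from typing import Sequence
from typing import Tuple


def sibling_window_timestamps(
    timestamps: Sequence[int],
    change_point_index: int,
    min_runs_between_change_points: int) -> Tuple[int, int]:
  """Returns the (oldest, newest) timestamps of the sibling window.

  Assumes timestamps are sorted newest-first, so the larger index
  points at the older run.
  """
  oldest_index = min(
      change_point_index + min_runs_between_change_points,
      len(timestamps) - 1)
  newest_index = max(change_point_index - min_runs_between_change_points, 0)
  return timestamps[oldest_index], timestamps[newest_index]
```

   Naming the indices after the age of the run rather than after `min`/`max` 
avoids the ambiguity: under this ordering the `min(...)` call yields the index 
of the older timestamp, not the smaller index.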



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+
+
+def create_or_comment_issue(

Review Comment:
   Let's try to have functions that have a single, clear purpose when possible. 
   I would extract the branching logic into 
`report_changepoint_to_issue_tracker()`, then have two methods, `create_issue` 
and `add_comment`/`comment_on_issue`.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+     For more information, please look at
+     https://pypi.org/project/signal-processing-algorithms/
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def find_existing_issue(

Review Comment:
   The logic should be simpler if we split the responsibilities of this method. 
Have it only find whether the issue exists or not. Then, decide whether to 
comment on it elsewhere. Note that the query to the DB is not necessary if 
`sibling_change_point_min_timestamp <= change_point_timestamp <= 
sibling_change_point_max_timestamp`, but we currently do it anyway.
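
   A sketch of one possible split, keeping the return values of the current 
code. `find_existing_issue` is passed in here as a hypothetical zero-argument 
callable that returns the latest issue number or `None`; in the PR it would 
wrap the BigQuery lookup. The helper names are assumptions.

```python
from typing import Any
from typing import Callable
from typing import Optional
from typing import Tuple


def is_in_sibling_window(
    change_point_timestamp: Any, window_min: Any, window_max: Any) -> bool:
  """Pure timestamp comparison; needs no database access."""
  return window_min <= change_point_timestamp <= window_max


def should_create_alert(
    change_point_timestamp: Any,
    window_min: Any,
    window_max: Any,
    find_existing_issue: Callable[[], Optional[int]]
) -> Tuple[bool, Optional[int]]:
  """Decides whether to alert, querying for an issue only when needed."""
  # Cheap check first: skip the lookup when it cannot change the outcome.
  if is_in_sibling_window(change_point_timestamp, window_min, window_max):
    return False, None
  return True, find_existing_issue()
```

   The lookup now runs only on the path where its result is actually used.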
   



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+     For more information, please look at
+     https://pypi.org/project/signal-processing-algorithms/
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def find_existing_issue(
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: pd.Timestamp,
+    sibling_change_point_min_timestamp: pd.Timestamp,
+    sibling_change_point_max_timestamp: pd.Timestamp,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the most recent GitHub issue created for change points for this
+  test+metric in sibling change point min and max timestamps window.
+  Returns a boolean and an issue ID whether the issue needs to be updated.
+  """
+  query_template = f"""
+  SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
+  WHERE {METRIC_NAME} = '{metric_name}'
+  ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 1
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(
+        query_template=query_template,
+        limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return True, None
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  if (sibling_change_point_min_timestamp <= change_point_timestamp <=
+      sibling_change_point_max_timestamp):
+    return False, None
+  return True, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(config_file_path: str = None) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
+
+  """
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)

Review Comment:
   I don't follow the `Dict[Dict[str, Any]]` hint. `Dict` takes a key type and 
a value type, but the outer `Dict` here has only one type parameter.
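
   Something along these lines is probably what was intended, assuming the 
top-level keys of the YAML config are test ids (strings). The alias name and 
the sample values are made up for illustration.

```python
from typing import Any
from typing import Dict

# Outer dict: test id -> per-test parameters.
# Inner dict: parameter name -> parameter value.
ConfigByTestId = Dict[str, Dict[str, Any]]

tests_config: ConfigByTestId = {
    'test_1': {
        'test_name': 'some.test.name',  # illustrative value
        'metric_name': 'runtime',  # illustrative value
        'source': 'big_query',
    },
}

for test_id, params in tests_config.items():
  print(test_id, params['source'])
```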



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:

Review Comment:
   I don't see a need for this class.
   `metric_name` is not used, and the method only defers to `e_divisive()` with the exact same parameters.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:

Review Comment:
   If we are using the defaults and they are good enough, I wouldn't pass them (after you remove the class).
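
A minimal sketch of how the two suggestions could combine: the `ChangePointAnalysis` wrapper collapses into one plain function that calls `e_divisive` without restating `pvalue` and `permutations`. The name `find_change_points` and the `detector` parameter are hypothetical; `detector` exists only so the sketch can be exercised without the `signal_processing_algorithms` dependency. It assumes the library's own defaults are acceptable.

```python
from typing import Callable, List, Optional, Sequence


def find_change_points(
    metric_values: Sequence[float],
    detector: Optional[Callable[[Sequence[float]], List[int]]] = None,
) -> List[int]:
  """Returns the indices of the change points found in metric_values."""
  if detector is None:
    # Imported lazily so the sketch loads even when the library is absent.
    from signal_processing_algorithms.energy_statistics.energy_statistics import (
        e_divisive)
    detector = e_divisive
  # No pvalue/permutations arguments: the detector's own defaults apply.
  return detector(metric_values)
```

Call sites would then pass the metric values directly, with no intermediate object to construct.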



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+     For more information, please look at
+     https://pypi.org/project/signal-processing-algorithms/
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def find_existing_issue(
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: pd.Timestamp,
+    sibling_change_point_min_timestamp: pd.Timestamp,
+    sibling_change_point_max_timestamp: pd.Timestamp,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the most recent GitHub issue created for change points for this
+  test+metric in sibling change point min and max timestamps window.
+  Returns a boolean and an issue ID whether the issue needs to be updated.
+  """
+  query_template = f"""
+  SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
+  WHERE {METRIC_NAME} = '{metric_name}'
+  ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 1
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(
+        query_template=query_template,
+        limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return True, None
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  if (sibling_change_point_min_timestamp <= change_point_timestamp <=
+      sibling_change_point_max_timestamp):
+    return False, None
+  return True, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(config_file_path: str = None) -> None:

Review Comment:
   Try to use helpers to break up this method for readability, as it is getting long and does many different things.
   
   For example, it might look like this:
   
   ```
   def run():
     tests = parse_config(...)
     for test in tests:
       change_point = find_most_recent_change_point(test)
       if not_reported(change_point):
         report_change_point_on_issue_tracker(change_point)
   ```
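
A runnable version of the outline above, as a sketch under stated assumptions rather than the PR's implementation. All names and signatures are hypothetical: the BigQuery fetch, the change point detector, the "already reported" lookup, and the issue tracker call are passed in as callables so each step can be tested on its own. Config parsing is left out, and `tests` is assumed to be the already-parsed mapping of test name to parameters.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence


def find_most_recent_change_point(
    metric_values: Sequence[float],
    detector: Callable[[Sequence[float]], List[int]],
) -> Optional[int]:
  """Returns the index of the latest change point, or None if none exists."""
  change_points = detector(metric_values)
  return max(change_points) if change_points else None


def run(
    tests: Dict[str, Dict[str, Any]],
    fetch_metric_values: Callable[[Dict[str, Any]], Sequence[float]],
    detector: Callable[[Sequence[float]], List[int]],
    already_reported: Callable[[str, int], bool],
    report: Callable[[str, int], None],
) -> None:
  """Orchestrates the analysis; every step lives in its own helper."""
  for test_name, params in tests.items():
    metric_values = fetch_metric_values(params)
    change_point = find_most_recent_change_point(metric_values, detector)
    if change_point is None:
      # Nothing changed for this test, so there is nothing to report.
      continue
    if not already_reported(test_name, change_point):
      report(test_name, change_point)
```

With this shape, `run` reads as a short summary of the workflow, and the fetch, detection, and reporting logic can each be unit tested with simple stand-ins.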



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -123,9 +128,18 @@ def comment_on_issue(issue_number: int,
   return False, None
 
 
+def add_label_to_issue(issue_number: int, labels: List[str] = None):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  data = {}
+  if labels:
+    data['labels'] = labels
+    requests.post(url, json.dumps(data), headers=_HEADERS)

Review Comment:
   It looks like the extra variable is not necessary here, given how simple the function is.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression or Improvement: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+     For more information, please look at
+     https://pypi.org/project/signal-processing-algorithms/
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def find_existing_issue(
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: pd.Timestamp,
+    sibling_change_point_min_timestamp: pd.Timestamp,
+    sibling_change_point_max_timestamp: pd.Timestamp,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the most recent GitHub issue created for change points for this
+  test+metric in sibling change point min and max timestamps window.
+  Returns a boolean and an issue ID whether the issue needs to be updated.
+  """
+  query_template = f"""
+  SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
+  WHERE {METRIC_NAME} = '{metric_name}'
+  ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 1
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(
+        query_template=query_template,
+        limit=_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return True, None
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  if (sibling_change_point_min_timestamp <= change_point_timestamp <=
+      sibling_change_point_max_timestamp):
+    return False, None
+  return True, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(config_file_path: str = None) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  If config_file_path is None, then the run method will use default
+  config file to read the required perf test parameters.
+
+  """
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # min_runs_between_change_points, num_runs_in_change_point_window can be

Review Comment:
   The comment seems out of place. You could move the reference to the README into the docstring for `run` and remove the rest.



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """

Review Comment:
   I'd like to have a way to easily find the current values, for example by looking up a graph or running a query, ideally with some pointers to the exact executions. We can discuss offline how to do that and add it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
