AnandInguva commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1027233895


##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point,
+  i.e. a neighbor of the latest change point within the distance of
+  change_point_sibling_distance for which a GitHub issue was already
+  created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from the config file, and if a performance
+  regression is observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file will be used.
+
+  For each test in the config YAML file, if the source is big_query,
+  the keys required for a single test to run the change point analysis
+  are test_name, metrics_dataset, metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)

Review Comment:
   Fetching a larger set (greater than N runs) of data points will yield a more robust result.
   
   For example, we fetch 100 samples, run e-divisive, and then only pass the last N runs' data for sibling change point detection.
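
   A rough sketch of this suggestion (the synthetic series and the 20-run window below are illustrative assumptions, not the PR's actual fetch logic):

```python
import numpy as np
from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive

NUM_RECENT_RUNS = 20  # only keep change points that fall within the last N runs

# In the PR this history would come from FetchMetrics.fetch_from_bq; here a
# synthetic series of 100 samples stands in for it. The step from ~10 to ~15
# at index 85 is the shift e-divisive should report.
metric_values = np.array(
    [10.0 + 0.1 * (i % 3) for i in range(85)] +
    [15.0 + 0.1 * (i % 3) for i in range(15)])

# Run e-divisive on the full 100-sample history, then keep only the change
# points inside the most recent N runs before doing sibling change point
# detection on them.
change_points = e_divisive(metric_values, 0.05, 100)  # pvalue, permutations
recent_change_points = [
    idx for idx in change_points
    if len(metric_values) - idx <= NUM_RECENT_RUNS
]
print(change_points, recent_change_points)
```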



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point,
+  i.e. a neighbor of the latest change point within the distance of
+  change_point_sibling_distance for which a GitHub issue was already
+  created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from the config file, and if a performance
+  regression is observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file will be used.
+
+  For each test in the config YAML file, if the source is big_query,
+  the keys required for a single test to run the change point analysis
+  are test_name, metrics_dataset, metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points

Review Comment:
   Later change points are more likely to reveal recent regressions/improvements.
   Also, earlier change points might lead to sibling change points, in which case we can ignore them.
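
   A minimal sketch of that choice, mirroring the "always consider the latest change points" step in the diff above (the index values are made up):

```python
# Example change point indices as returned by e-divisive (made-up values).
change_points_idx = [3, 17, 42]

# Sort so the most recent change point (largest index) comes first and use it.
# Older change points are more likely to already have sibling change points
# (and an existing GitHub issue), so they can be skipped.
change_points_idx.sort(reverse=True)
change_point_index = change_points_idx[0]
print(change_point_index)  # -> 42
```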



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point,
+  i.e. a neighbor of the latest change point within the distance of
+  change_point_sibling_distance for which a GitHub issue was already
+  created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:

Review Comment:
   Makes sense. Changed it



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title, description, and labels.
+  If an issue was already created and is still open,
+  comment on that issue instead of creating a duplicate.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with the provided issue_number. If an open
+  issue is found, comment on it with the provided description; otherwise
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display, len(metric_values))
+  for i in range(change_point_index, upper_bound):
+    indices_to_display.append(i)
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, change_point_index):
+    indices_to_display.append(i)
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for i in indices_to_display:

Review Comment:
   Done



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,

Review Comment:
   they are pandas.Timestamp. Changed it.
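
   For reference, a hedged sketch of the updated annotation (assuming the timestamp comes from a pandas DataFrame; this is not the final PR signature):

```python
import pandas as pd

def has_sibling_change_point(
    change_point_index: int,
    change_point_sibling_distance: int,
    metric_values: list,
    metric_name: str,
    test_name: str,
    change_point_timestamp: pd.Timestamp,  # previously annotated as float
):
    ...
```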



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(

Review Comment:
   Done



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point,
+  i.e. a neighbor of the latest change point within the distance of
+  change_point_sibling_distance for which a GitHub issue was already
+  created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  The config file is provide as command line argument. If no config file was
+  provided on cmd line, the default config file will be used.
+
+  For each test in config yaml file, if the source is the big_query,
+  the expected keys for single test that are required to run the change point
+   analysis are test_name, metrics_dataset, metrics_table, project, 
metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can
+  # be defined in the config file for each test and are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.

Review Comment:
   Ack.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]

Review Comment:
   This would be the sibling change point for the current change point. Yes, it is a reading for a metric. I guess this could be a float in some cases.
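   
   If the stored value is a float, an exact `==` comparison against the current metric value could be brittle. A minimal sketch of a tolerance-based comparison as one option (the `_RELATIVE_TOLERANCE` name here is hypothetical, not part of this PR):
   
   ```python
   import math
   
   # Hypothetical tolerance; the exact value would need tuning.
   _RELATIVE_TOLERANCE = 1e-9
   
   def is_same_change_point(previous_change_point: float,
                            current_value: float) -> bool:
     # Treat two readings as the same change point when they are numerically
     # close, instead of relying on exact float equality.
     return math.isclose(
         previous_change_point, current_value, rel_tol=_RELATIVE_TOLERANCE)
   ```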



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  The config file is provide as command line argument. If no config file was
+  provided on cmd line, the default config file will be used.
+
+  For each test in config yaml file, if the source is the big_query,
+  the expected keys for single test that are required to run the change point
+   analysis are test_name, metrics_dataset, metrics_table, project, 
metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can
+  # be defined in the config file for each test and are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(

Review Comment:
   Commented below



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  The config file is provide as command line argument. If no config file was
+  provided on cmd line, the default config file will be used.
+
+  For each test in config yaml file, if the source is the big_query,
+  the expected keys for single test that are required to run the change point
+   analysis are test_name, metrics_dataset, metrics_table, project, 
metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can
+  # be defined in the config file for each test and are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.
+    # window - Number of runs between the
+    # change_point_to_recent_run_window run and the most recent run.
+    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
+                                           change_point_index):
+      # change point lies outside the window from the recent run.
+      # Ignore this change point.
+      logging.info(
+          'Performance regression found for the test: %s. '
+          'but not creating an alert since the Change Point '
+          'lies outside the '
+          'change_point_to_recent_run_window distance' % test_name)
+      continue
+
+    # check for sibling change point. Sibling change point is a change
+    # point that lies in the distance of change_point_sibling_distance
+    # in both directions from the current change point index.
+    # Here, distance can be interpreted as number of runs between two change
+    # points. The idea here is that sibling change point will also point to
+    # the same performance regression.
+
+    create_alert, last_created_issue_number = (
+      has_sibling_change_point(
+        change_point_index=change_point_index,
+        change_point_sibling_distance=change_point_sibling_distance,
+        metric_values=metric_values,
+        metric_name=metric_name,
+        test_name=test_name,
+        change_point_timestamp=change_point_timestamp
+      )
+    )
+
+    logging.info(

Review Comment:
   It's just a log message noting whether or not an alert was created for a test. I added it for debugging purposes. We can actually remove it.
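   
   If it is mainly useful while debugging, one option (just a sketch, not something this PR has to adopt) is to log it at debug level so it stays out of the default output:
   
   ```python
   import logging
   
   logging.basicConfig(level=logging.DEBUG)
   
   # Example values; in the script these would come from the loop in run().
   test_name = 'example_test'
   create_alert = True
   
   # Visible only when the logger is configured at DEBUG level, so the message
   # helps while debugging but stays silent in normal runs.
   logging.debug('Alert created for test %s: %s', test_name, create_alert)
   ```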



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.

Review Comment:
   So you are saying we should file an alert for the change point even though the change point lies outside of the last N runs? Sorry, I might be misunderstanding this.
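   
   To make sure we are talking about the same thing, here is a tiny worked example of what the check as currently written accepts; whether index 0 corresponds to the oldest or the most recent run is exactly what I want to confirm:
   
   ```python
   def is_change_point_in_valid_window(
       change_point_to_recent_run_window: int, change_point_index: int) -> bool:
     # Same logic as in the diff above.
     return change_point_to_recent_run_window >= change_point_index
   
   # With a window of 5, only change point indices 0 through 5 are accepted.
   assert is_change_point_in_valid_window(5, 3)
   assert not is_change_point_in_valid_window(5, 12)
   ```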



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:

Review Comment:
   These are the default values for them in the e_divisive module.
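   
   For reference, a minimal sketch of what that means in practice (the series values below are made up purely for illustration):
   
   ```python
   from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
   
   # Made-up series with a level shift after the fourth point.
   series = [10.0, 10.2, 9.9, 10.1, 20.3, 20.1, 19.8, 20.0]
   
   # Relying on the library defaults ...
   change_points = e_divisive(series)
   # ... should match passing the same values explicitly, as this PR does.
   same_change_points = e_divisive(series, 0.05, 100)
   ```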



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'

Review Comment:
   Yes. I will change this to `apache` before merging
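   
   As a sketch of one way to avoid having to remember the swap (the `BEAM_PERF_ALERT_REPO_OWNER` environment variable name is made up here, not part of the PR): default to `apache` and override it only while testing against a fork.
   
   ```python
   import os
   
   # Defaults to the upstream repo; setting the (hypothetical) environment
   # variable BEAM_PERF_ALERT_REPO_OWNER points the analyzer at a fork.
   _BEAM_REPO_OWNER = os.environ.get('BEAM_PERF_ALERT_REPO_OWNER', 'apache')
   ```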



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data. The data is read from a config file, and if a performance regression
+  is observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config YAML file, if the source is big_query, the
+  keys required to run the change point analysis for a single test are
+  test_name, metrics_dataset, metrics_table, project, and metric_name.
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can
+  # be defined in the config file for each test and are used to avoid filing
+  # GitHub issues for duplicate change points. Please take a look at the
+  # README for more information on the parameters defined in the config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # Replace '.' with '_' in test_name. This name is later used as a BQ
+    # table name, and BQ table names can't contain '.'.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # TODO: Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # Run change point analysis on metric_values using e-divisive means.
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # Always consider the latest change point.
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # Check if the change point lies in the valid window, i.e. within
+    # change_point_to_recent_run_window runs of the most recent run.
+    if not is_change_point_in_valid_window(change_point_to_recent_run_window,

Review Comment:
   We would need to fetch enough samples for the change point analysis to be 
meaningful. For example, running the analysis on only 7 samples is likely to 
give a much noisier result than running it on 100 samples.
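
For illustration, one way to enforce that would be a small guard before running the analysis. This is only a sketch: `_MIN_RUNS_FOR_ANALYSIS` and the helper below are hypothetical and not part of this PR.

```python
import logging

# Hypothetical threshold; the right value would need to be tuned per test.
_MIN_RUNS_FOR_ANALYSIS = 100


def has_enough_runs(metric_values) -> bool:
  """Returns True only when enough historical runs were fetched to make
  the e-divisive change point analysis reasonably stable."""
  if len(metric_values) < _MIN_RUNS_FOR_ANALYSIS:
    logging.info(
        'Only %d runs available; need at least %d for change point analysis.',
        len(metric_values), _MIN_RUNS_FOR_ANALYSIS)
    return False
  return True
```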



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title, description, and labels.
+  If an issue has already been created and is still open,
+  comment on that issue instead of creating a duplicate.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  Looks for an issue with the provided issue_number. If an open issue is
+  found, comments on it with the provided description; otherwise does
+  nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: The comment to post on the issue with issue_number,
+      if that issue is open.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data),
+        headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index of the change point. The element at this
+    index of metric_values is the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display, len(metric_values))

Review Comment:
   Thanks for catching
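
For context, the fix being acknowledged here amounts to clamping the display window around the change point so it never runs past either end of the series. A minimal sketch of that idea (not the exact PR code) looks like:

```python
from typing import List


def window_around_change_point(metric_values: List,
                               change_point_index: int,
                               max_results_to_display: int = 5) -> List:
  """Returns the slice of metric_values centered on the change point,
  clamped to the boundaries of the list on both sides."""
  lower_bound = max(0, change_point_index - max_results_to_display)
  upper_bound = min(
      change_point_index + max_results_to_display, len(metric_values))
  return metric_values[lower_bound:upper_bound]
```

A change point near either end of the series then simply yields a shorter window instead of an index error.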



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """

Review Comment:
   The issue will be created and we need to triage it as soon as possible. This 
would be a manual process, I guess. We need a streamlined process to follow up 
on these alerts and triage them accordingly.
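
One way to make that follow-up less manual (purely illustrative; the `perf-alert` label below is an assumption, not something this PR defines) would be to file every alert with a dedicated label so a saved GitHub search or a triage rotation can pick them up:

```python
from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue

# 'perf-alert' is a hypothetical label; any label the triage rotation
# watches would work. Title and description are placeholders.
issue_number, issue_url = create_or_comment_issue(
    title='[Perf alert] <test_name>: <metric_name>',
    description='<output of get_issue_description(...)>',
    labels=['perf-alert'])
```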
   


