AnandInguva commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1040015944


##########
sdks/python/apache_beam/testing/analyzers/perf_analysis.py:
##########
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for benchmark/load/performance test.
+
+import argparse
+
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import pandas as pd
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import 
create_performance_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
+from apache_beam.testing.analyzers.perf_analysis_utils import 
get_existing_issues_data
+from apache_beam.testing.analyzers.perf_analysis_utils import 
find_latest_change_point_index
+from apache_beam.testing.analyzers.perf_analysis_utils import 
GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import 
is_change_point_in_valid_window
+from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import 
publish_issue_metadata_to_big_query
+from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
+from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+
+
+def run_change_point_analysis(params, test_id):
+  """
+  Runs change point analysis for a given test parameters defined in params.
+
+  Steps:

Review Comment:
   Added it to the PR description. Removed it from here



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# 
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
 # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels + [_AWAITING_TRIAGE_LABEL]

Review Comment:
   Changed it. Added the `perf-alert` label as well along with `awaiting 
triage` label  as default. 



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import 
get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import 
report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def get_existing_issues_data(test_name: str, ) -> Optional[pd.DataFrame]:
+  """
+  Finds the most recent GitHub issue created for the test_name.
+  If no table found with name=test_name, return (None, None)
+  else return latest created issue_number along with
+  """
+  query_template = f"""
+  SELECT * FROM 
{constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name}
+  ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 10
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return None
+  return df
+
+
+def is_perf_alert(
+    previous_change_point_timestamps: List[pd.Timestamp],
+    change_point_index: int,
+    timestamps: List[pd.Timestamp],
+    min_runs_between_change_points: int) -> bool:
+  """
+  Search the previous_change_point_timestamps with current observed
+  change point sibling window and determine if it is a duplicate
+  change point or not.
+
+  Return False if the current observed change point is a duplicate of
+  already reported change points else return True.
+  """
+  sibling_change_point_min_timestamp = timestamps[min(
+      change_point_index + min_runs_between_change_points, len(timestamps) - 
1)]
+  sibling_change_point_max_timestamp = timestamps[max(
+      0, change_point_index - min_runs_between_change_points)]
+  # Search a list of previous change point timestamps and compare it with
+  # current change point timestamp. We do this in case, if a current change
+  # point is already reported in the past.
+  for previous_change_point_timestamp in previous_change_point_timestamps:
+    if (sibling_change_point_min_timestamp <= previous_change_point_timestamp 
<=
+        sibling_change_point_max_timestamp):
+      return False
+  return True
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def validate_config(keys):
+  return constants._PERF_TEST_KEYS.issubset(keys)
+
+
+def fetch_metric_data(
+    params: Dict[str,
+                 Any]) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+  # replace . with _ in test_name. This test name would be used later
+  # as a BQ table name and the BQ table doesn't accept . in the name.
+  try:

Review Comment:
   That should be removed as well. It was part of old code where I was catching 
exceptions to a list but not needed now. Thanks for catching. I removed it



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# 
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
 # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels + [_AWAITING_TRIAGE_LABEL]
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_GITHUB_REPO_OWNER,
+          'repo': _BEAM_GITHUB_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS).json()
+  if open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_GITHUB_REPO_OWNER,
+        'repo': _BEAM_GITHUB_REPO_NAME,
+        'body': comment_description,
+        issue_number: issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), 
headers=_HEADERS)
+    return True, response.json()['html_url']
+  return False, None
+
+
+def add_label_to_issue(issue_number: int, labels: Optional[List[str]] = None):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  if labels:
+    requests.post(url, json.dumps({'labels': labels}), headers=_HEADERS)
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List[pd.Timestamp],
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display + 1, len(metric_values))
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, upper_bound):
+    indices_to_display.append(i)
+
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for index_to_display in indices_to_display:
+    description += _METRIC_INFO.format(
+        timestamps[index_to_display].ctime(), metric_values[index_to_display])
+    if index_to_display == change_point_index:
+      description += ' <---- Anomaly'
+    description += '\n'
+  return description
+
+
+def report_change_point_on_issues(
+    title: str,
+    issue_number: Optional[int],
+    description: str,
+    labels: Optional[List[str]] = None) -> Tuple[int, str]:
+  """
+  Looks for a GitHub issue with the issue number. First, we try to
+  find the issue that's open and comment on it with the provided description.
+  If that issue is closed, we create a new issue.

Review Comment:
   Changed it



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis.py:
##########
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for benchmark/load/performance test.
+
+import argparse
+
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import pandas as pd
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import 
create_performance_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
+from apache_beam.testing.analyzers.perf_analysis_utils import 
get_existing_issues_data
+from apache_beam.testing.analyzers.perf_analysis_utils import 
find_latest_change_point_index
+from apache_beam.testing.analyzers.perf_analysis_utils import 
GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import 
is_change_point_in_valid_window
+from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import 
publish_issue_metadata_to_big_query
+from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
+from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+
+
+def run_change_point_analysis(params, test_id):
+  """
+  Runs change point analysis for a given test parameters defined in params.
+
+  Steps:
+  1. Validate the params to check for required keys to fetch data for
+    change point analysis.
+  2. Initialize labels, min_runs_between_change_points,
+    num_runs_in_change_point_window. If they are passed in params,
+    override/append the default values with values in params.
+  3. Find most recent change point from the metric data of the
+      specified test+metric_name in params.
+  4. Find if the current observed change point is a duplicate/sibling change
+      point.
+      a. Check if the current observed change point lies in
+          num_runs_in_change_point_window.
+      b. Check if the current observed change point is a duplicate/sibling
+          change point of the last 10 reported change points for the current
+          test+metric_name
+  5. File an alert as a GitHub issue or GitHub issue comment if the
+      current observed change point is not a duplicate change point.
+  6. Publish the alerted GitHub issue metadata for BigQuery, This data is used
+      to determine whether a change point is duplicate or not.
+
+  """
+  if not validate_config(params.keys()):
+    raise Exception(

Review Comment:
   Added



##########
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py:
##########
@@ -620,3 +624,42 @@ def __init__(self):
   def process(self, element):
     yield self.timestamp_val_fn(
         element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
+
+
+class MetricsFetcher:
+  def get_metrics(self):
+    raise NotImplementedError
+
+
+class BigQueryMetricsFetcher(MetricsFetcher):
+  def get_metrics(
+      self,
+      project_name=None,
+      table=None,
+      dataset=None,
+      metric_name=None,
+      limit=1000,
+      query_template=None) -> pd.DataFrame:
+    if not query_template:
+      query_template = """
+        SELECT *
+        FROM {}.{}.{}
+        WHERE CONTAINS_SUBSTR(({}), '{}')
+        ORDER BY {} DESC
+        LIMIT {}
+      """.format(
+          project_name,
+          dataset,
+          table,
+          METRICS_TYPE_LABEL,
+          metric_name,
+          SUBMIT_TIMESTAMP_LABEL,
+          limit)
+    bq_client = bigquery.Client()
+    query_job = bq_client.query(query_template)
+    result = query_job.result()
+    return result.to_dataframe()
+
+  @staticmethod
+  def fetch_from_influxdb():

Review Comment:
   Again, old code. Removed it. Thanks :)



##########
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py:
##########
@@ -620,3 +624,42 @@ def __init__(self):
   def process(self, element):
     yield self.timestamp_val_fn(
         element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
+
+
+class MetricsFetcher:
+  def get_metrics(self):
+    raise NotImplementedError
+
+
+class BigQueryMetricsFetcher(MetricsFetcher):

Review Comment:
   Changed it to a function.
   
   When we add InfluxDB, then we can extend this design.



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# 
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
 # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels + [_AWAITING_TRIAGE_LABEL]
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_GITHUB_REPO_OWNER,
+          'repo': _BEAM_GITHUB_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS).json()
+  if open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_GITHUB_REPO_OWNER,
+        'repo': _BEAM_GITHUB_REPO_NAME,
+        'body': comment_description,
+        issue_number: issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), 
headers=_HEADERS)
+    return True, response.json()['html_url']
+  return False, None
+
+
+def add_label_to_issue(issue_number: int, labels: Optional[List[str]] = None):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  if labels:
+    requests.post(url, json.dumps({'labels': labels}), headers=_HEADERS)
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List[pd.Timestamp],
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display + 1, len(metric_values))
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, upper_bound):
+    indices_to_display.append(i)
+
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for index_to_display in indices_to_display:
+    description += _METRIC_INFO.format(
+        timestamps[index_to_display].ctime(), metric_values[index_to_display])
+    if index_to_display == change_point_index:
+      description += ' <---- Anomaly'
+    description += '\n'
+  return description
+
+
+def report_change_point_on_issues(
+    title: str,
+    issue_number: Optional[int],
+    description: str,
+    labels: Optional[List[str]] = None) -> Tuple[int, str]:
+  """
+  Looks for a GitHub issue with the issue number. First, we try to
+  find the issue that's open and comment on it with the provided description.
+  If that issue is closed, we create a new issue.
+  """
+  if issue_number is not None:
+    commented_on_issue, issue_url = comment_on_issue(
+          issue_number=issue_number,
+          comment_description=description
+          )
+    if commented_on_issue:
+      add_label_to_issue(
+          issue_number=issue_number, labels=[_AWAITING_TRIAGE_LABEL])

Review Comment:
   Sg. Changed it



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import 
get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import 
report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
BigQueryMetricsFetcher
+from signal_processing_algorithms.energy_statistics.energy_statistics import 
e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def get_existing_issues_data(test_name: str, ) -> Optional[pd.DataFrame]:
+  """
+  Finds the most recent GitHub issue created for the test_name.
+  If no table found with name=test_name, return (None, None)
+  else return latest created issue_number along with
+  """
+  query_template = f"""
+  SELECT * FROM 
{constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name}
+  ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 10
+  """
+  try:
+    df = BigQueryMetricsFetcher().get_metrics(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return None
+  return df
+
+
+def is_perf_alert(
+    previous_change_point_timestamps: List[pd.Timestamp],
+    change_point_index: int,
+    timestamps: List[pd.Timestamp],
+    min_runs_between_change_points: int) -> bool:
+  """
+  Search the previous_change_point_timestamps with current observed
+  change point sibling window and determine if it is a duplicate
+  change point or not.
+
+  Return False if the current observed change point is a duplicate of
+  already reported change points else return True.
+  """
+  sibling_change_point_min_timestamp = timestamps[min(
+      change_point_index + min_runs_between_change_points, len(timestamps) - 
1)]
+  sibling_change_point_max_timestamp = timestamps[max(
+      0, change_point_index - min_runs_between_change_points)]
+  # Search a list of previous change point timestamps and compare it with
+  # current change point timestamp. We do this in case, if a current change
+  # point is already reported in the past.
+  for previous_change_point_timestamp in previous_change_point_timestamps:
+    if (sibling_change_point_min_timestamp <= previous_change_point_timestamp 
<=
+        sibling_change_point_max_timestamp):
+      return False
+  return True
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def validate_config(keys):
+  return constants._PERF_TEST_KEYS.issubset(keys)
+
+
+def fetch_metric_data(
+    params: Dict[str,
+                 Any]) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+  # replace . with _ in test_name. This test name would be used later

Review Comment:
   yes, removed it



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# 
https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/
 # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels + [_AWAITING_TRIAGE_LABEL]
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_GITHUB_REPO_OWNER,
+          'repo': _BEAM_GITHUB_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS).json()
+  if open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_GITHUB_REPO_OWNER,
+        'repo': _BEAM_GITHUB_REPO_NAME,
+        'body': comment_description,
+        issue_number: issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), 
headers=_HEADERS)
+    return True, response.json()['html_url']
+  return False, None
+
+
+def add_label_to_issue(issue_number: int, labels: Optional[List[str]] = None):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  if labels:
+    requests.post(url, json.dumps({'labels': labels}), headers=_HEADERS)
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List[pd.Timestamp],
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display + 1, len(metric_values))
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, upper_bound):
+    indices_to_display.append(i)
+
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for index_to_display in indices_to_display:
+    description += _METRIC_INFO.format(
+        timestamps[index_to_display].ctime(), metric_values[index_to_display])
+    if index_to_display == change_point_index:
+      description += ' <---- Anomaly'
+    description += '\n'
+  return description
+
+
+def report_change_point_on_issues(
+    title: str,
+    issue_number: Optional[int],

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to