tvalentyn commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1044799836


##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+# TODO: Change the REPO owner name to apache before merging.
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """

Review Comment:
   1. Let's define _ISSUE_DESCRIPTION_HEADER in the same place where
_TITLE_TEMPLATE is. Both are closely related.
   2. Naming suggestion: _ISSUE_DESCRIPTION_TEMPLATE, _ISSUE_TITLE_TEMPLATE
   3. _ISSUE_DESCRIPTION_TEMPLATE could be extended to something like:
   ```
   Performance change detected in <test path, ideally without replacing dots
   with _ so that it's easier to find in the codebase> for the metric <...>

   <runs that point to the anomaly>

   For more information, see: <link to the playbook.>
   ```
   4. nit: _ISSUE_TITLE_TEMPLATE could potentially be shortened, maybe using only
the last identifier of the test name, so that we don't include
`apache_beam.testing.benchmarks...`. Example: Performance change in
pytorch_image_classification_benchmarks_13 : mean_load_model_latency_milli_secs
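   
   To illustrate (3) and (4), a rough sketch of what the two templates might look
like as module constants; the names and placeholder fields here are only my
suggestion, nothing that exists in the PR:
   ```python
   # Sketch only: placeholder fields would be filled in by the alerting code.
   _ISSUE_TITLE_TEMPLATE = (
       'Performance change in {test_short_name}: {metric_name}')

   _ISSUE_DESCRIPTION_TEMPLATE = """
   Performance change detected in `{test_full_name}` for the metric `{metric_name}`.

   {runs_around_change_point}

   For more information, see: {playbook_link}
   """
   ```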
   



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.

Review Comment:
   this comment line is not relevant to this method.



##########
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py:
##########
@@ -620,3 +624,13 @@ def __init__(self):
   def process(self, element):
     yield self.timestamp_val_fn(
         element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
+
+
+class BigQueryMetricsFetcher:
+  def __init__(self):
+    self.client = bigquery.Client()

Review Comment:
   thanks, looks better.



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+# TODO: Change the REPO owner name to apache before merging.
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+_PERF_ALERT_LABEL = 'perf-alert'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  Returns:
+    Tuple containing GitHub issue number and issue URL.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+      'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
+  }
+  if labels:
+    data['labels'] += labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  Returns:
+    Boolean, indicating a comment was added to issue, and URL directing to

Review Comment:
   ```suggestion
    tuple[bool, Optional[str]] indicating whether a comment was added to the issue, and the comment URL.
   ```



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis.py:
##########
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for benchmark/load/performance test.
+
+import argparse
+
+import logging
+import os
+import uuid
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import pandas as pd
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
+from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
+from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
+from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
+from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
+from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
+from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+
+
+def run_change_point_analysis(params, test_id, big_query_metrics_fetcher):
+  if not validate_config(params.keys()):
+    raise ValueError(
+        f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
+        f"are specified for the {test_id}")
+
+  metric_name = params['metric_name']
+  test_name = params['test_name'].replace('.', '_') + f'_{metric_name}'
+
+  min_runs_between_change_points = (
+      constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
+  if 'min_runs_between_change_points' in params:
+    min_runs_between_change_points = params['min_runs_between_change_points']
+
+  num_runs_in_change_point_window = (
+      constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)
+  if 'num_runs_in_change_point_window' in params:
+    num_runs_in_change_point_window = params['num_runs_in_change_point_window']
+
+  metric_values, timestamps = fetch_metric_data(
+    params=params,
+    big_query_metrics_fetcher=big_query_metrics_fetcher
+  )
+
+  change_point_index = find_latest_change_point_index(
+      metric_values=metric_values)
+  if not change_point_index:
+    return
+
+  if not is_change_point_in_valid_window(num_runs_in_change_point_window,
+                                         change_point_index):
+    logging.info(
+        'Performance regression/improvement found for the test: %s. '
+        'Since the change point index %s '
+        'lies outside the num_runs_in_change_point_window distance: %s, '
+        'alert is not raised.' %
+        (test_name, change_point_index, num_runs_in_change_point_window))
+    return
+
+  is_alert = True
+  last_reported_issue_number = None
+  existing_issue_data = get_existing_issues_data(
+      test_name=test_name, big_query_metrics_fetcher=big_query_metrics_fetcher)
+
+  if existing_issue_data is not None:
+    existing_issue_timestamps = existing_issue_data[
+        constants._CHANGE_POINT_TIMESTAMP_LABEL].tolist()
+    last_reported_issue_number = existing_issue_data[
+        constants._ISSUE_NUMBER].tolist()[0]
+
+    is_alert = is_perf_alert(
+        previous_change_point_timestamps=existing_issue_timestamps,
+        change_point_index=change_point_index,
+        timestamps=timestamps,
+        min_runs_between_change_points=min_runs_between_change_points)
+
+  logging.info("Performance alert is %s for test %s" % (is_alert, test_name))
+  if is_alert:
+    issue_number, issue_url = create_performance_alert(
+     metric_name, test_name, timestamps,
+     metric_values, change_point_index,
+     params.get('labels', None),
+      last_reported_issue_number)

Review Comment:
   ```suggestion
        last_reported_issue_number)
   ```



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+# TODO: Change the REPO owner name to apache before merging.
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.

Review Comment:
   nit: I don't quite follow which variables the comment refers to. Remove the 
comment?



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+# TODO: Change the REPO owner name to apache before merging.
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+_PERF_ALERT_LABEL = 'perf-alert'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  Returns:
+    Tuple containing GitHub issue number and issue URL.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+      'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
+  }
+  if labels:
+    data['labels'] += labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  Returns:
+    Boolean, indicating a comment was added to issue, and URL directing to
+     the comment.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_GITHUB_REPO_OWNER,
+          'repo': _BEAM_GITHUB_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS).json()
+  if open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_GITHUB_REPO_OWNER,
+        'repo': _BEAM_GITHUB_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+  return False, None
+
+
+def add_awaiting_triage_label_to_issue(issue_number: int):

Review Comment:
   nit: add_awaiting_triage_label_to_issue can be shortened to
add_awaiting_triage_label. _to_issue doesn't add new information; it is well
understood from the context (e.g. the name of the file), the arg name, etc. A
rule of thumb is to name a method in a way that makes it clear what it does
without making it hard to read. Shorter names help with the latter.



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def get_existing_issues_data(
+    test_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
+) -> Optional[pd.DataFrame]:
+  """
+  Finds the most recent GitHub issues created for the test_name.
+  If no table is found with name=test_name, returns None;
+  otherwise returns a DataFrame with the most recently created issues.
+  """
+  query = f"""
+  SELECT * FROM {constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name}
+  ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 10
+  """
+  try:
+    df = big_query_metrics_fetcher.fetch(query=query)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return None
+  return df
+
+
+def is_perf_alert(
+    previous_change_point_timestamps: List[pd.Timestamp],
+    change_point_index: int,
+    timestamps: List[pd.Timestamp],

Review Comment:
   Add a docstring for args/returns, and make sure to specify the expected
sorting order: datapoints and timestamps must be specified in
reverse-chronological order. It's a very important detail for being able to
reason about this method.
   
   Optionally, you could consider actually passing around datapoints in
chronological order. It may be easier that way. No strong opinion.
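   
   For example, the docstring could spell out the ordering explicitly; a sketch
(the signature is the one from the PR, the wording is mine):
   ```python
   def is_perf_alert(
       previous_change_point_timestamps: List[pd.Timestamp],
       change_point_index: int,
       timestamps: List[pd.Timestamp],
       min_runs_between_change_points: int) -> bool:
     """Determines whether the observed change point should be alerted on.

     Args:
       previous_change_point_timestamps: Timestamps of change points already
         reported for this test+metric.
       change_point_index: Index of the observed change point in timestamps.
       timestamps: Timestamps of the metric runs, sorted in
         reverse-chronological order (most recent run first).
       min_runs_between_change_points: A change point within this many runs
         of a previously reported change point is treated as a duplicate.

     Returns:
       False if the observed change point duplicates an already reported
       change point, True otherwise.
     """
   ```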



##########
sdks/python/apache_beam/testing/analyzers/README.md:
##########
@@ -0,0 +1,80 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<h1>Performance alerts for Beam Python performance and load tests</h1>
+
+
+<h2> Alerts </h2>
+
+Performance regressions or improvements detected with [Change Point Analysis](https://en.wikipedia.org/wiki/Change_detection) using the [edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
+analyzer are automatically filed as Beam GitHub issues with the label `perf-alert`.
+
+The GitHub issue description contains information on the affected test and metric, listing the metric values and
+timestamps for N consecutive runs before and after the observed change point. The observed change point is marked
+as `Anomaly` in the issue description.
+
+Example: [sample perf alert GitHub 
issue](https://github.com/AnandInguva/beam/issues/83).
+
+When a performance alert is raised for a test, a GitHub issue is created, and the issue metadata, such as the GitHub issue
+URL and issue number, along with the change point value and timestamp, is exported to BigQuery. This data is used when the
+next change point is observed on the same test, either to update the already created GitHub issue or to suppress the alert
+by not creating a duplicate GitHub issue.
+
+<h2> Config file structure </h2>
+The config file defines the structure to run change point analysis on a given test. To add a test to the config file,
+please follow the structure below.
+
+**NOTE**: Change point analysis currently only supports reading metric data from BigQuery.
+
+```
+# the test_1 must be a unique id.
+test_1:
+  test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+  source: big_query
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - run-inference
+  min_runs_between_change_points: 5
+  num_runs_in_change_point_window: 7
+```
+
+**Note**: If the source is **BigQuery**, the metrics_dataset, metrics_table, project and metric_name should match the
+values defined for the performance/load tests.
+The above example uses this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
+to fill in the values required to fetch the data from the source.
+
+<h3>Different ways to avoid false positive change points</h3>
+
+**min_runs_between_change_points**: As the metric data moves across the runs, the change point analysis can place the
+change point in a slightly different place. These change points refer to the same regression and are just noise.
+When we find a new change point, we search up to `min_runs_between_change_points` runs in both directions from the
+current change point. If an existing change point is found within that distance, the current change point is
+suppressed.
+
+**num_runs_in_change_point_window**: This defines how many of the most recent runs are considered part of the change
+point window. Sometimes the change point found might be far back in time and no longer relevant. For a test, if a
+change point needs to be reported only when it was observed in the last 7 runs from the current run,
+setting `num_runs_in_change_point_window=7` will achieve it.
+
+<h2> Register a test for performance alerts </h2>

Review Comment:
   Add a section for triaging alerts. The issue description can point to this page.



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  issue_timestamp: pd.Timestamp
+  change_point_timestamp: pd.Timestamp
+  test_name: str
+  metric_name: str
+  issue_number: int
+  issue_url: str
+  test_id: str
+  change_point: float
+
+
+def is_change_point_in_valid_window(
+    num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  return num_runs_in_change_point_window >= change_point_index
+
+
+def get_existing_issues_data(
+    test_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
+) -> Optional[pd.DataFrame]:
+  """
+  Finds the most recent GitHub issues created for the test_name.
+  If no table is found with name=test_name, returns None;
+  otherwise returns a DataFrame with the most recently created issues.
+  """
+  query = f"""
+  SELECT * FROM {constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name}
+  ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+  LIMIT 10
+  """
+  try:
+    df = big_query_metrics_fetcher.fetch(query=query)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test+metric.
+    return None
+  return df
+
+
+def is_perf_alert(
+    previous_change_point_timestamps: List[pd.Timestamp],
+    change_point_index: int,
+    timestamps: List[pd.Timestamp],
+    min_runs_between_change_points: int) -> bool:
+  """
+  Search the previous_change_point_timestamps with current observed
+  change point sibling window and determine if it is a duplicate
+  change point or not.
+
+  Return False if the current observed change point is a duplicate of
+  already reported change points else return True.
+  """
+  sibling_change_point_min_timestamp = timestamps[min(
+      change_point_index + min_runs_between_change_points, len(timestamps) - 1)]
+  sibling_change_point_max_timestamp = timestamps[max(
+      0, change_point_index - min_runs_between_change_points)]
+  # Search a list of previous change point timestamps and compare it with
+  # current change point timestamp. We do this in case, if a current change
+  # point is already reported in the past.
+  for previous_change_point_timestamp in previous_change_point_timestamps:
+    if (sibling_change_point_min_timestamp <= previous_change_point_timestamp <=
+        sibling_change_point_max_timestamp):
+      return False
+  return True
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def validate_config(keys):
+  return constants._PERF_TEST_KEYS.issubset(keys)
+
+
+def fetch_metric_data(
+    params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher
+) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+  query = f"""
+      SELECT *
+      FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']}
+      WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}')
+      ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
+      LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
+    """
+  metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)
+  return (
+      metric_data[load_test_metrics_utils.VALUE_LABEL],
+      metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL])
+
+
+def find_latest_change_point_index(metric_values: List[Union[float, int]]):
+  change_points_idx = e_divisive(metric_values)
+  if not change_points_idx:
+    return None
+  # Consider the latest change point.

Review Comment:
   I'm having a hard time reasoning about the order of elements here.
   
   fetch_metric_data returns elements in descending order of the timestamp (new
timestamps first); the same order applies to metric_values.
   
   find_latest_change_point_index gets the indices returned by e_divisive, sorts
them in descending order (larger indices first), then returns the first
element. Larger indices would correspond to older timestamps. The end result
seems like the oldest change point to me, not the latest (most recent) change
point.
   
   Note that the order of elements, again, is not documented.
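   
   A small example of what I mean, reusing the series from perf_analysis_test.py;
the sketch assumes metric_values comes back newest-first (index 0 = most recent
run), as fetch_metric_data seems to return it:
   ```python
   # Index 0 is the most recent run, so larger indices are OLDER runs.
   metric_values = [0] * 10 + [1] * 10 + [2] * 20
   change_points_idx = e_divisive(metric_values)  # [10, 20], per the unit test
   change_points_idx.sort(reverse=True)
   change_points_idx[0]  # 20 -> the change point at the OLDEST timestamp
   ```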



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import time
+import unittest
+
+try:
+  import apache_beam.testing.analyzers.perf_analysis as analysis
+  from apache_beam.testing.analyzers import constants
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+  from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
+  from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+except ImportError as e:
+  analysis = None
+
+
[email protected](
+    analysis is None,

Review Comment:
   signal-processing-algorithms seems light-weight. I think we can add it to
the [test] dependencies.
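   
   For example, something along these lines in setup.py; the exact list name and
placement would depend on how Beam's test extras are organized, so this is only
a sketch:
   ```python
   REQUIRED_TEST_PACKAGES = [
       # ... existing test dependencies ...
       'signal-processing-algorithms',
   ]
   ```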



##########
sdks/python/apache_beam/testing/analyzers/tests_config.yaml:
##########
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+test_1:
+  test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - run-inference
+  min_runs_between_change_points: 5

Review Comment:
   Do we have to specify min_runs_between_change_points and
num_runs_in_change_point_window? Are the default values good enough? If this is
an example, specify that these are optional. Let's make it as easy as possible
for folks to configure a test.
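   
   For example, if the two window parameters are optional, a minimal entry could
look like this (sketch, reusing the values already in this file):
   ```
   test_1:
     test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
     metrics_dataset: beam_run_inference
     metrics_table: torch_inference_imagenet_results_resnet152
     project: apache-beam-testing
     metric_name: mean_load_model_latency_milli_secs
   ```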



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import time
+import unittest
+
+try:
+  import apache_beam.testing.analyzers.perf_analysis as analysis
+  from apache_beam.testing.analyzers import constants
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+  from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
+  from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+except ImportError as e:
+  analysis = None
+
+
[email protected](

Review Comment:
   High-level note: you could create a more robust unit test if you stub out
the call to BigQuery and mock out the calls to GitHub. Then you could actually
exercise most of the code in the unit test on a sample test config.
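   
   Something along these lines; just a sketch, and the patch targets, column
names, and test name are illustrative and would need to match the actual
module:
   ```python
   import unittest
   from unittest import mock

   import pandas as pd

   class ChangePointAnalysisTest(unittest.TestCase):
     @mock.patch(
         'apache_beam.testing.load_tests.load_test_metrics_utils'
         '.BigQueryMetricsFetcher.fetch')
     @mock.patch(
         'apache_beam.testing.analyzers.github_issues_utils.create_issue',
         return_value=(1, 'https://github.com/apache/beam/issues/1'))
     def test_alert_fired_on_sample_config(self, mock_create_issue, mock_fetch):
       # Serve canned metric data instead of querying BigQuery.
       mock_fetch.return_value = pd.DataFrame({
           'value': [0] * 10 + [1] * 10,
           'timestamp': pd.date_range('2022-01-01', periods=20)[::-1],
       })
       # ... run the analysis on a sample test config, then assert that
       # create_issue was called with the expected title/labels.
   ```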



##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import time
+import unittest
+
+try:
+  import apache_beam.testing.analyzers.perf_analysis as analysis
+  from apache_beam.testing.analyzers import constants
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
+  from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
+  from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
+  from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
+except ImportError as e:
+  analysis = None
+
+
[email protected](
+    analysis is None,
+    'Missing dependencies. '
+    'Test dependencies are missing for the Analyzer.')
+class TestChangePointAnalysis(unittest.TestCase):
+  def setUp(self) -> None:
+    self.single_change_point_series = [0] * 10 + [1] * 10
+    self.multiple_change_point_series = self.single_change_point_series + [
+        2
+    ] * 20
+
+  def test_edivisive_means(self):
+    change_point_indexes = e_divisive(self.single_change_point_series)
+    self.assertEqual(change_point_indexes, [10])
+    change_point_indexes = e_divisive(self.multiple_change_point_series)
+    self.assertEqual(sorted(change_point_indexes), [10, 20])
+
+  def test_is_changepoint_in_valid_window(self):
+
+    changepoint_to_recent_run_window = 19
+    change_point_index = 14
+
+    is_valid = is_change_point_in_valid_window(
+        changepoint_to_recent_run_window, change_point_index)
+    self.assertEqual(is_valid, True)
+
+    changepoint_to_recent_run_window = 13
+    is_valid = is_change_point_in_valid_window(
+        changepoint_to_recent_run_window, change_point_index)
+    self.assertEqual(is_valid, False)
+
+    changepoint_to_recent_run_window = 14
+    is_valid = is_change_point_in_valid_window(
+        changepoint_to_recent_run_window, change_point_index)
+    self.assertEqual(is_valid, True)
+
+  def test_validate_config(self):
+    test_keys = {
+        'test_name',
+        'metrics_dataset',
+        'metrics_table',
+        'project',
+        'metric_name'
+    }
+    self.assertEqual(test_keys, constants._PERF_TEST_KEYS)
+    self.assertTrue(validate_config(test_keys))
+
+  def test_is_perf_alert(self):

Review Comment:
   Try to name tests in a way that specifies the behavior under test and the
expected result, for example
test_when_change_points_are_too_close_alert_is_not_fired.



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import pandas as pd
+import requests
+
+try:
+  _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+# TODO: Change the REPO owner name to apache before merging.
+_BEAM_GITHUB_REPO_OWNER = 'AnandInguva'
+_BEAM_GITHUB_REPO_NAME = 'beam'
+# Adding GitHub Rest API version to the header to maintain version stability.
+# For more information, please look at
+# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json",
+    "X-GitHub-Api-Version": "2022-11-28"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+_AWAITING_TRIAGE_LABEL = 'awaiting triage'
+_PERF_ALERT_LABEL = 'perf-alert'
+
+
+def create_issue(
+    title: str,
+    description: str,
+    labels: Optional[List[str]] = None,
+) -> Tuple[int, str]:
+  """
+  Create an issue with title, description with a label.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+  Returns:
+    Tuple containing GitHub issue number and issue URL.
+  """
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
+  data = {
+      'owner': _BEAM_GITHUB_REPO_OWNER,
+      'repo': _BEAM_GITHUB_REPO_NAME,
+      'title': title,
+      'body': description,
+      'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
+  }
+  if labels:
+    data['labels'] += labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with provided issue_number. If an open
+  issue is found, comment on the open issue with provided description else
+  do nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  Returns:
+    Boolean, indicating a comment was added to issue, and URL directing to
+     the comment.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_GITHUB_REPO_OWNER,
+          'repo': _BEAM_GITHUB_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS).json()
+  if open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_GITHUB_REPO_OWNER,
+        'repo': _BEAM_GITHUB_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+  return False, None
+
+
+def add_awaiting_triage_label_to_issue(issue_number: int):
+  url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
+      _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
+  requests.post(
+      url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS)
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List[pd.Timestamp],

Review Comment:
   Document the expected order (same as in other comment).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
