AnandInguva commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1047295084
##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -58,8 +55,6 @@ class GitHubIssueMetaData:
def is_change_point_in_valid_window(
num_runs_in_change_point_window: int, change_point_index: int) -> bool:
- # If the change point is more than N runs behind the most recent run,
- # Ignore the change point and don't raise an alert for it.
return num_runs_in_change_point_window >= change_point_index
Review Comment:
That sounds right. Changed it.
##########
sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py:
##########
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import asdict
+from dataclasses import dataclass
+import logging
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.github_issues_utils import
get_issue_description
+from apache_beam.testing.analyzers.github_issues_utils import
report_change_point_on_issues
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import
BigQueryMetricsFetcher
+from apache_beam.testing.load_tests.load_test_metrics_utils import
BigQueryMetricsPublisher
+from signal_processing_algorithms.energy_statistics.energy_statistics import
e_divisive
+
+
+@dataclass
+class GitHubIssueMetaData:
+ """
+ This class holds metadata that needs to be published to the
+ BigQuery when a GitHub issue is created on a performance
+ alert.
+ """
+ issue_timestamp: pd.Timestamp
+ change_point_timestamp: pd.Timestamp
+ test_name: str
+ metric_name: str
+ issue_number: int
+ issue_url: str
+ test_id: str
+ change_point: float
+
+
+def is_change_point_in_valid_window(
+ num_runs_in_change_point_window: int, change_point_index: int) -> bool:
+ # If the change point is more than N runs behind the most recent run,
+ # Ignore the change point and don't raise an alert for it.
+ return num_runs_in_change_point_window >= change_point_index
+
+
+def get_existing_issues_data(
+ test_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
+) -> Optional[pd.DataFrame]:
+ """
+ Finds the most recent GitHub issue created for the test_name.
+ If no table found with name=test_name, return (None, None)
+ else return latest created issue_number along with
+ """
+ query = f"""
+ SELECT * FROM
{constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name}
+ ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC
+ LIMIT 10
+ """
+ try:
+ df = big_query_metrics_fetcher.fetch(query=query)
+ except exceptions.NotFound:
+ # If no table found, that means this is first performance regression
+ # on the current test+metric.
+ return None
+ return df
+
+
+def is_perf_alert(
+ previous_change_point_timestamps: List[pd.Timestamp],
+ change_point_index: int,
+ timestamps: List[pd.Timestamp],
+ min_runs_between_change_points: int) -> bool:
+ """
+ Search the previous_change_point_timestamps with current observed
+ change point sibling window and determine if it is a duplicate
+ change point or not.
+
+ Return False if the current observed change point is a duplicate of
+ already reported change points else return True.
+ """
+ sibling_change_point_min_timestamp = timestamps[min(
+ change_point_index + min_runs_between_change_points, len(timestamps) -
1)]
+ sibling_change_point_max_timestamp = timestamps[max(
+ 0, change_point_index - min_runs_between_change_points)]
+ # Search a list of previous change point timestamps and compare it with
+ # current change point timestamp. We do this in case, if a current change
+ # point is already reported in the past.
+ for previous_change_point_timestamp in previous_change_point_timestamps:
+ if (sibling_change_point_min_timestamp <= previous_change_point_timestamp
<=
+ sibling_change_point_max_timestamp):
+ return False
+ return True
+
+
+def read_test_config(config_file_path: str) -> Dict:
+ """
+ Reads the config file in which the data required to
+ run the change point analysis is specified.
+ """
+ with open(config_file_path, 'r') as stream:
+ config = yaml.safe_load(stream)
+ return config
+
+
+def validate_config(keys):
+ return constants._PERF_TEST_KEYS.issubset(keys)
+
+
+def fetch_metric_data(
+ params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher
+) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+ query = f"""
+ SELECT *
+ FROM
{params['project']}.{params['metrics_dataset']}.{params['metrics_table']}
+ WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}),
'{params['metric_name']}')
+ ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
+ LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
+ """
+ metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)
+ return (
+ metric_data[load_test_metrics_utils.VALUE_LABEL],
+ metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL])
+
+
+def find_latest_change_point_index(metric_values: List[Union[float, int]]):
+ change_points_idx = e_divisive(metric_values)
+ if not change_points_idx:
+ return None
+ # Consider the latest change point.
Review Comment:
From the table, we only need the data for the latest 100 runs. To get that, we
sort the data by timestamp in descending order and fetch the first 100 rows.
Once we have fetched the latest 100 runs, we sort the data back into
chronological order.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]