This is an automated email from the ASF dual-hosted git repository. xqhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d50b4e49981 use Median Absolute Deviation for the perf alerts (#35294) d50b4e49981 is described below commit d50b4e4998162c56decf1440c7a64df24f1d3b28 Author: liferoad <huxiangq...@gmail.com> AuthorDate: Tue Jun 17 10:23:45 2025 -0400 use Median Absolute Deviation for the perf alerts (#35294) * use Median Absolute Deviation for the perf alerts * use the old threshold as the backup --- .../apache_beam/testing/analyzers/constants.py | 1 + .../apache_beam/testing/analyzers/perf_analysis.py | 9 +++++-- .../testing/analyzers/perf_analysis_utils.py | 28 ++++++++++++++++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/constants.py b/sdks/python/apache_beam/testing/analyzers/constants.py index 09ab5c59590..95d70bda59c 100644 --- a/sdks/python/apache_beam/testing/analyzers/constants.py +++ b/sdks/python/apache_beam/testing/analyzers/constants.py @@ -35,6 +35,7 @@ _NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100 # Variables used for finding duplicate change points. _DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS = 3 _DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW = 14 +_DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD = 2 _PERF_TEST_KEYS = { 'test_description', diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index 93b9fb342bc..829f93fc1a3 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -83,7 +83,10 @@ def get_change_point_config( constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS), num_runs_in_change_point_window=params.get( 'num_runs_in_change_point_window', - constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)) + constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW), + median_abs_deviation_threshold=params.get( + 'median_abs_deviation_threshold', + constants._DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD)) def run_change_point_analysis( @@ -130,7 +133,9 @@ def run_change_point_analysis( timestamps = metric_container.timestamps change_point_index = find_latest_change_point_index( - metric_values=metric_values) + metric_values=metric_values, + median_abs_deviation_threshold=change_point_config. + median_abs_deviation_threshold) if not change_point_index: logging.info( "Change point is not detected for the test ID %s" % diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index a9015d715e9..ac3eac0f764 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -67,6 +67,8 @@ class ChangePointConfig: constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS) num_runs_in_change_point_window: int = ( constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW) + median_abs_deviation_threshold: int = ( + constants._DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD) @dataclass @@ -197,7 +199,9 @@ def find_change_points(metric_values: List[Union[float, int]]): return e_divisive(metric_values) -def find_latest_change_point_index(metric_values: List[Union[float, int]]): +def find_latest_change_point_index( + metric_values: List[Union[float, int]], + median_abs_deviation_threshold: int = 2): """ Args: metric_values: Metric values used to run change point analysis. @@ -208,7 +212,10 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]): # reduce noise in the change point analysis by filtering out # the change points that are not significant enough. change_points_indices = filter_change_points_by_median_threshold( - metric_values, change_points_indices) + metric_values, + change_points_indices, + threshold=0.1, + median_abs_deviation_threshold=median_abs_deviation_threshold) # Consider the latest change point. if not change_points_indices: return None @@ -289,6 +296,7 @@ def filter_change_points_by_median_threshold( data: List[Union[int, float]], change_points: List[int], threshold: float = 0.05, + median_abs_deviation_threshold: int = 2, ): """ Reduces the number of change points by filtering out the ones that are @@ -308,6 +316,22 @@ def filter_change_points_by_median_threshold( left_value = median(left_segment) right_value = median(right_segment) + # MAD (Median Absolute Deviation) is a robust measure of variability. + # A low MAD indicates that the data points are tightly clustered around the + # median, while a high MAD suggests greater spread. + left_mad = median([abs(x - left_value) for x in left_segment]) + right_mad = median([abs(x - right_value) for x in right_segment]) + + # The change is considered significant only if the absolute difference + # between the medians of the two segments is greater than a threshold + # times the median absolute deviation (MAD) of the respective segments. + # This approach helps to filter out changes that are not statistically + # significant, making the detection more robust to noise and outliers. + if abs(right_value - left_value) > (median_abs_deviation_threshold * + (left_mad + right_mad)): + valid_change_points.append(idx) + continue + relative_change = abs(right_value - left_value) / (left_value + epsilon) if relative_change > threshold: