This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d50b4e49981 use Median Absolute Deviation for the perf alerts (#35294)
d50b4e49981 is described below

commit d50b4e4998162c56decf1440c7a64df24f1d3b28
Author: liferoad <huxiangq...@gmail.com>
AuthorDate: Tue Jun 17 10:23:45 2025 -0400

    use Median Absolute Deviation for the perf alerts (#35294)
    
    * use Median Absolute Deviation for the perf alerts
    
    * use the old threshold as the backup
---
 .../apache_beam/testing/analyzers/constants.py     |  1 +
 .../apache_beam/testing/analyzers/perf_analysis.py |  9 +++++--
 .../testing/analyzers/perf_analysis_utils.py       | 28 ++++++++++++++++++++--
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/testing/analyzers/constants.py b/sdks/python/apache_beam/testing/analyzers/constants.py
index 09ab5c59590..95d70bda59c 100644
--- a/sdks/python/apache_beam/testing/analyzers/constants.py
+++ b/sdks/python/apache_beam/testing/analyzers/constants.py
@@ -35,6 +35,7 @@ _NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100
 # Variables used for finding duplicate change points.
 _DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS = 3
 _DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW = 14
+_DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD = 2
 
 _PERF_TEST_KEYS = {
     'test_description',
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
index 93b9fb342bc..829f93fc1a3 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -83,7 +83,10 @@ def get_change_point_config(
           constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS),
       num_runs_in_change_point_window=params.get(
           'num_runs_in_change_point_window',
-          constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW))
+          constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW),
+      median_abs_deviation_threshold=params.get(
+          'median_abs_deviation_threshold',
+          constants._DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD))
 
 
 def run_change_point_analysis(
@@ -130,7 +133,9 @@ def run_change_point_analysis(
   timestamps = metric_container.timestamps
 
   change_point_index = find_latest_change_point_index(
-      metric_values=metric_values)
+      metric_values=metric_values,
+      median_abs_deviation_threshold=change_point_config.
+      median_abs_deviation_threshold)
   if not change_point_index:
     logging.info(
         "Change point is not detected for the test ID %s" %
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
index a9015d715e9..ac3eac0f764 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
@@ -67,6 +67,8 @@ class ChangePointConfig:
       constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
   num_runs_in_change_point_window: int = (
       constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)
+  median_abs_deviation_threshold: int = (
+      constants._DEFAULT_MEDIAN_ABS_DEVIATION_THRESHOLD)
 
 
 @dataclass
@@ -197,7 +199,9 @@ def find_change_points(metric_values: List[Union[float, int]]):
   return e_divisive(metric_values)
 
 
-def find_latest_change_point_index(metric_values: List[Union[float, int]]):
+def find_latest_change_point_index(
+    metric_values: List[Union[float, int]],
+    median_abs_deviation_threshold: int = 2):
   """
   Args:
    metric_values: Metric values used to run change point analysis.
@@ -208,7 +212,10 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
   # reduce noise in the change point analysis by filtering out
   # the change points that are not significant enough.
   change_points_indices = filter_change_points_by_median_threshold(
-      metric_values, change_points_indices)
+      metric_values,
+      change_points_indices,
+      threshold=0.1,
+      median_abs_deviation_threshold=median_abs_deviation_threshold)
   # Consider the latest change point.
   if not change_points_indices:
     return None
@@ -289,6 +296,7 @@ def filter_change_points_by_median_threshold(
     data: List[Union[int, float]],
     change_points: List[int],
     threshold: float = 0.05,
+    median_abs_deviation_threshold: int = 2,
 ):
   """
   Reduces the number of change points by filtering out the ones that are
@@ -308,6 +316,22 @@ def filter_change_points_by_median_threshold(
     left_value = median(left_segment)
     right_value = median(right_segment)
 
+    # MAD (Median Absolute Deviation) is a robust measure of variability.
+    # A low MAD indicates that the data points are tightly clustered around the
+    # median, while a high MAD suggests greater spread.
+    left_mad = median([abs(x - left_value) for x in left_segment])
+    right_mad = median([abs(x - right_value) for x in right_segment])
+
+    # The change is considered significant only if the absolute difference
+    # between the medians of the two segments is greater than a threshold
+    # times the median absolute deviation (MAD) of the respective segments.
+    # This approach helps to filter out changes that are not statistically
+    # significant, making the detection more robust to noise and outliers.
+    if abs(right_value - left_value) > (median_abs_deviation_threshold *
+                                        (left_mad + right_mad)):
+      valid_change_points.append(idx)
+      continue
+
     relative_change = abs(right_value - left_value) / (left_value + epsilon)
 
     if relative_change > threshold:

Reply via email to