shunping commented on code in PR #33994:
URL: https://github.com/apache/beam/pull/33994#discussion_r1960249091


##########
sdks/python/apache_beam/ml/anomaly/univariate/mean.py:
##########
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Trackers for calculating mean in windowed fashion.
+
+This module defines different types of mean trackers that operate on windows
+of data. It includes:
+
+  * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window.
+  * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode.
+  * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode.
+"""
+
+import math
+import warnings
+
+import numpy as np
+
+from apache_beam.ml.anomaly.univariate.base import WindowedTracker
+from apache_beam.ml.anomaly.univariate.base import WindowMode
+
+
+class MeanTracker(WindowedTracker):
+  """Abstract base class for mean trackers.
+
+  Currently, it does not add any specific functionality but provides a type
+  hierarchy for mean trackers.
+  """
+  pass
+
+
+class SimpleSlidingMeanTracker(MeanTracker):
+  """Sliding window mean tracker that calculates mean using NumPy.
+
+  This tracker uses NumPy's `nanmean` function to calculate the mean of the
+  values currently in the sliding window. It's a simple, non-incremental
+  approach.
+
+  Args:
+    window_size: The size of the sliding window.
+  """
+  def __init__(self, window_size):
+    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
+
+  def get(self):
+    """Calculates and returns the mean of the current sliding window.
+
+    Returns:
+      float: The mean of the values in the current sliding window.
+             Returns NaN if the window is empty.
+    """
+    if len(self._queue) == 0:
+      return float('nan')
+
+    with warnings.catch_warnings(record=False):
+      warnings.simplefilter("ignore")
+      return np.nanmean(self._queue)
+
+
+class IncMeanTracker(MeanTracker):
+  """Base class for incremental mean trackers.
+
+  This class implements incremental calculation of the mean, which is more
+  efficient for streaming data as it updates the mean with each new data point
+  instead of recalculating from scratch.
+
+  Args:
+    window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
+      or `SLIDING`.
+    **kwargs: Keyword arguments passed to the parent class constructor.
+  """
+  def __init__(self, window_mode, **kwargs):
+    super().__init__(window_mode=window_mode, **kwargs)
+    self._mean = 0
+
+  def push(self, x):
+    """Pushes a new value and updates the incremental mean.
+
+    Args:
+      x: The new value to be pushed.
+    """
+    if not math.isnan(x):
+      self._n += 1
+      delta = x - self._mean
+    else:
+      delta = 0

Review Comment:
   If we return early here (either discarding or ignoring the NaN), the result 
could differ subtly.
   
   Say a variable X has 10% missing values and we receive a new X every 
second. If we want the rolling mean of X over a one-minute sliding window 
(window_size = 60), the current implementation yields the correct result, 
because missing values are still pushed into the sliding window.
   
   On the other hand, if we ignore or discard these NaNs, the window of 60 
(non-NaN) data points will include, on average, about 6 (10% * 60) data points 
that fall outside the predefined one-minute window.
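   
   A minimal sketch of this effect, assuming one reading per second and a 
plain `deque` standing in for the tracker's internal queue (the helper name 
`window_span_seconds` is hypothetical and not part of the PR):

```python
import math
from collections import deque


def window_span_seconds(values, window_size, drop_nan):
  """Returns how many seconds the final window covers.

  Assumes one reading per second, so each value's index is its timestamp.
  """
  window = deque(maxlen=window_size)
  for t, x in enumerate(values):
    if drop_nan and math.isnan(x):
      continue  # NaN is discarded and never occupies a slot in the window.
    window.append((t, x))
  return window[-1][0] - window[0][0] + 1


# Two minutes of data where every 10th reading is missing (10% NaN).
values = [float('nan') if t % 10 == 0 else 1.0 for t in range(120)]

keep = window_span_seconds(values, 60, drop_nan=False)
drop = window_span_seconds(values, 60, drop_nan=True)
print(keep, drop)
```

   With NaNs kept, the 60 slots cover exactly the last 60 seconds. With NaNs 
dropped, the same 60 slots reach further back in time than one minute.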
   
   Moreover, if users really want to ignore, discard, or (more 
sophisticatedly) impute NaNs, they can easily prepend a preprocessing map step 
to the current implementation. The opposite direction is not easy: once NaNs 
are discarded or ignored inside the tracker, they cannot be recovered.
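   
   As a rough illustration of such a preprocessing step, here is a sketch 
using plain generators. The helpers `drop_nan` and `forward_fill` are 
hypothetical names; in a pipeline the same logic would presumably live in a 
filter or map step placed before the tracker:

```python
import math


def drop_nan(values):
  """Discards NaNs before they reach the tracker."""
  return (x for x in values if not math.isnan(x))


def forward_fill(values, default=0.0):
  """Imputes each NaN with the last non-NaN value seen (or `default`)."""
  last = default
  for x in values:
    if math.isnan(x):
      yield last
    else:
      last = x
      yield x


raw = [1.0, float('nan'), 3.0, float('nan'), 5.0]
dropped = list(drop_nan(raw))
filled = list(forward_fill(raw))
print(dropped, filled)
```

   Either helper can be applied upstream without changing the tracker, 
whereas a tracker that drops NaNs internally leaves no way to get them back.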



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
