damccorm commented on code in PR #33994: URL: https://github.com/apache/beam/pull/33994#discussion_r1960470739
########## sdks/python/apache_beam/ml/anomaly/univariate/mean.py: ########## @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trackers for calculating mean in windowed fashion. + +This module defines different types of mean trackers that operate on windows +of data. It includes: + + * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window. + * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode. + * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode. +""" + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + + +class MeanTracker(WindowedTracker): + """Abstract base class for mean trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for mean trackers. + """ + pass + + +class SimpleSlidingMeanTracker(MeanTracker): + """Sliding window mean tracker that calculates mean using NumPy. + + This tracker uses NumPy's `nanmean` function to calculate the mean of the + values currently in the sliding window. It's a simple, non-incremental + approach. 
+ + Args: + window_size: The size of the sliding window. + """ + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + """Calculates and returns the mean of the current sliding window. + + Returns: + float: The mean of the values in the current sliding window. + Returns NaN if the window is empty. + """ + if len(self._queue) == 0: + return float('nan') + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanmean(self._queue) + + +class IncMeanTracker(MeanTracker): + """Base class for incremental mean trackers. + + This class implements incremental calculation of the mean, which is more + efficient for streaming data as it updates the mean with each new data point + instead of recalculating from scratch. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode=window_mode, **kwargs) + self._mean = 0 + + def push(self, x): + """Pushes a new value and updates the incremental mean. + + Args: + x: The new value to be pushed. + """ + if not math.isnan(x): + self._n += 1 + delta = x - self._mean + else: + delta = 0 Review Comment: > Moreover, if users really want to ignore, discard, or, more sophisticatedly, impute NaNs, they can easily prepend a preprocessing map step to the current implementation to handle them. The opposite direction is not easy, because the information loss from early returns cannot be reversed. This is a good point. I agree, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
