akshayrai commented on a change in pull request #5398: URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428376797
########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.thirdeye.detection.dataquality.spec; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; +import org.apache.pinot.thirdeye.detection.spec.AbstractSpec; + + +@JsonIgnoreProperties(ignoreUnknown = true) +public class DataSlaQualityCheckerSpec extends AbstractSpec { + private String sla = "1_DAYS"; Review comment: updated ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.thirdeye.detection.dataquality.components; + +import com.google.common.collect.ArrayListMultimap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.thirdeye.anomaly.AnomalyType; +import org.apache.pinot.thirdeye.common.time.TimeGranularity; +import org.apache.pinot.thirdeye.dataframe.DataFrame; +import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; +import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.detection.DetectionUtils; +import org.apache.pinot.thirdeye.detection.InputDataFetcher; +import org.apache.pinot.thirdeye.detection.annotation.Components; +import org.apache.pinot.thirdeye.detection.annotation.DetectionTag; +import org.apache.pinot.thirdeye.detection.annotation.Param; +import org.apache.pinot.thirdeye.detection.annotation.PresentationOption; +import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec; +import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector; +import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider; +import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult; +import org.apache.pinot.thirdeye.detection.spi.model.InputData; +import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec; +import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries; +import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Performs data sla checks for the window and generates DATA_MISSING anomalies. + */ +@Components(title = "Data Sla Quality Checker", + type = "DATA_SLA", + tags = {DetectionTag.RULE_DETECTION}, + description = "Checks if data is missing or not based on the configured sla", + presentation = { + @PresentationOption(name = "data sla", template = "is ${sla}") + }, + params = { + @Param(name = "sla", placeholder = "value") + }) +public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> { + private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class); + + private String sla; + private InputDataFetcher dataFetcher; + private final String DEFAULT_DATA_SLA = "3_DAYS"; + + @Override + public DetectionResult runDetection(Interval window, String metricUrn) { + return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window)); + } + + @Override + public TimeSeries computePredictedTimeSeries(MetricSlice slice) { + return TimeSeries.empty(); + } + + @Override + public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) { + this.sla = spec.getSla(); + this.dataFetcher = dataFetcher; + } + + /** + * Runs the data sla check for the window on the given metric + */ + private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) { + List<MergedAnomalyResultDTO> anomalies = new ArrayList<>(); + + // We want to measure the overall dataset availability (filters can be ignored) + long startTime = window.getStart().getMillis(); + long endTime = window.getEnd().getMillis(); + MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.<String, String>create()); + InputData data = this.dataFetcher.fetchData(new InputDataSpec() + .withTimeseriesSlices(Collections.singletonList(metricSlice)) + .withMetricIdsForDataset(Collections.singletonList(me.getId())) + .withMetricIds(Collections.singletonList(me.getId()))); + DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId()); + + try { + long datasetLastRefreshTime = datasetConfig.getLastRefreshTime(); + if (datasetLastRefreshTime <= 0) { + // no availability event -> assume we have processed data till the current detection start + datasetLastRefreshTime = startTime - 1; + } + + MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime); + if (isMissingData(datasetLastRefreshTime, startTime)) { + // Double check with data source as 2 things are possible. + // 1. This dataset/source may not support availability events + // 2. The data availability event pipeline has some issue. + + DataFrame dataFrame = data.getTimeseries().get(metricSlice); + if (dataFrame == null || dataFrame.isEmpty()) { + // no data + if (hasMissedSLA(datasetLastRefreshTime, endTime)) { + anomalies.add(createDataSLAAnomaly(slice, datasetConfig)); + } + } else { + datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue(); + if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) { + if (hasMissedSLA(datasetLastRefreshTime, endTime)) { + slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime); + anomalies.add(createDataSLAAnomaly(slice, datasetConfig)); + } + } + } + } else if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) { + // Optimize for the common case - the common case is that the data availability events are arriving + // correctly and we need not re-fetch the data to double check. + if (hasMissedSLA(datasetLastRefreshTime, endTime)) { + anomalies.add(createDataSLAAnomaly(slice, datasetConfig)); + } + } + } catch (Exception e) { + LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e); + } Review comment: Not quite! We do have that core logic but the complexity comes while dealing with "partial data" and also trying to optimize for the common case at the same time (reduce unnecessary querying of data source). I have updated this section of code to make it more readable. Please review. Also, this section of code comes from the previous PR (listed here due to refactoring). ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java ########## @@ -122,9 +122,10 @@ public void run() { long endtime = System.currentTimeMillis(); createDetectionTask(detectionConfig, endtime); - if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) { - createDataSLACheckTask(detectionConfig, endtime); - LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId); + if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) { Review comment: Here are the conventions I am following. * "Data Availability" refers to the event trigger pipeline and refers to dataset availability. This is not referred to in the context for the alert. * "Detection" refers to all the time-series detection rules like ALGORITHM, THRESHOLD, PERCENTAGE etc. * "Data Quality" refers to all the quality rules like DATA_SLA, DATA_COMPLETENESS, etc. ########## File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java ########## @@ -26,14 +26,16 @@ import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO; import org.apache.pinot.thirdeye.datasource.DAORegistry; +import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo; +import org.apache.pinot.thirdeye.detection.TaskUtils; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DetectionDataSLAJob implements Job { - private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class); +public class DataQualityPipelineJob implements Job { Review comment: Added ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
