xiaohui-sun commented on a change in pull request #4724: [TE] Add event trigger 
listener for event driven scheduling
URL: https://github.com/apache/incubator-pinot/pull/4724#discussion_r337287176
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/utils/DatasetTriggerInfoRepo.java
 ##########
 @@ -0,0 +1,121 @@
+package org.apache.pinot.thirdeye.anomaly.detection.trigger.utils;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.formatter.DetectionConfigFormatter;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DatasetTriggerInfoRepo {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DatasetTriggerInfoRepo.class);
+  private static DatasetTriggerInfoRepo _instance = null;
+  private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
+  private Map<String, Long> datasetRefreshTimeMap;
+  private Set<String> dataSourceWhitelist;
+  private ScheduledThreadPoolExecutor executorService;
+
+  private DatasetTriggerInfoRepo(int refreshFreqMin, Collection<String> 
dataSourceWhitelist) {
+    this.datasetRefreshTimeMap = new ConcurrentHashMap<>();
+    this.dataSourceWhitelist = new HashSet<>(dataSourceWhitelist);
+    this.refresh(); // initial refresh
+    this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
+      Thread t = Executors.defaultThreadFactory().newThread(r);
+      t.setDaemon(true);
+      return t;
+    });
+    this.executorService.scheduleAtFixedRate(
+        this::refresh, refreshFreqMin, refreshFreqMin, TimeUnit.MINUTES);
+  }
+
+  public static DatasetTriggerInfoRepo initAndGetInstance(int refreshFreqMin, 
Collection<String> dataSourceWhitelist) {
+    if (_instance == null) {
+      synchronized (DatasetTriggerInfoRepo.class) {
+        if (_instance == null) {
+          _instance = new DatasetTriggerInfoRepo(refreshFreqMin, 
dataSourceWhitelist);
+        }
+      }
+    }
+    return _instance;
+  }
+
+  public static DatasetTriggerInfoRepo getInstance() {
+    if (_instance == null)
+      throw new IllegalStateException("DatasetTriggerInfoRepo is not 
initialized before getInstance is called.");
+    return _instance;
+  }
+
+  public boolean isDatasetActive(String dataset) {
+    return datasetRefreshTimeMap.containsKey(dataset);
+  }
+
+  public long getLastUpdateTimestamp(String dataset) {
+    return (isDatasetActive(dataset)) ? datasetRefreshTimeMap.get(dataset) : 0;
+  }
+
+  public void setLastUpdateTimestamp(String dataset, long timestamp) {
+    datasetRefreshTimeMap.put(dataset, timestamp);
+  }
+
+  public void close() {
+    executorService.shutdown();
+    _instance = null;
+  }
+
+  private void refresh() {
+    Set<String> activeDatasetSet = new HashSet<>();
+    DetectionConfigManager detectionConfigDAO = 
DAO_REGISTRY.getDetectionConfigManager();
+    MetricConfigManager metricsConfigDAO = DAO_REGISTRY.getMetricConfigDAO();
+    DatasetConfigManager datasetConfigDAO = DAO_REGISTRY.getDatasetConfigDAO();
+    List<DetectionConfigDTO> detectionConfigDTOs = 
detectionConfigDAO.findAll();
+    LOG.info(String.format("Found %d detection configs", 
detectionConfigDTOs.size()));
+    for (DetectionConfigDTO detectionConfigDTO : detectionConfigDTOs) {
+      if (!detectionConfigDTO.isActive()) continue; // skip inactive detection
+      List<String> metricUrns = DetectionConfigFormatter
+          .extractMetricUrnsFromProperties(detectionConfigDTO.getProperties());
+      if (metricUrns.size() < 1) {
+        LOG.error("Empty metricUrn for detection {}", 
detectionConfigDTO.getId());
+        continue;
+      }
+      for (String urn : metricUrns) {
+        MetricEntity me = MetricEntity.fromURN(urn);
+        MetricConfigDTO metricConfig = metricsConfigDAO.findById(me.getId());
+        if (metricConfig == null) {
+          LOG.warn("Found null value for metric: " + me.getId());
+          continue;
+        }
+        DatasetConfigDTO datasetConfig = 
datasetConfigDAO.findByDataset(metricConfig.getDataset());
 
 Review comment:
   For derived metrics, a single metric may map to multiple datasets.
   You need to parse the formula, collect all the metrics it references, and 
then look up the corresponding dataset for each of those metrics.
   One example is "pageviews_per_user".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to