This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d95cb69  [TE] fix database connection close issue (#3647)
d95cb69 is described below

commit d95cb69c096c80d2df39d73fb6e09bdb001aa89d
Author: Jihao Zhang <[email protected]>
AuthorDate: Mon Jan 7 10:15:40 2019 -0800

    [TE] fix database connection close issue (#3647)
    
    Fix the database connection close issue and re-apply the change from the previously reverted PR.
---
 .../linkedin/thirdeye/anomaly/task/TaskDriver.java | 12 ++++++++++
 .../anomaly/utils/ThirdeyeMetricsUtil.java         | 22 ++++++++++++++++++
 .../thirdeye/datalayer/bao/TaskManager.java        |  1 +
 .../datalayer/bao/jdbc/TaskManagerImpl.java        | 27 +++++++++++++++++++++-
 4 files changed, 61 insertions(+), 1 deletion(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index a4e9c7f..93f97da 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -18,8 +18,10 @@ package com.linkedin.thirdeye.anomaly.task;
 
 import 
com.linkedin.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
 import com.linkedin.thirdeye.anomaly.utils.AnomalyUtils;
+import com.linkedin.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 
+import com.linkedin.thirdeye.util.ThirdEyeUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -86,6 +88,9 @@ public class TaskDriver {
             TaskDTO anomalyTaskSpec = TaskDriver.this.acquireTask();
 
             if (anomalyTaskSpec != null) { // a task has acquired and we must 
finish executing it before termination
+              long tStart = System.nanoTime();
+              ThirdeyeMetricsUtil.taskCounter.inc();
+
               try {
                 LOG.info("Thread {} : Executing task: {} {}", 
Thread.currentThread().getId(), anomalyTaskSpec.getId(),
                     anomalyTaskSpec.getTaskInfo());
@@ -101,8 +106,12 @@ public class TaskDriver {
                 LOG.info("Thread {} : DONE Executing task: {}", 
Thread.currentThread().getId(), anomalyTaskSpec.getId());
                 // update status to COMPLETED
                 updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), 
TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
+                ThirdeyeMetricsUtil.taskSuccessCounter.inc();
+
               } catch (Exception e) {
+                ThirdeyeMetricsUtil.taskExceptionCounter.inc();
                 LOG.error("Exception in electing and executing task", e);
+
                 try {
                   // update task status failed
                   updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), 
TaskStatus.RUNNING, TaskStatus.FAILED,
@@ -110,6 +119,9 @@ public class TaskDriver {
                 } catch (Exception e1) {
                   LOG.error("Error in updating failed status", e1);
                 }
+
+              } finally {
+                ThirdeyeMetricsUtil.taskDurationCounter.inc(System.nanoTime() 
- tStart);
               }
             }
           }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
index c7cab9a..876800d 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
@@ -16,8 +16,10 @@
 
 package com.linkedin.thirdeye.anomaly.utils;
 
+import com.linkedin.thirdeye.datasource.DAORegistry;
 import com.linkedin.thirdeye.tracking.RequestLog;
 import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.reporting.JmxReporter;
 
@@ -34,6 +36,26 @@ public class ThirdeyeMetricsUtil {
   private ThirdeyeMetricsUtil() {
   }
 
+  public static final Counter taskCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskCounter");
+
+  public static final Counter taskSuccessCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, 
"taskSuccessCounter");
+
+  public static final Counter taskExceptionCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, 
"taskExceptionCounter");
+
+  public static final Counter taskDurationCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, 
"taskDurationCounter");
+
+  public static final Gauge<Integer> taskBacklogGauge =
+      metricsRegistry.newGauge(ThirdeyeMetricsUtil.class, "taskBacklogGauge", 
new Gauge<Integer>() {
+        @Override
+        public Integer value() {
+          return DAORegistry.getInstance().getTaskDAO().countWaiting();
+        }
+      });
+
   public static final Counter detectionTaskCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, 
"detectionTaskCounter");
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index a9e79e7..8c93566 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -44,4 +44,5 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
 
   int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
 
+  int countWaiting();
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index d7a133c..42782aa 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -18,6 +18,10 @@ package com.linkedin.thirdeye.datalayer.bao.jdbc;
 
 import com.google.inject.Singleton;
 import com.linkedin.thirdeye.anomaly.task.TaskConstants;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,16 +37,23 @@ import com.linkedin.thirdeye.datalayer.bao.TaskManager;
 import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
 import com.linkedin.thirdeye.datalayer.pojo.TaskBean;
 import com.linkedin.thirdeye.datalayer.util.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 @Singleton
 public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements 
TaskManager {
-
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_ASC =
       " WHERE status = :status order by startTime asc limit 10";
 
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_DESC =
       " WHERE status = :status order by startTime desc limit 10";
 
+  private static final String COUNT_WAITING_TASKS =
+      "SELECT COUNT(*) FROM task_index WHERE status = 'WAITING'";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerImpl.class);
+
   public TaskManagerImpl() {
     super(TaskDTO.class, TaskBean.class);
   }
@@ -158,4 +169,18 @@ public class TaskManagerImpl extends 
AbstractManagerImpl<TaskDTO> implements Tas
     Predicate timeoutTimestampPredicate = Predicate.LT("updateTime", 
timeoutTimestamp);
     return findByPredicate(Predicate.AND(statusPredicate, 
daysTimestampPredicate, timeoutTimestampPredicate));
   }
+
+  @Override
+  public int countWaiting() {
+    // NOTE: this aggregation should be supported by genericPojoDAO directly.
+    // Resources close via try-with-resources; JDBC needs rs.next() first and 1-based columns.
+    try (Connection connection = this.genericPojoDao.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(COUNT_WAITING_TASKS);
+        ResultSet rs = statement.executeQuery()) {
+      return rs.next() ? rs.getInt(1) : -1;
+    } catch (Exception e) {
+      LOG.warn("Could not retrieve task backlog size. Defaulting to -1.", e);
+      return -1;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to