This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d95cb69 [TE] fix database connection close issue (#3647)
d95cb69 is described below
commit d95cb69c096c80d2df39d73fb6e09bdb001aa89d
Author: Jihao Zhang <[email protected]>
AuthorDate: Mon Jan 7 10:15:40 2019 -0800
[TE] fix database connection close issue (#3647)
Fix database connection close issue. Revert the previous PR revert.
---
.../linkedin/thirdeye/anomaly/task/TaskDriver.java | 12 ++++++++++
.../anomaly/utils/ThirdeyeMetricsUtil.java | 22 ++++++++++++++++++
.../thirdeye/datalayer/bao/TaskManager.java | 1 +
.../datalayer/bao/jdbc/TaskManagerImpl.java | 27 +++++++++++++++++++++-
4 files changed, 61 insertions(+), 1 deletion(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index a4e9c7f..93f97da 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -18,8 +18,10 @@ package com.linkedin.thirdeye.anomaly.task;
import
com.linkedin.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
import com.linkedin.thirdeye.anomaly.utils.AnomalyUtils;
+import com.linkedin.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
+import com.linkedin.thirdeye.util.ThirdEyeUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -86,6 +88,9 @@ public class TaskDriver {
TaskDTO anomalyTaskSpec = TaskDriver.this.acquireTask();
if (anomalyTaskSpec != null) { // a task has acquired and we must
finish executing it before termination
+ long tStart = System.nanoTime();
+ ThirdeyeMetricsUtil.taskCounter.inc();
+
try {
LOG.info("Thread {} : Executing task: {} {}",
Thread.currentThread().getId(), anomalyTaskSpec.getId(),
anomalyTaskSpec.getTaskInfo());
@@ -101,8 +106,12 @@ public class TaskDriver {
LOG.info("Thread {} : DONE Executing task: {}",
Thread.currentThread().getId(), anomalyTaskSpec.getId());
// update status to COMPLETED
updateStatusAndTaskEndTime(anomalyTaskSpec.getId(),
TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
+ ThirdeyeMetricsUtil.taskSuccessCounter.inc();
+
} catch (Exception e) {
+ ThirdeyeMetricsUtil.taskExceptionCounter.inc();
LOG.error("Exception in electing and executing task", e);
+
try {
// update task status failed
updateStatusAndTaskEndTime(anomalyTaskSpec.getId(),
TaskStatus.RUNNING, TaskStatus.FAILED,
@@ -110,6 +119,9 @@ public class TaskDriver {
} catch (Exception e1) {
LOG.error("Error in updating failed status", e1);
}
+
+ } finally {
+ ThirdeyeMetricsUtil.taskDurationCounter.inc(System.nanoTime()
- tStart);
}
}
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
index c7cab9a..876800d 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
@@ -16,8 +16,10 @@
package com.linkedin.thirdeye.anomaly.utils;
+import com.linkedin.thirdeye.datasource.DAORegistry;
import com.linkedin.thirdeye.tracking.RequestLog;
import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.JmxReporter;
@@ -34,6 +36,26 @@ public class ThirdeyeMetricsUtil {
private ThirdeyeMetricsUtil() {
}
+ public static final Counter taskCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "taskCounter");
+
+ public static final Counter taskSuccessCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class,
"taskSuccessCounter");
+
+ public static final Counter taskExceptionCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class,
"taskExceptionCounter");
+
+ public static final Counter taskDurationCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class,
"taskDurationCounter");
+
+ public static final Gauge<Integer> taskBacklogGauge =
+ metricsRegistry.newGauge(ThirdeyeMetricsUtil.class, "taskBacklogGauge",
new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return DAORegistry.getInstance().getTaskDAO().countWaiting();
+ }
+ });
+
public static final Counter detectionTaskCounter =
metricsRegistry.newCounter(ThirdeyeMetricsUtil.class,
"detectionTaskCounter");
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index a9e79e7..8c93566 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -44,4 +44,5 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
+ int countWaiting();
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index d7a133c..42782aa 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -18,6 +18,10 @@ package com.linkedin.thirdeye.datalayer.bao.jdbc;
import com.google.inject.Singleton;
import com.linkedin.thirdeye.anomaly.task.TaskConstants;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,16 +37,23 @@ import com.linkedin.thirdeye.datalayer.bao.TaskManager;
import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
import com.linkedin.thirdeye.datalayer.pojo.TaskBean;
import com.linkedin.thirdeye.datalayer.util.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
@Singleton
public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements
TaskManager {
-
private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_ASC =
" WHERE status = :status order by startTime asc limit 10";
private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_DESC =
" WHERE status = :status order by startTime desc limit 10";
+ private static final String COUNT_WAITING_TASKS =
+ "SELECT COUNT(*) FROM task_index WHERE status = 'WAITING'";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskManagerImpl.class);
+
public TaskManagerImpl() {
super(TaskDTO.class, TaskBean.class);
}
@@ -158,4 +169,18 @@ public class TaskManagerImpl extends
AbstractManagerImpl<TaskDTO> implements Tas
Predicate timeoutTimestampPredicate = Predicate.LT("updateTime",
timeoutTimestamp);
return findByPredicate(Predicate.AND(statusPredicate,
daysTimestampPredicate, timeoutTimestampPredicate));
}
+
+  /** Counts tasks whose status is WAITING; returns -1 (logged as a warning) if the query fails. */
+  @Override
+  public int countWaiting() {
+    // NOTE: this aggregation should be supported by genericPojoDAO directly
+    try (Connection connection = this.genericPojoDao.getConnection(); // all three resources auto-close
+        PreparedStatement statement = connection.prepareStatement(COUNT_WAITING_TASKS);
+        ResultSet rs = statement.executeQuery()) {
+      return rs.next() ? rs.getInt(1) : -1; // cursor starts before row 1; JDBC columns are 1-based
+    } catch (Exception e) {
+      LOG.warn("Could not retrieve task backlog size. Defaulting to -1.", e);
+      return -1;
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]