Repository: tajo Updated Branches: refs/heads/index_support 1c1dfa0ed -> 55e46301e
TAJO-1325: Invalid history cleaner timeout. (jinho) Closes #371 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8ac86835 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8ac86835 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8ac86835 Branch: refs/heads/index_support Commit: 8ac868352a2a3e5c26cf438e81ef4e23af88dc2b Parents: 02c6c26 Author: jhkim <[email protected]> Authored: Tue Feb 3 17:26:46 2015 +0900 Committer: jhkim <[email protected]> Committed: Tue Feb 3 17:26:46 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../src/main/java/org/apache/tajo/conf/TajoConf.java | 5 +++-- .../org/apache/tajo/querymaster/QueryMaster.java | 15 +++++++++++---- .../org/apache/tajo/worker/TaskRunnerManager.java | 6 ++++-- 4 files changed, 20 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8ac86835/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 5fe46e8..07c34ec 100644 --- a/CHANGES +++ b/CHANGES @@ -176,6 +176,8 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1325: Invalid history cleaner timeout. (jinho) + TAJO-1283: ORDER BY with the first descending order causes wrong results. (Keuntae Park) http://git-wip-us.apache.org/repos/asf/tajo/blob/8ac86835/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index fe5ff54..4ed8097 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -175,8 +175,9 @@ public class TajoConf extends Configuration { WORKER_RESOURCE_DEDICATED_MEMORY_RATIO("tajo.worker.resource.dedicated-memory-ratio", 0.8f, Validators.range("0.0f", "1.0f")), - // Tajo Worker History - WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 12 * 60), // 12 hours + // Tajo History + WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours + QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000), // 120 sec http://git-wip-us.apache.org/repos/asf/tajo/blob/8ac86835/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 234a46a..c3899d6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -33,7 +33,6 @@ import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -504,11 +503,11 @@ public class QueryMaster extends CompositeService implements EventHandler { class FinishedQueryMasterTaskCleanThread extends Thread { public void run() { - int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD); + int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_HISTORY_EXPIRE_PERIOD); LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); while(!queryMasterStop.get()) { try { - Thread.sleep(60 * 1000 * 60); // hourly + Thread.sleep(60 * 1000); // minimum interval minutes } catch (InterruptedException e) { break; } @@ -525,7 +524,15 @@ public class QueryMaster extends CompositeService implements EventHandler { synchronized(finishedQueryMasterTasks) { List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) { - if(entry.getValue().getStartTime() < expireTime) { + + /* If a query are abnormal termination, the finished time will be zero. */ + long finishedTime = entry.getValue().getStartTime(); + Query query = entry.getValue().getQuery(); + if (query != null && query.getFinishTime() > 0) { + finishedTime = query.getFinishTime(); + } + + if(finishedTime < expireTime) { expiredQueryIds.add(entry.getKey()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8ac86835/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 3c1fcc5..4b10203 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -223,7 +223,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); while(!stop.get()) { try { - Thread.sleep(60 * 1000 * 60); // hourly check + Thread.sleep(60 * 1000); } catch (InterruptedException e) { break; } @@ -240,7 +240,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< synchronized(taskRunnerHistoryMap) { List<String> expiredIds = new ArrayList<String>(); for(Map.Entry<String, TaskRunnerHistory> entry: taskRunnerHistoryMap.entrySet()) { - if(entry.getValue().getStartTime() > expireTime) { + /* If a task runner are abnormal termination, the finished time will be zero. */ + long finishedTime = Math.max(entry.getValue().getStartTime(), entry.getValue().getFinishTime()); + if(finishedTime < expireTime) { expiredIds.add(entry.getKey()); } }
