Repository: tajo Updated Branches: refs/heads/master bcec5eb7e -> 009e8e5ae
TAJO-1702: Fix race condition in finished query cache. (jinho) Closes #647 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/009e8e5a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/009e8e5a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/009e8e5a Branch: refs/heads/master Commit: 009e8e5aee3a28cbfb1e380204bc65fe81acbea2 Parents: bcec5eb Author: Jinho Kim <[email protected]> Authored: Fri Jul 24 14:46:34 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Jul 24 14:46:34 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/master/QueryManager.java | 23 +++++++---- .../apache/tajo/querymaster/QueryMaster.java | 41 ++++++++++++-------- 3 files changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 420aa4b..ae1e2d8 100644 --- a/CHANGES +++ b/CHANGES @@ -191,6 +191,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1702: Fix race condition in finished query cache. (jinho) + TAJO-1597: Problem of ignoring theta join condition. (jihoon) TAJO-1697: RCFile progress causes NPE occasionally. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index b4ed5fd..9a9ec50 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -119,14 +119,16 @@ public class QueryManager extends CompositeService { return Collections.unmodifiableCollection(runningQueries.values()); } - public synchronized Collection<QueryInfo> getFinishedQueries() { + public Collection<QueryInfo> getFinishedQueries() { Set<QueryInfo> result = Sets.newTreeSet(); synchronized (historyCache) { result.addAll(historyCache.values()); } try { - result.addAll(this.masterContext.getHistoryReader().getQueries(null)); + synchronized (this) { + result.addAll(this.masterContext.getHistoryReader().getQueries(null)); + } return result; } catch (Throwable e) { LOG.error(e, e); @@ -134,11 +136,16 @@ public class QueryManager extends CompositeService { } } - public synchronized QueryInfo getFinishedQuery(QueryId queryId) { + public QueryInfo getFinishedQuery(QueryId queryId) { try { - QueryInfo queryInfo = (QueryInfo) historyCache.get(queryId); + QueryInfo queryInfo; + synchronized (historyCache) { + queryInfo = (QueryInfo) historyCache.get(queryId); + } if (queryInfo == null) { - queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + synchronized (this) { + queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + } } return queryInfo; } catch (Throwable e) { @@ -240,14 +247,14 @@ public class QueryManager extends CompositeService { QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { queryInProgress.stopProgress(); - submittedQueries.remove(queryId); - runningQueries.remove(queryId); - QueryInfo queryInfo = queryInProgress.getQueryInfo(); synchronized (historyCache) { historyCache.put(queryInfo.getQueryId(), queryInfo); } + submittedQueries.remove(queryId); + runningQueries.remove(queryId); + long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); if (executionTime < minExecutionTime.get()) { minExecutionTime.set(executionTime); http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index e07b43f..c471aea 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -209,12 +209,14 @@ public class QueryMaster extends CompositeService implements EventHandler { @Deprecated public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) { - QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); - if(queryMasterTask != null) { + QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); + if (queryMasterTask != null) { return queryMasterTask; } else { - if(includeFinished) { - return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId); + if (includeFinished) { + synchronized (finishedQueryMasterTasksCache) { + return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId); + } } else { return null; } @@ -231,7 +233,9 @@ public class QueryMaster extends CompositeService implements EventHandler { @Deprecated public Collection<QueryMasterTask> getFinishedQueryMasterTasks() { - return finishedQueryMasterTasksCache.values(); + synchronized (finishedQueryMasterTasksCache) { + return new ArrayList<QueryMasterTask>(finishedQueryMasterTasksCache.values()); + } } public class QueryMasterContext { @@ -278,13 +282,17 @@ public class QueryMaster extends CompositeService implements EventHandler { } public void stopQuery(QueryId queryId) { - QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId); + QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); if(queryMasterTask == null) { LOG.warn("No query info:" + queryId); return; } - finishedQueryMasterTasksCache.put(queryId, queryMasterTask); + synchronized (finishedQueryMasterTasksCache) { + finishedQueryMasterTasksCache.put(queryId, queryMasterTask); + } + + queryMasterTasks.remove(queryId); TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(queryMasterTask); CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); @@ -464,11 +472,14 @@ public class QueryMaster extends CompositeService implements EventHandler { } private void cleanExpiredFinishedQueryMasterTask(long expireTime) { - List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); - for(Object key: new ArrayList<Object>(finishedQueryMasterTasksCache.keySet())) { - QueryId queryId = (QueryId) key; + List<Object> finishedList; + synchronized (finishedQueryMasterTasksCache) { + finishedList = new ArrayList<Object>(finishedQueryMasterTasksCache.values()); + } + + for(Object finishedTask: finishedList) { + QueryMasterTask queryMasterTask = (QueryMasterTask) finishedTask; /* If a query are abnormal termination, the finished time will be zero. */ - QueryMasterTask queryMasterTask = (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId); long finishedTime = queryMasterTask.getStartTime(); Query query = queryMasterTask.getQuery(); if (query != null && query.getFinishTime() > 0) { @@ -476,13 +487,11 @@ public class QueryMaster extends CompositeService implements EventHandler { } if(finishedTime < expireTime) { - expiredQueryIds.add(queryId); + synchronized (finishedQueryMasterTasksCache) { + finishedQueryMasterTasksCache.remove(queryMasterTask.getQueryId()); + } } } - - for(QueryId eachId: expiredQueryIds) { - finishedQueryMasterTasksCache.remove(eachId); - } } } }
