Repository: tajo
Updated Branches:
  refs/heads/master bcec5eb7e -> 009e8e5ae


TAJO-1702: Fix race condition in finished query cache. (jinho)

Closes #647


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/009e8e5a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/009e8e5a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/009e8e5a

Branch: refs/heads/master
Commit: 009e8e5aee3a28cbfb1e380204bc65fe81acbea2
Parents: bcec5eb
Author: Jinho Kim <[email protected]>
Authored: Fri Jul 24 14:46:34 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Fri Jul 24 14:46:34 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/master/QueryManager.java    | 23 +++++++----
 .../apache/tajo/querymaster/QueryMaster.java    | 41 ++++++++++++--------
 3 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 420aa4b..ae1e2d8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -191,6 +191,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1702: Fix race condition in finished query cache. (jinho)
+
     TAJO-1597: Problem of ignoring theta join condition. (jihoon)
 
     TAJO-1697: RCFile progress causes NPE occasionally. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index b4ed5fd..9a9ec50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -119,14 +119,16 @@ public class QueryManager extends CompositeService {
     return Collections.unmodifiableCollection(runningQueries.values());
   }
 
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
+  public Collection<QueryInfo> getFinishedQueries() {
     Set<QueryInfo> result = Sets.newTreeSet();
     synchronized (historyCache) {
       result.addAll(historyCache.values());
     }
 
     try {
-      result.addAll(this.masterContext.getHistoryReader().getQueries(null));
+      synchronized (this) {
+        result.addAll(this.masterContext.getHistoryReader().getQueries(null));
+      }
       return result;
     } catch (Throwable e) {
       LOG.error(e, e);
@@ -134,11 +136,16 @@ public class QueryManager extends CompositeService {
     }
   }
 
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+  public QueryInfo getFinishedQuery(QueryId queryId) {
     try {
-      QueryInfo queryInfo = (QueryInfo) historyCache.get(queryId);
+      QueryInfo queryInfo;
+      synchronized (historyCache) {
+        queryInfo = (QueryInfo) historyCache.get(queryId);
+      }
       if (queryInfo == null) {
-        queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+        synchronized (this) {
+          queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+        }
       }
       return queryInfo;
     } catch (Throwable e) {
@@ -240,14 +247,14 @@ public class QueryManager extends CompositeService {
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
     if(queryInProgress != null) {
       queryInProgress.stopProgress();
-      submittedQueries.remove(queryId);
-      runningQueries.remove(queryId);
-
       QueryInfo queryInfo = queryInProgress.getQueryInfo();
       synchronized (historyCache) {
         historyCache.put(queryInfo.getQueryId(), queryInfo);
       }
 
+      submittedQueries.remove(queryId);
+      runningQueries.remove(queryId);
+
       long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
       if (executionTime < minExecutionTime.get()) {
         minExecutionTime.set(executionTime);

http://git-wip-us.apache.org/repos/asf/tajo/blob/009e8e5a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index e07b43f..c471aea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -209,12 +209,14 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   @Deprecated
   public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
-    QueryMasterTask queryMasterTask =  queryMasterTasks.get(queryId);
-    if(queryMasterTask != null) {
+    QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
+    if (queryMasterTask != null) {
       return queryMasterTask;
     } else {
-      if(includeFinished) {
-        return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId);
+      if (includeFinished) {
+        synchronized (finishedQueryMasterTasksCache) {
+          return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId);
+        }
       } else {
         return null;
       }
@@ -231,7 +233,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   @Deprecated
   public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
-    return finishedQueryMasterTasksCache.values();
+    synchronized (finishedQueryMasterTasksCache) {
+      return new ArrayList<QueryMasterTask>(finishedQueryMasterTasksCache.values());
+    }
   }
 
   public class QueryMasterContext {
@@ -278,13 +282,17 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     public void stopQuery(QueryId queryId) {
-      QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
+      QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
       if(queryMasterTask == null) {
         LOG.warn("No query info:" + queryId);
         return;
       }
 
-      finishedQueryMasterTasksCache.put(queryId, queryMasterTask);
+      synchronized (finishedQueryMasterTasksCache) {
+        finishedQueryMasterTasksCache.put(queryId, queryMasterTask);
+      }
+
+      queryMasterTasks.remove(queryId);
 
       TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
       CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
@@ -464,11 +472,14 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
-      for(Object key: new ArrayList<Object>(finishedQueryMasterTasksCache.keySet())) {
-        QueryId queryId = (QueryId) key;
+      List<Object> finishedList;
+      synchronized (finishedQueryMasterTasksCache) {
+        finishedList = new ArrayList<Object>(finishedQueryMasterTasksCache.values());
+      }
+
+      for(Object finishedTask: finishedList) {
+        QueryMasterTask queryMasterTask = (QueryMasterTask) finishedTask;
           /* If a query are abnormal termination, the finished time will be zero. */
-        QueryMasterTask queryMasterTask = (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId);
         long finishedTime = queryMasterTask.getStartTime();
         Query query = queryMasterTask.getQuery();
         if (query != null && query.getFinishTime() > 0) {
@@ -476,13 +487,11 @@ public class QueryMaster extends CompositeService implements EventHandler {
         }
 
         if(finishedTime < expireTime) {
-          expiredQueryIds.add(queryId);
+          synchronized (finishedQueryMasterTasksCache) {
+            finishedQueryMasterTasksCache.remove(queryMasterTask.getQueryId());
+          }
         }
       }
-
-      for(QueryId eachId: expiredQueryIds) {
-        finishedQueryMasterTasksCache.remove(eachId);
-      }
     }
   }
 }

Reply via email to