Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.0-rc1 [created] a19912461
TAJO-1356: Race conditions in QueryInProgress. (jinho) Closes #386 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c6665ac Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c6665ac Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c6665ac Branch: refs/heads/branch-0.10.0-rc1 Commit: 1c6665ac65aaabcd78e5842e5129cddd9b8b0f11 Parents: 1c2de65 Author: jhkim <[email protected]> Authored: Mon Feb 23 20:15:48 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Feb 23 17:50:37 2015 -0800 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/master/QueryInProgress.java | 70 +++++++++++++------- .../org/apache/tajo/master/QueryManager.java | 8 +-- .../apache/tajo/querymaster/QueryJobEvent.java | 1 - 4 files changed, 48 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1c6665ac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0166397..ec03000 100644 --- a/CHANGES +++ b/CHANGES @@ -185,6 +185,8 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1356: Race conditions in QueryInProgress. (jinho) + TAJO-1277: GreedyHeuristicJoinOrderAlgorithm sometimes wrongly assumes associativity of joins. 
(Keuntae Park via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/1c6665ac/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 45bdc5a..9e50797 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -31,7 +31,6 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; @@ -41,6 +40,9 @@ import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class QueryInProgress { private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); @@ -63,6 +65,9 @@ public class QueryInProgress { private QueryMasterProtocolService queryMasterRpcClient; + private final Lock readLock; + private final Lock writeLock; + public QueryInProgress( TajoMaster.MasterContext masterContext, Session session, @@ -76,16 +81,23 @@ public class QueryInProgress { queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); queryInfo.setStartTime(System.currentTimeMillis()); + + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); } - public synchronized void kill() { 
- getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - if (queryMasterRpcClient != null) { - try { + public void kill() { + writeLock.lock(); + try { + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); + if (queryMasterRpcClient != null) { queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); - } catch (Throwable e) { - catchException(e); } + } catch (Throwable e) { + catchException(e); + } finally { + writeLock.unlock(); } } @@ -112,14 +124,14 @@ public class QueryInProgress { public boolean startQueryMaster() { try { + writeLock.lockInterruptibly(); LOG.info("Initializing QueryInProgress for QueryID=" + queryId); WorkerResourceManager resourceManager = masterContext.getResourceManager(); WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this); // if no resource to allocate a query master if(resource == null) { - LOG.info("No Available Resources for QueryMaster"); - return false; + throw new RuntimeException("No Available Resources for QueryMaster"); } queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); @@ -131,6 +143,8 @@ public class QueryInProgress { } catch (Exception e) { catchException(e); return false; + } finally { + writeLock.unlock(); } } @@ -142,12 +156,14 @@ public class QueryInProgress { queryMasterRpcClient = queryMasterRpc.getStub(); } - public synchronized void submmitQueryToMaster() { + public void submmitQueryToMaster() { if(querySubmitted.get()) { return; } try { + writeLock.lockInterruptibly(); + if(queryMasterRpcClient == null) { connectQueryMaster(); } @@ -171,6 +187,8 @@ public class QueryInProgress { getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); } catch (Exception e) { LOG.error(e.getMessage(), e); + } finally { + writeLock.unlock(); } } @@ -185,7 +203,12 @@ public class QueryInProgress { } public QueryInfo getQueryInfo() { - return this.queryInfo; + readLock.lock(); + try { + return this.queryInfo; + } finally { + 
readLock.unlock(); + } } public boolean isStarted() { @@ -195,17 +218,8 @@ public class QueryInProgress { public void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); - // to avoid partial update by different heartbeats - synchronized (this.queryInfo) { - - // terminal state will let client to retrieve a query result - // So, we must set the query result before changing query state - if (isFinishState(queryInfo.getQueryState())) { - if (queryInfo.hasResultdesc()) { - this.queryInfo.setResultDesc(queryInfo.getResultDesc()); - } - } - + writeLock.lock(); + try { this.queryInfo.setQueryState(queryInfo.getQueryState()); this.queryInfo.setProgress(queryInfo.getProgress()); @@ -220,12 +234,18 @@ public class QueryInProgress { LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); } - + // terminal state will let client to retrieve a query result + // So, we must set the query result before changing query state if (isFinishState(this.queryInfo.getQueryState())) { + if (queryInfo.hasResultdesc()) { + this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + } + this.queryInfo.setFinishTime(System.currentTimeMillis()); - masterContext.getQueryJobManager().getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo)); + masterContext.getQueryJobManager().stopQuery(queryInfo.getQueryId()); } + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1c6665ac/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 8070a7c..a502e4b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -203,8 +203,7 @@ public class QueryManager extends 
CompositeService { dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInProgress.getQueryInfo())); } else { - dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, - queryInProgress.getQueryInfo())); + masterContext.getQueryJobManager().stopQuery(queryInProgress.getQueryId()); } return queryInProgress.getQueryInfo(); @@ -216,19 +215,14 @@ public class QueryManager extends CompositeService { public void handle(QueryJobEvent event) { QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); - if (queryInProgress == null) { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { queryInProgress.submmitQueryToMaster(); - } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { - stopQuery(event.getQueryInfo().getQueryId()); - } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { scheduler.removeQuery(queryInProgress.getQueryId()); queryInProgress.kill(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1c6665ac/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java index 27eb2b6..91a82f6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java @@ -37,7 +37,6 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> { public enum Type { QUERY_MASTER_START, QUERY_JOB_HEARTBEAT, - QUERY_JOB_STOP, QUERY_JOB_KILL } }
