Repository: tajo Updated Branches: refs/heads/master aed97a8a5 -> 839081ac8
TAJO-873: Query status is still RUNNING after session expired. (Hyoungjun Kim, jinho) Closes #37 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/839081ac Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/839081ac Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/839081ac Branch: refs/heads/master Commit: 839081ac8a752de899ddeb0bcd15866722ae6c27 Parents: aed97a8 Author: jhkim <[email protected]> Authored: Fri Aug 22 20:53:12 2014 +0900 Committer: jhkim <[email protected]> Committed: Fri Aug 22 20:53:12 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../tajo/master/querymaster/QueryMaster.java | 4 +- .../master/querymaster/QueryMasterTask.java | 26 +++++++---- .../org/apache/tajo/TajoTestingCluster.java | 31 +++++++++++++ .../tajo/scheduler/TestFifoScheduler.java | 47 ++++++++++---------- 5 files changed, 76 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f0734f2..f2798b4 100644 --- a/CHANGES +++ b/CHANGES @@ -124,6 +124,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-873: Query status is still RUNNING after session expired. + (Hyoungjun Kim, jinho) + TAJO-1004: UniformRangePartition cannot deal with unicode ranges. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 9f90b05..deadd39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -579,8 +579,8 @@ public class QueryMaster extends CompositeService implements EventHandler { long lastHeartbeat = eachTask.getLastClientHeartbeat(); long time = System.currentTimeMillis() - lastHeartbeat; if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) { - LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms"); - eachTask.expiredSessionTimeout(); + LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms"); + eachTask.expireQuerySession(); } } catch (Exception e) { LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index ce329fb..1ffaf56 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -187,6 +187,12 @@ public class QueryMasterTask extends CompositeService { LOG.info("Stopping QueryMasterTask:" + queryId); + try { + resourceAllocator.stop(); + } catch (Throwable t) { + LOG.fatal(t.getMessage(), t); + } + CallFuture future = new CallFuture(); RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); @@ -328,14 +334,14 @@ public class QueryMasterTask extends CompositeService { LOG.info("Query final state: " + query.getSynchronizedState()); queryMasterContext.stopQuery(queryId); } + } - private boolean isTerminatedState(QueryState state) { - return - state == QueryState.QUERY_SUCCEEDED || - state == QueryState.QUERY_FAILED || - state == QueryState.QUERY_KILLED || - state == QueryState.QUERY_ERROR; - } + private static boolean isTerminatedState(QueryState state) { + return + state == QueryState.QUERY_SUCCEEDED || + state == QueryState.QUERY_FAILED || + state == QueryState.QUERY_KILLED || + state == QueryState.QUERY_ERROR; } public synchronized void startQuery() { @@ -462,8 +468,10 @@ public class QueryMasterTask extends CompositeService { return query; } - public void expiredSessionTimeout() { - stop(); + protected void expireQuerySession() { + if(!isTerminatedState(query.getState()) && !(query.getState() == QueryState.QUERY_KILL_WAIT)){ + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + } } public QueryMasterTaskContext getQueryTaskContext() { http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 948f018..7b87112 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -38,6 +38,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -677,4 +678,34 @@ public class TajoTestingCluster { eachWorker.getConfig().set(key, value); } } + + public void waitForQueryRunning(QueryId queryId) throws Exception { + QueryMasterTask qmt = null; + + int i = 0; + while (qmt == null || TajoClient.isInPreNewState(qmt.getState())) { + try { + Thread.sleep(100); + if(qmt == null){ + qmt = getQueryMasterTask(queryId); + } + } catch (InterruptedException e) { + } + if (++i > 100) { + throw new IOException("Timed out waiting for query to start"); + } + } + } + + public QueryMasterTask getQueryMasterTask(QueryId queryId) { + QueryMasterTask qmt = null; + for (TajoWorker worker : getTajoWorkers()) { + qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId); + if (qmt != null) { + break; + } + } + + return qmt; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java index 76f22d0..069ee27 100644 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -18,7 +18,6 @@ package org.apache.tajo.scheduler; -import com.google.protobuf.ServiceException; import org.apache.tajo.*; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; @@ -28,7 +27,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.sql.ResultSet; import static org.junit.Assert.*; @@ -52,51 +50,52 @@ public class TestFifoScheduler { } @Test - public final void testKillScheduledQuery() throws IOException, ServiceException, InterruptedException { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem"); - Thread.sleep(1000); + public final void testKillScheduledQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); + cluster.waitForQueryRunning(queryId); client.killQuery(queryId2); assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); - client.killQuery(queryId); - assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState()); + + client.killQuery(queryId); // cleanup } @Test - public final void testForwardedQuery() throws IOException, ServiceException, InterruptedException { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); + public final void testForwardedQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); - - Thread.sleep(1000); + assertTrue(res.getIsForwarded()); assertFalse(res2.getIsForwarded()); + + QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); + cluster.waitForQueryRunning(queryId); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); ResultSet resSet = TajoClient.createResultSet(client, res2); assertNotNull(resSet); - QueryId queryId = new QueryId(res.getQueryId()); - assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState()); - client.killQuery(queryId); + client.killQuery(queryId); //cleanup } @Test - public final void testScheduledQuery() throws IOException, ServiceException, InterruptedException { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem"); - ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(2) from lineitem"); - ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(2) from lineitem"); - - Thread.sleep(1000); + public final void testScheduledQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(1) from lineitem"); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); QueryId queryId3 = new QueryId(res3.getQueryId()); QueryId queryId4 = new QueryId(res4.getQueryId()); - assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState()); + + cluster.waitForQueryRunning(queryId); + + assertTrue(TajoClient.isInRunningState(client.getQueryStatus(queryId).getState())); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
