TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT at KILLED. (jinho)
Closes #359 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d7ee6cd6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d7ee6cd6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d7ee6cd6 Branch: refs/heads/index_support Commit: d7ee6cd682769f4c56aa1c053dc0bb4071813202 Parents: 5ba8e38 Author: jhkim <[email protected]> Authored: Tue Jan 27 16:26:22 2015 +0900 Committer: jhkim <[email protected]> Committed: Tue Jan 27 16:26:22 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/master/QueryInProgress.java | 1 + .../java/org/apache/tajo/querymaster/Stage.java | 2 + .../java/org/apache/tajo/querymaster/Task.java | 5 +- .../tajo/worker/TajoWorkerClientService.java | 4 +- .../org/apache/tajo/TajoTestingCluster.java | 28 ++++++--- .../master/scheduler/TestFifoScheduler.java | 8 +-- .../apache/tajo/querymaster/TestKillQuery.java | 60 +++++++++++++++++++- 8 files changed, 92 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index eb1c3dc..ceacf97 100644 --- a/CHANGES +++ b/CHANGES @@ -171,6 +171,9 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT + at KILLED. (jinho) + TAJO-1318: Unit test failure after miniDFS cluster restart. (jinho) TAJO-1289: History reader fails to get the query information after http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 7e2c05f..e7371dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -168,6 +168,7 @@ public class QueryInProgress { queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get()); querySubmitted.set(true); + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); } catch (Exception e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 13394f8..208d4a6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -257,6 +257,8 @@ public class Stage implements EventHandler<StageEvent> { StageEventType.SQ_START, StageEventType.SQ_KILL, StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_SHUFFLE_REPORT, + StageEventType.SQ_STAGE_COMPLETED, StageEventType.SQ_FAILED)) // Transitions from FAILED state http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 1c6a9a3..ad01b62 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -172,7 +172,10 @@ public class Task implements EventHandler<TaskEvent> { // Ignore-able transitions .addTransition(TaskState.KILLED, TaskState.KILLED, EnumSet.of( - TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) + TaskEventType.T_KILL, + TaskEventType.T_SCHEDULE, + TaskEventType.T_ATTEMPT_SUCCEEDED, + TaskEventType.T_ATTEMPT_FAILED)) .installTopology(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 2ae4bed..0b815d8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -118,8 +118,8 @@ public class TajoWorkerClientService extends AbstractService { try { QueryId queryId = new QueryId(request.getQueryId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId); - QueryHistory queryHistory = null; + QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); + QueryHistory queryHistory; if (queryMasterTask == null) { queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString()); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 64b38ac..8714fc4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -43,11 +43,11 @@ import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; -import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.querymaster.Query; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; import org.apache.tajo.util.CommonTestingUtil; @@ -772,21 +772,20 @@ public class TajoTestingCluster { } } - public void waitForQueryRunning(QueryId queryId) throws Exception { - waitForQueryRunning(queryId, 50); + public void waitForQuerySubmitted(QueryId queryId) throws Exception { + waitForQuerySubmitted(queryId, 50); } - public void waitForQueryRunning(QueryId queryId, int delay) throws Exception { - QueryInProgress qip = null; + public void waitForQuerySubmitted(QueryId queryId, int delay) throws Exception { + QueryMasterTask qmt = null; int i = 0; - while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) { + while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) { try { Thread.sleep(delay); - if(qip == null){ - TajoMaster master = getMaster(); - qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId); + if (qmt == null) { + qmt = getQueryMasterTask(queryId); } } catch (InterruptedException e) { } @@ -822,4 +821,15 @@ public class TajoTestingCluster { } } } + + public QueryMasterTask getQueryMasterTask(QueryId queryId) { + QueryMasterTask qmt = null; + for (TajoWorker worker : getTajoWorkers()) { + qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); + if (qmt != null && queryId.equals(qmt.getQueryId())) { + break; + } + } + return qmt; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java index e0c30a8..0a8a51c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java @@ -68,7 +68,7 @@ public class TestFifoScheduler { QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); client.killQuery(queryId2); assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); } @@ -82,7 +82,7 @@ public class TestFifoScheduler { QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); @@ -101,9 +101,9 @@ public class TestFifoScheduler { QueryId queryId3 = new QueryId(res3.getQueryId()); QueryId queryId4 = new QueryId(res4.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); - assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState())); + assertFalse(TajoClientUtil.isQueryComplete(client.getQueryStatus(queryId).getState())); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index bd899cd..42ad8da 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -18,6 +18,7 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Lists; import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; @@ -29,18 +30,22 @@ import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.event.QueryEvent; import org.apache.tajo.master.event.QueryEventType; -import org.apache.tajo.session.Session; +import org.apache.tajo.master.event.StageEvent; +import org.apache.tajo.master.event.StageEventType; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.session.Session; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.List; import static org.junit.Assert.*; @@ -48,6 +53,9 @@ public class TestKillQuery { private static TajoTestingCluster cluster; private static TajoConf conf; private static TajoClient client; + private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " + + "from lineitem t1 join customer t2 " + + "on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey"; @BeforeClass public static void setUp() throws Exception { @@ -59,6 +67,11 @@ public class TestKillQuery { client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + "using text location 'file://" + file.getAbsolutePath() + "'"); assertTrue(client.existTable("default.lineitem")); + + file = TPCH.getDataFile("customer"); + client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.customer")); } @AfterClass @@ -73,11 +86,10 @@ public class TestKillQuery { QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(conf); - Expr expr = analyzer.parse(query); + Expr expr = analyzer.parse(queryStr); LogicalPlan plan = planner.createPlan(defaultContext, expr); optimizer.optimize(plan); @@ -122,4 +134,46 @@ public class TestKillQuery { } queryMasterTask.stop(); } + + @Test + public final void testIgnoreStageStateFromKilled() throws Exception { + + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + cluster.waitForQuerySubmitted(queryId); + + QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); + Query query = qmt.getQuery(); + + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try{ + cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState()); + } + + List<Stage> stages = Lists.newArrayList(query.getStages()); + Stage lastStage = stages.get(stages.size() - 1); + + assertEquals(StageState.KILLED, lastStage.getSynchronizedState()); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_START, + new StageEvent(lastStage.getId(), StageEventType.SQ_START)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL, + new StageEvent(lastStage.getId(), StageEventType.SQ_KILL)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_CONTAINER_ALLOCATED, + new StageEvent(lastStage.getId(), StageEventType.SQ_CONTAINER_ALLOCATED)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT, + new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED, + new StageEvent(lastStage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED, + new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED)); + } }
