Repository: tajo Updated Branches: refs/heads/branch-0.10.1 a5350257d -> becf85b0a
TAJO-1440: Some tests fail in parallel test environment in TestKillQuery. Closes #472 Signed-off-by: Jinho Kim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/becf85b0 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/becf85b0 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/becf85b0 Branch: refs/heads/branch-0.10.1 Commit: becf85b0ac83e764e431f4c30706cca0c16d836f Parents: a535025 Author: Jongyoung Park <[email protected]> Authored: Fri Mar 27 18:03:53 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Mar 27 18:03:53 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../java/org/apache/tajo/querymaster/Query.java | 5 +-- .../tajo/querymaster/QueryMasterTask.java | 12 ++++-- .../apache/tajo/querymaster/TestKillQuery.java | 40 +++++++++++++++----- 4 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/becf85b0/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2b7b257..d111c8b 100644 --- a/CHANGES +++ b/CHANGES @@ -20,6 +20,9 @@ Release 0.10.1 - unreleased (Contributed by navis, Committed by hyunsik) BUG FIXES + + TAJO-1440: Some tests fail in parallel test environment in TestKillQuery. + (Contributed by Jongyoung Park. Committed by jinho) TAJO-1147: Simple query doesn't work in Web UI. (Contributed by Jongyoung Park. Committed by jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/becf85b0/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index c2740e5..1ce15fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -393,8 +393,7 @@ public class Query implements EventHandler<QueryEvent> { query.getExecutionBlockCursor().nextBlock()); stage.setPriority(query.priority--); query.addStage(stage); - - stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); + stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); } } @@ -630,7 +629,7 @@ public class Query implements EventHandler<QueryEvent> { Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); nextStage.setPriority(query.priority--); query.addStage(nextStage); - nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); + nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); LOG.info("Scheduling Stage:" + nextStage.getId()); if(LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/becf85b0/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 0d1924b..465fa84 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -116,7 +116,8 @@ public class QueryMasterTask extends CompositeService { new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, - QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) { + QueryId queryId, Session session, QueryContext queryContext, + String jsonExpr, AsyncDispatcher dispatcher) { super(QueryMasterTask.class.getName()); this.queryMasterContext = queryMasterContext; @@ -125,6 +126,13 @@ public class QueryMasterTask extends CompositeService { this.queryContext = queryContext; this.jsonExpr = jsonExpr; this.querySubmitTime = System.currentTimeMillis(); + this.dispatcher = dispatcher; + } + + public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, + QueryId queryId, Session session, QueryContext queryContext, + String jsonExpr) { + this(queryMasterContext, queryId, session, queryContext, jsonExpr, new AsyncDispatcher()); } @Override @@ -144,8 +152,6 @@ public class QueryMasterTask extends CompositeService { throw new UnimplementedException(resourceManagerClassName + " is not supported yet"); } addService(resourceAllocator); - - dispatcher = new AsyncDispatcher(); addService(dispatcher); dispatcher.register(StageEventType.class, new StageEventDispatcher()); http://git-wip-us.apache.org/repos/asf/tajo/blob/becf85b0/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 8fb8e73..09be700 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -19,6 +19,8 @@ package org.apache.tajo.querymaster; import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; @@ -52,6 +54,8 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -106,29 +110,26 @@ public class TestKillQuery { GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); globalPlanner.build(masterPlan); + CountDownLatch barrier = new CountDownLatch(1); + MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, StageEventType.SQ_INIT); + QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson()); + queryId, session, defaultContext, expr.toJson(), dispatch); queryMasterTask.init(conf); queryMasterTask.getQueryTaskContext().getDispatcher().start(); queryMasterTask.startQuery(); try{ - cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2); - } finally { - assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState()); + barrier.await(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState()); } Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); assertNotNull(stage); - try{ - cluster.waitForStageState(stage, StageState.INITED, 2); - } finally { - assertEquals(StageState.INITED, stage.getSynchronizedState()); - } - // fire kill event Query q = queryMasterTask.getQuery(); q.handle(new QueryEvent(queryId, QueryEventType.KILL)); @@ -223,4 +224,23 @@ public class TestKillQuery { assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); } } + + static class MockAsyncDispatch extends AsyncDispatcher { + private CountDownLatch latch; + private Enum eventType; + + MockAsyncDispatch(CountDownLatch latch, Enum eventType) { + super(); + this.latch = latch; + this.eventType = eventType; + } + + @Override + protected void dispatch(Event event) { + if (event.getType() == eventType) { + latch.countDown(); + } + super.dispatch(event); + } + } }
