TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT at KILLED. 
(jinho)

Closes #359


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d7ee6cd6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d7ee6cd6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d7ee6cd6

Branch: refs/heads/index_support
Commit: d7ee6cd682769f4c56aa1c053dc0bb4071813202
Parents: 5ba8e38
Author: jhkim <[email protected]>
Authored: Tue Jan 27 16:26:22 2015 +0900
Committer: jhkim <[email protected]>
Committed: Tue Jan 27 16:26:22 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../org/apache/tajo/master/QueryInProgress.java |  1 +
 .../java/org/apache/tajo/querymaster/Stage.java |  2 +
 .../java/org/apache/tajo/querymaster/Task.java  |  5 +-
 .../tajo/worker/TajoWorkerClientService.java    |  4 +-
 .../org/apache/tajo/TajoTestingCluster.java     | 28 ++++++---
 .../master/scheduler/TestFifoScheduler.java     |  8 +--
 .../apache/tajo/querymaster/TestKillQuery.java  | 60 +++++++++++++++++++-
 8 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index eb1c3dc..ceacf97 100644
--- a/CHANGES
+++ b/CHANGES
@@ -171,6 +171,9 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT 
+    at KILLED. (jinho)
+
     TAJO-1318: Unit test failure after miniDFS cluster restart. (jinho)
 
     TAJO-1289: History reader fails to get the query information after 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 7e2c05f..e7371dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -168,6 +168,7 @@ public class QueryInProgress {
 
       queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
       querySubmitted.set(true);
+      getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 13394f8..208d4a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -257,6 +257,8 @@ public class Stage implements EventHandler<StageEvent> {
                   StageEventType.SQ_START,
                   StageEventType.SQ_KILL,
                   StageEventType.SQ_CONTAINER_ALLOCATED,
+                  StageEventType.SQ_SHUFFLE_REPORT,
+                  StageEventType.SQ_STAGE_COMPLETED,
                   StageEventType.SQ_FAILED))
 
           // Transitions from FAILED state

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 1c6a9a3..ad01b62 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -172,7 +172,10 @@ public class Task implements EventHandler<TaskEvent> {
           // Ignore-able transitions
           .addTransition(TaskState.KILLED, TaskState.KILLED,
               EnumSet.of(
-                  TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
+                  TaskEventType.T_KILL,
+                  TaskEventType.T_SCHEDULE,
+                  TaskEventType.T_ATTEMPT_SUCCEEDED,
+                  TaskEventType.T_ATTEMPT_FAILED))
 
           .installTopology();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 2ae4bed..0b815d8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -118,8 +118,8 @@ public class TajoWorkerClientService extends AbstractService {
       try {
         QueryId queryId = new QueryId(request.getQueryId());
 
-        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
-        QueryHistory queryHistory = null;
+        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+        QueryHistory queryHistory;
         if (queryMasterTask == null) {
           queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString());
         } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 64b38ac..8714fc4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,11 +43,11 @@ import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
-import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
 import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -772,21 +772,20 @@ public class TajoTestingCluster {
     }
   }
 
-  public void waitForQueryRunning(QueryId queryId) throws Exception {
-    waitForQueryRunning(queryId, 50);
+  public void waitForQuerySubmitted(QueryId queryId) throws Exception {
+    waitForQuerySubmitted(queryId, 50);
   }
 
-  public void waitForQueryRunning(QueryId queryId, int delay) throws Exception {
-    QueryInProgress qip = null;
+  public void waitForQuerySubmitted(QueryId queryId, int delay) throws Exception {
+    QueryMasterTask qmt = null;
 
     int i = 0;
-    while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) {
+    while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
       try {
         Thread.sleep(delay);
-        if(qip == null){
 
-          TajoMaster master = getMaster();
-          qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId);
+        if (qmt == null) {
+          qmt = getQueryMasterTask(queryId);
         }
       } catch (InterruptedException e) {
       }
@@ -822,4 +821,15 @@ public class TajoTestingCluster {
       }
     }
   }
+
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    QueryMasterTask qmt = null;
+    for (TajoWorker worker : getTajoWorkers()) {
+      qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
+      if (qmt != null && queryId.equals(qmt.getQueryId())) {
+        break;
+      }
+    }
+    return qmt;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
index e0c30a8..0a8a51c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
@@ -68,7 +68,7 @@ public class TestFifoScheduler {
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
 
-    cluster.waitForQueryRunning(queryId);
+    cluster.waitForQuerySubmitted(queryId);
     client.killQuery(queryId2);
     assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState());
   }
@@ -82,7 +82,7 @@ public class TestFifoScheduler {
 
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
-    cluster.waitForQueryRunning(queryId);
+    cluster.waitForQuerySubmitted(queryId);
 
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
     ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2);
@@ -101,9 +101,9 @@ public class TestFifoScheduler {
     QueryId queryId3 = new QueryId(res3.getQueryId());
     QueryId queryId4 = new QueryId(res4.getQueryId());
 
-    cluster.waitForQueryRunning(queryId);
+    cluster.waitForQuerySubmitted(queryId);
 
-    assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState()));
+    assertFalse(TajoClientUtil.isQueryComplete(client.getQueryStatus(queryId).getState()));
 
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7ee6cd6/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index bd899cd..42ad8da 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.querymaster;
 
+import com.google.common.collect.Lists;
 import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
@@ -29,18 +30,22 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.event.QueryEvent;
 import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.session.Session;
+import org.apache.tajo.master.event.StageEvent;
+import org.apache.tajo.master.event.StageEventType;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.session.Session;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import static org.junit.Assert.*;
 
@@ -48,6 +53,9 @@ public class TestKillQuery {
   private static TajoTestingCluster cluster;
   private static TajoConf conf;
   private static TajoClient client;
+  private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " +
+      "from lineitem t1 join customer t2 " +
+      "on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey";
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -59,6 +67,11 @@ public class TestKillQuery {
     client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
         + "using text location 'file://" + file.getAbsolutePath() + "'");
     assertTrue(client.existTable("default.lineitem"));
+
+    file = TPCH.getDataFile("customer");
+    client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.customer"));
   }
 
   @AfterClass
@@ -73,11 +86,10 @@ public class TestKillQuery {
     QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
-    String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
-    Expr expr =  analyzer.parse(query);
+    Expr expr =  analyzer.parse(queryStr);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
     optimizer.optimize(plan);
@@ -122,4 +134,46 @@ public class TestKillQuery {
     }
     queryMasterTask.stop();
   }
+
+  @Test
+  public final void testIgnoreStageStateFromKilled() throws Exception {
+
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
+    QueryId queryId = new QueryId(res.getQueryId());
+    cluster.waitForQuerySubmitted(queryId);
+
+    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
+    Query query = qmt.getQuery();
+
+    query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+    try{
+      cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
+    } finally {
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+    }
+
+    List<Stage> stages = Lists.newArrayList(query.getStages());
+    Stage lastStage = stages.get(stages.size() - 1);
+
+    assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_START,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_START));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_KILL));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_CONTAINER_ALLOCATED,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_CONTAINER_ALLOCATED));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+
+    lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED,
+        new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED));
+  }
 }

Reply via email to