Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.1 9ca7688f6 -> c9b5e1134


TAJO-1581: Does not update last state of query stage in non-hash shuffle. 
(jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c9b5e113
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c9b5e113
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c9b5e113

Branch: refs/heads/branch-0.10.1
Commit: c9b5e11347c4e380a3dd97671758a841941ae75b
Parents: 9ca7688
Author: Jinho Kim <[email protected]>
Authored: Mon Apr 27 14:45:47 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Mon Apr 27 14:45:47 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/querymaster/Stage.java | 12 +--
 .../apache/tajo/querymaster/TestQueryState.java | 93 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b7012c8..6cd7ad7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.10.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1581: Does not update last state of query stage in non-hash shuffle.
+    (jinho)
+
     TAJO-1580: Error line number is incorrect.
     (Contributed by Jongyoung Park. Committed by jaehwa)
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 80ccc21..cf8c1ce 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -1312,7 +1312,7 @@ public class Stage implements EventHandler<StageEvent> {
     if (!report.getReportSuccess()) {
       stopFinalization();
       LOG.error(getId() + ", " + type + " report are failed. Caused by:" + 
report.getReportErrorMessage());
-      eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_FAILED));
+      getEventHandler().handle(new StageEvent(getId(), 
StageEventType.SQ_FAILED));
     }
 
     completedShuffleTasks.addAndGet(report.getSucceededTasks());
@@ -1324,7 +1324,7 @@ public class Stage implements EventHandler<StageEvent> {
 
     if (completedShuffleTasks.get() >= succeededObjectCount) {
       LOG.info(getId() + ", Finalized " + type + " reports: " + 
completedShuffleTasks.get());
-      eventHandler.handle(new StageEvent(getId(), 
StageEventType.SQ_STAGE_COMPLETED));
+      getEventHandler().handle(new StageEvent(getId(), 
StageEventType.SQ_STAGE_COMPLETED));
       if (timeoutChecker != null) {
         stopFinalization();
         synchronized (timeoutChecker){
@@ -1390,7 +1390,7 @@ public class Stage implements EventHandler<StageEvent> {
                       stage.stopFinalization();
                       LOG.error(stage.getId() + ": Timed out while receiving 
intermediate reports: " + elapsedTime
                           + " ms, report:" + stage.completedShuffleTasks.get() 
+ "/" + stage.succeededObjectCount);
-                      stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_FAILED));
+                      stage.getEventHandler().handle(new 
StageEvent(stage.getId(), StageEventType.SQ_FAILED));
                     }
                     synchronized (this) {
                       try {
@@ -1404,14 +1404,14 @@ public class Stage implements EventHandler<StageEvent> {
               stage.timeoutChecker.start();
             }
           } else {
-            stage.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_STAGE_COMPLETED));
+            stage.getEventHandler().handle(new StageEvent(stage.getId(), 
StageEventType.SQ_STAGE_COMPLETED));
           }
         }
       } catch (Throwable t) {
         LOG.error(t.getMessage(), t);
         stage.stopFinalization();
-        stage.eventHandler.handle(new 
StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
-        stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_INTERNAL_ERROR));
+        stage.getEventHandler().handle(new 
StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
+        stage.getEventHandler().handle(new StageEvent(stage.getId(), 
StageEventType.SQ_INTERNAL_ERROR));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java 
b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
new file mode 100644
index 0000000..a822e42
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.*;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.master.QueryManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(IntegrationTest.class)
+public class TestQueryState {
+  private static TajoTestingCluster cluster;
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    client = cluster.newTajoClient();
+  }
+
+  @Test(timeout = 10000)
+  public void testSucceededState() throws Exception {
+    String queryStr = "select l_orderkey from lineitem group by l_orderkey 
order by l_orderkey";
+    /*
+    =======================================================
+    Block Id: eb_1429886996479_0001_000001 [LEAF] HASH_SHUFFLE
+    Block Id: eb_1429886996479_0001_000002 [INTERMEDIATE] RANGE_SHUFFLE
+    Block Id: eb_1429886996479_0001_000003 [ROOT] NONE_SHUFFLE
+    Block Id: eb_1429886996479_0001_000004 [TERMINAL]
+    =======================================================
+
+    The order of execution:
+
+    1: eb_1429886996479_0001_000001
+    2: eb_1429886996479_0001_000002
+    3: eb_1429886996479_0001_000003
+    4: eb_1429886996479_0001_000004
+    */
+
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
+    QueryId queryId = new QueryId(res.getQueryId());
+    cluster.waitForQuerySubmitted(queryId);
+
+    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
+    Query query = qmt.getQuery();
+
+    // wait for query complete
+    cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_SUCCEEDED, 
100);
+
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState());
+
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
query.getSynchronizedState());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState());
+
+    assertFalse(query.getStages().isEmpty());
+    for (Stage stage : query.getStages()) {
+      assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState());
+      assertEquals(StageState.SUCCEEDED, stage.getState());
+    }
+
+    /* wait for heartbeat from QueryMaster */
+    QueryManager queryManager = 
cluster.getMaster().getContext().getQueryJobManager();
+    for (; ; ) {
+      if (queryManager.getFinishedQuery(queryId) != null) break;
+      else Thread.sleep(100);
+    }
+
+    /* get status from TajoMaster */
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
client.getQueryStatus(queryId).getState());
+  }
+}

Reply via email to