Repository: tajo Updated Branches: refs/heads/branch-0.10.1 9ca7688f6 -> c9b5e1134
TAJO-1581: Does not update last state of query stage in non-hash shuffle. (jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c9b5e113 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c9b5e113 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c9b5e113 Branch: refs/heads/branch-0.10.1 Commit: c9b5e11347c4e380a3dd97671758a841941ae75b Parents: 9ca7688 Author: Jinho Kim <[email protected]> Authored: Mon Apr 27 14:45:47 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Apr 27 14:45:47 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/querymaster/Stage.java | 12 +-- .../apache/tajo/querymaster/TestQueryState.java | 93 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b7012c8..6cd7ad7 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,9 @@ Release 0.10.1 - unreleased BUG FIXES + TAJO-1581: Does not update last state of query stage in non-hash shuffle. + (jinho) + TAJO-1580: Error line number is incorrect. (Contributed by Jongyoung Park. Committed by jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 80ccc21..cf8c1ce 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1312,7 +1312,7 @@ public class Stage implements EventHandler<StageEvent> { if (!report.getReportSuccess()) { stopFinalization(); LOG.error(getId() + ", " + type + " report are failed. Caused by:" + report.getReportErrorMessage()); - eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_FAILED)); + getEventHandler().handle(new StageEvent(getId(), StageEventType.SQ_FAILED)); } completedShuffleTasks.addAndGet(report.getSucceededTasks()); @@ -1324,7 +1324,7 @@ public class Stage implements EventHandler<StageEvent> { if (completedShuffleTasks.get() >= succeededObjectCount) { LOG.info(getId() + ", Finalized " + type + " reports: " + completedShuffleTasks.get()); - eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED)); + getEventHandler().handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED)); if (timeoutChecker != null) { stopFinalization(); synchronized (timeoutChecker){ @@ -1390,7 +1390,7 @@ public class Stage implements EventHandler<StageEvent> { stage.stopFinalization(); LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); + stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); } synchronized (this) { try { @@ -1404,14 +1404,14 @@ public class Stage implements EventHandler<StageEvent> { stage.timeoutChecker.start(); } } else { - stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } } } catch (Throwable t) { LOG.error(t.getMessage(), t); stage.stopFinalization(); - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage())); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); + stage.getEventHandler().handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage())); + stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b5e113/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java new file mode 100644 index 0000000..a822e42 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.*; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.master.QueryManager; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@Category(IntegrationTest.class) +public class TestQueryState { + private static TajoTestingCluster cluster; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + client = cluster.newTajoClient(); + } + + @Test(timeout = 10000) + public void testSucceededState() throws Exception { + String queryStr = "select l_orderkey from lineitem group by l_orderkey order by l_orderkey"; + /* + ======================================================= + Block Id: eb_1429886996479_0001_000001 [LEAF] HASH_SHUFFLE + Block Id: eb_1429886996479_0001_000002 [INTERMEDIATE] RANGE_SHUFFLE + Block Id: eb_1429886996479_0001_000003 [ROOT] NONE_SHUFFLE + Block Id: eb_1429886996479_0001_000004 [TERMINAL] + ======================================================= + + The order of execution: + + 1: eb_1429886996479_0001_000001 + 2: eb_1429886996479_0001_000002 + 3: eb_1429886996479_0001_000003 + 4: eb_1429886996479_0001_000004 + */ + + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + cluster.waitForQuerySubmitted(queryId); + + QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); + Query query = qmt.getQuery(); + + // wait for query complete + cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_SUCCEEDED, 100); + + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState()); + + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState()); + + assertFalse(query.getStages().isEmpty()); + for (Stage stage : query.getStages()) { + assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState()); + assertEquals(StageState.SUCCEEDED, stage.getState()); + } + + /* wait for heartbeat from QueryMaster */ + QueryManager queryManager = cluster.getMaster().getContext().getQueryJobManager(); + for (; ; ) { + if (queryManager.getFinishedQuery(queryId) != null) break; + else Thread.sleep(100); + } + + /* get status from TajoMaster */ + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState()); + } +}
