http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java new file mode 100644 index 0000000..a6f9f74 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class TestHAServiceHDFSImpl { + private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class); + + private TajoTestingCluster cluster; + private TajoMaster backupMaster; + + private TajoConf conf; + private TajoClient client; + + private Path haPath, activePath, backupPath; + + private String masterAddress; + + @Test + public final void testAutoFailOver() throws Exception { + cluster = new TajoTestingCluster(true); + + cluster.startMiniCluster(1); + conf = cluster.getConfiguration(); + client = new TajoClientImpl(conf); + + try { + FileSystem fs = cluster.getDefaultFileSystem(); + + masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0]; + + setConfiguration(); + + backupMaster = new TajoMaster(); + backupMaster.init(conf); + backupMaster.start(); + + assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName()); + + verifySystemDirectories(fs); + + Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile)); + + assertTrue(cluster.getMaster().isActiveMaster()); + assertFalse(backupMaster.isActiveMaster()); + + createDatabaseAndTable(); + verifyDataBaseAndTable(); + client.close(); + + cluster.getMaster().stop(); + + Thread.sleep(7000); + + assertFalse(cluster.getMaster().isActiveMaster()); + assertTrue(backupMaster.isActiveMaster()); + + client = new TajoClientImpl(conf); + verifyDataBaseAndTable(); + } finally { + client.close(); + backupMaster.stop(); + cluster.shutdownMiniCluster(); + } + } + + private void setConfiguration() { + conf = cluster.getConfiguration(); + + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); + + //Client API service RPC Server + conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + + // Internal RPC Server + conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); + } + + private void verifySystemDirectories(FileSystem fs) throws Exception { + haPath = TajoConf.getSystemHADir(cluster.getConfiguration()); + assertTrue(fs.exists(haPath)); + + activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + assertTrue(fs.exists(activePath)); + + backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + assertTrue(fs.exists(backupPath)); + + assertEquals(1, fs.listStatus(activePath).length); + assertEquals(1, fs.listStatus(backupPath).length); + } + + private void createDatabaseAndTable() throws Exception { + client.executeQuery("CREATE TABLE default.table1 (age int);"); + client.executeQuery("CREATE TABLE default.table2 (age int);"); + } + + private void verifyDataBaseAndTable() throws Exception { + client.existDatabase("default"); + client.existTable("default.table1"); + client.existTable("default.table2"); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index bdd6dfc..fa7fdf0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -40,6 +40,8 @@ import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.master.exec.NonForwardQueryResultSystemScanner; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -188,7 +190,7 @@ public class TestNonForwardQueryResultSystemScanner { } NonForwardQueryResultScanner queryResultScanner = - new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId, + new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId, sessionId, maxRow); return queryResultScanner; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 66d74c4..438867e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -25,9 +25,9 @@ import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.QueryId; import org.apache.tajo.TestTajoIds; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.Task.IntermediateEntry; -import org.apache.tajo.master.querymaster.Repartitioner; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; @@ -38,7 +38,7 @@ import java.net.URI; import java.util.*; import static junit.framework.Assert.assertEquals; -import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta; +import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java deleted file mode 100644 index e1806e1..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.ha; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.master.TajoMaster; -import org.junit.Test; - -import java.util.List; - -import static junit.framework.TestCase.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -public class TestHAServiceHDFSImpl { - private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class); - - private TajoTestingCluster cluster; - private TajoMaster backupMaster; - - private TajoConf conf; - private TajoClient client; - - private Path haPath, activePath, backupPath; - - private String masterAddress; - - @Test - public final void testAutoFailOver() throws Exception { - cluster = new TajoTestingCluster(true); - - cluster.startMiniCluster(1); - conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); - - try { - FileSystem fs = cluster.getDefaultFileSystem(); - - masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0]; - - setConfiguration(); - - backupMaster = new TajoMaster(); - backupMaster.init(conf); - backupMaster.start(); - - assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName()); - - verifySystemDirectories(fs); - - Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile)); - - assertTrue(cluster.getMaster().isActiveMaster()); - assertFalse(backupMaster.isActiveMaster()); - - createDatabaseAndTable(); - verifyDataBaseAndTable(); - client.close(); - - cluster.getMaster().stop(); - - Thread.sleep(7000); - - assertFalse(cluster.getMaster().isActiveMaster()); - assertTrue(backupMaster.isActiveMaster()); - - client = new TajoClientImpl(conf); - verifyDataBaseAndTable(); - } finally { - client.close(); - backupMaster.stop(); - cluster.shutdownMiniCluster(); - } - } - - private void setConfiguration() { - conf = cluster.getConfiguration(); - - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, - masterAddress + ":" + NetUtils.getFreeSocketPort()); - conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); - - //Client API service RPC Server - conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); - - // Internal RPC Server - conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); - } - - private void verifySystemDirectories(FileSystem fs) throws Exception { - haPath = TajoConf.getSystemHADir(cluster.getConfiguration()); - assertTrue(fs.exists(haPath)); - - activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - assertTrue(fs.exists(activePath)); - - backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - assertTrue(fs.exists(backupPath)); - - assertEquals(1, fs.listStatus(activePath).length); - assertEquals(1, fs.listStatus(backupPath).length); - } - - private void createDatabaseAndTable() throws Exception { - client.executeQuery("CREATE TABLE default.table1 (age int);"); - client.executeQuery("CREATE TABLE default.table2 (age int);"); - } - - private void verifyDataBaseAndTable() throws Exception { - client.existDatabase("default"); - client.existTable("default.table1"); - client.existTable("default.table2"); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java deleted file mode 100644 index 7698987..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.tajo.util.Pair; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class TestIntermediateEntry { - @Test - public void testPage() { - Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null); - - List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); - pages.add(new Pair(0L, 1441275)); - pages.add(new Pair(1441275L, 1447446)); - pages.add(new Pair(2888721L, 1442507)); - - interm.setPages(pages); - - long splitBytes = 3 * 1024 * 1024; - - List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes); - assertEquals(2, splits.size()); - - long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} }; - for (int i = 0; i < 2; i++) { - Pair<Long, Long> eachSplit = splits.get(i); - assertEquals(expected[i][0], eachSplit.getFirst().longValue()); - assertEquals(expected[i][1], eachSplit.getSecond().longValue()); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java deleted file mode 100644 index 8ca4cff..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.tajo.*; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.engine.planner.global.GlobalPlanner; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.event.QueryEvent; -import org.apache.tajo.master.event.QueryEventType; -import org.apache.tajo.master.session.Session; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; - -import static org.junit.Assert.*; - -public class TestKillQuery { - private static TajoTestingCluster cluster; - private static TajoConf conf; - private static TajoClient client; - - @BeforeClass - public static void setUp() throws Exception { - cluster = new TajoTestingCluster(); - cluster.startMiniClusterInLocal(1); - conf = cluster.getConfiguration(); - client = new TajoClientImpl(cluster.getConfiguration()); - File file = TPCH.getDataFile("lineitem"); - client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " - + "using text location 'file://" + file.getAbsolutePath() + "'"); - assertTrue(client.existTable("default.lineitem")); - } - - @AfterClass - public static void tearDown() throws IOException { - if (client != null) client.close(); - if (cluster != null) cluster.shutdownMiniCluster(); - } - - @Test - public final void testKillQueryFromInitState() throws Exception { - SQLAnalyzer analyzer = new SQLAnalyzer(); - QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); - Session session = LocalTajoTestingUtility.createDummySession(); - CatalogService catalog = cluster.getMaster().getCatalog(); - String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; - - LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); - Expr expr = analyzer.parse(query); - LogicalPlan plan = planner.createPlan(defaultContext, expr); - - optimizer.optimize(plan); - - QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); - QueryContext queryContext = new QueryContext(conf); - MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); - GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); - globalPlanner.build(masterPlan); - - QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); - QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson()); - - queryMasterTask.init(conf); - queryMasterTask.getQueryTaskContext().getDispatcher().start(); - queryMasterTask.startQuery(); - - try{ - cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2); - } finally { - assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState()); - } - - Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); - assertNotNull(stage); - - try{ - cluster.waitForStageState(stage, StageState.INITED, 2); - } finally { - assertEquals(StageState.INITED, stage.getSynchronizedState()); - } - - // fire kill event - Query q = queryMasterTask.getQuery(); - q.handle(new QueryEvent(queryId, QueryEventType.KILL)); - - try{ - cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); - } finally { - assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); - } - queryMasterTask.stop(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java deleted file mode 100644 index 4a6ca00..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.tajo.*; -import org.apache.tajo.client.QueryStatus; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; -import org.apache.tajo.client.TajoClientUtil; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ClientProtos; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.*; - -@Category(IntegrationTest.class) -public class TestQueryProgress { - private static TajoTestingCluster cluster; - private static TajoConf conf; - private static TajoClient client; - - @BeforeClass - public static void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); - conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); - } - - @AfterClass - public static void tearDown() throws Exception { - client.close(); - } - - @Test(timeout = 10000) - public final void testQueryProgress() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select l_orderkey from lineitem group by l_orderkey"); - QueryId queryId = new QueryId(res.getQueryId()); - - float prevProgress = 0; - while (true) { - QueryStatus status = client.getQueryStatus(queryId); - if (status == null) continue; - - float progress = status.getProgress(); - - if (prevProgress > progress) { - fail("Previous progress: " + prevProgress + ", Current progress : " + progress); - } - prevProgress = progress; - assertTrue(progress <= 1.0f); - - if (TajoClientUtil.isQueryComplete(status.getState())) break; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java deleted file mode 100644 index 3a54478..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.tajo.IntegrationTest; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.worker.TajoWorker; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.sql.ResultSet; -import java.util.*; - -import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; -import static org.junit.Assert.*; - -@Category(IntegrationTest.class) -public class TestTaskStatusUpdate extends QueryTestCaseBase { - - public TestTaskStatusUpdate() { - super(TajoConstants.DEFAULT_DATABASE_NAME); - } - - @BeforeClass - public static void setUp() throws Exception { - conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false"); - } - - @Test - public final void case1() throws Exception { - // select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber; - ResultSet res = null; - try { - res = executeQuery(); - - // tpch/lineitem.tbl - long[] expectedNumRows = new long[]{5, 2, 2, 2}; - long[] expectedNumBytes = new long[]{604, 18, 18, 8}; - long[] expectedReadBytes = new long[]{604, 604, 18, 0}; - - assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); - } finally { - cleanupQuery(res); - } - } - - @Test - public final void case2() throws Exception { - // ExternalMergeSort - ResultSet res = null; - try { - res = executeQuery(); - - // tpch/lineitem.tbl - long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; - long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; - long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; - - assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes); - } finally { - cleanupQuery(res); - } - } - - - @Test - public final void case3() throws Exception { - // Partition Scan - ResultSet res = null; - try { - createColumnPartitionedTable(); - - /* - |-eb_1404143727281_0002_000005 - |-eb_1404143727281_0002_000004 (order by) - |-eb_1404143727281_0002_000003 (join) - |-eb_1404143727281_0002_000002 (scan) - |-eb_1404143727281_0002_000001 (scan, filter) - */ - res = executeQuery(); - - String actualResult = resultSetToString(res); - System.out.println(actualResult); - - // in/out * stage(4) - long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; - long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18}; - long[] expectedReadBytes = new long[]{8, 8, 20, 20, 109, 0, 34, 0}; - - assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); - } finally { - cleanupQuery(res); - } - } - - private void createColumnPartitionedTable() throws Exception { - String tableName = CatalogUtil.normalizeIdentifier("ColumnPartitionedTable"); - ResultSet res = executeString( - "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); - res.close(); - - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); - assertEquals(3, - catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - - res = testBase.execute( - "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem"); - - res.close(); - } - - private void assertStatus(int numStages, - long[] expectedNumRows, - long[] expectedNumBytes, - long[] expectedReadBytes) throws Exception { - List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers(); - Collection<QueryMasterTask> finishedTasks = null; - for (TajoWorker eachWorker: tajoWorkers) { - finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks(); - if (finishedTasks != null && !finishedTasks.isEmpty()) { - break; - } - } - - assertNotNull(finishedTasks); - assertTrue(!finishedTasks.isEmpty()); - - List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks); - - Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() { - @Override - public int compare(QueryMasterTask o1, QueryMasterTask o2) { - return o2.getQueryId().compareTo(o1.getQueryId()); - } - }); - - Query query = finishedTaskList.get(0).getQuery(); - - assertNotNull(query); - - List<Stage> stages = new ArrayList<Stage>(query.getStages()); - assertEquals(numStages, stages.size()); - - Collections.sort(stages, new Comparator<Stage>() { - @Override - public int compare(Stage o1, Stage o2) { - return o1.getId().compareTo(o2.getId()); - } - }); - - int index = 0; - for (Stage eachStage : stages) { - TableStats inputStats = eachStage.getInputStats(); - TableStats resultStats = eachStage.getResultStats(); - - assertNotNull(inputStats); - assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue()); - assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue()); - assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue()); - - index++; - - assertNotNull(resultStats); - assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue()); - assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue()); - assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue()); - - index++; - } - - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java new file mode 100644 index 0000000..e0c30a8 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.benchmark.TPCH; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.sql.ResultSet; + +import static org.junit.Assert.*; + +public class TestFifoScheduler { + private static TajoTestingCluster cluster; + private static TajoConf conf; + private static TajoClient client; + private static String query = + "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; + + @BeforeClass + public static void setUp() throws Exception { + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); + conf = cluster.getConfiguration(); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); + } + + @AfterClass + public static void tearDown() throws Exception { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); + } + + @Test + public final void testKillScheduledQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); + QueryId queryId = new QueryId(res.getQueryId()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + + cluster.waitForQueryRunning(queryId); + client.killQuery(queryId2); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); + } + + @Test + public final void testForwardedQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); + assertTrue(res.getIsForwarded()); + assertFalse(res2.getIsForwarded()); + + QueryId queryId = new QueryId(res.getQueryId()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + cluster.waitForQueryRunning(queryId); + + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); + ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); + assertNotNull(resSet); + } + + @Test + public final void testScheduledQuery() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query); + + QueryId queryId = new QueryId(res.getQueryId()); + QueryId queryId2 = new QueryId(res2.getQueryId()); + QueryId queryId3 = new QueryId(res3.getQueryId()); + QueryId queryId4 = new QueryId(res4.getQueryId()); + + cluster.waitForQueryRunning(queryId); + + assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState())); + + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); + assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); + + client.killQuery(queryId4); + client.killQuery(queryId3); + client.killQuery(queryId2); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java new file mode 100644 index 0000000..237fb32 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.util.Pair; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestIntermediateEntry { + @Test + public void testPage() { + Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null); + + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + pages.add(new Pair(0L, 1441275)); + pages.add(new Pair(1441275L, 1447446)); + pages.add(new Pair(2888721L, 1442507)); + + interm.setPages(pages); + + long splitBytes = 3 * 1024 * 1024; + + List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes); + assertEquals(2, splits.size()); + + long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} }; + for (int i = 0; i < 2; i++) { + Pair<Long, Long> eachSplit = splits.get(i); + assertEquals(expected[i][0], eachSplit.getFirst().longValue()); + assertEquals(expected[i][1], eachSplit.getSecond().longValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java new file mode 100644 index 0000000..a125196 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.benchmark.TPCH; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.global.GlobalPlanner; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.event.QueryEvent; +import org.apache.tajo.master.event.QueryEventType; +import org.apache.tajo.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.*; + +public class TestKillQuery { + private static TajoTestingCluster cluster; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); + conf = cluster.getConfiguration(); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); + } + + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); + } + + @Test + public final void testKillQueryFromInitState() throws Exception { + SQLAnalyzer analyzer = new SQLAnalyzer(); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = LocalTajoTestingUtility.createDummySession(); + CatalogService catalog = cluster.getMaster().getCatalog(); + String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; + + LogicalPlanner planner = new LogicalPlanner(catalog); + LogicalOptimizer optimizer = new LogicalOptimizer(conf); + Expr expr = analyzer.parse(query); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + + optimizer.optimize(plan); + + QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); + QueryContext queryContext = new QueryContext(conf); + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); + globalPlanner.build(masterPlan); + + QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); + QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), + queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson()); + + queryMasterTask.init(conf); + queryMasterTask.getQueryTaskContext().getDispatcher().start(); + queryMasterTask.startQuery(); + + try{ + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState()); + } + + Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); + assertNotNull(stage); + + try{ + cluster.waitForStageState(stage, StageState.INITED, 2); + } finally { + assertEquals(StageState.INITED, stage.getSynchronizedState()); + } + + // fire kill event + Query q = queryMasterTask.getQuery(); + q.handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try{ + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); + } + queryMasterTask.stop(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java new file mode 100644 index 0000000..7c61670 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.*; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestQueryProgress { + private static TajoTestingCluster cluster; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + conf = cluster.getConfiguration(); + client = new TajoClientImpl(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + client.close(); + } + + @Test(timeout = 10000) + public final void testQueryProgress() throws Exception { + ClientProtos.SubmitQueryResponse res = client.executeQuery("select l_orderkey from lineitem group by l_orderkey"); + QueryId queryId = new QueryId(res.getQueryId()); + + float prevProgress = 0; + while (true) { + QueryStatus status = client.getQueryStatus(queryId); + if (status == null) continue; + + float progress = status.getProgress(); + + if (prevProgress > progress) { + fail("Previous progress: " + prevProgress + ", Current progress : " + progress); + } + prevProgress = progress; + assertTrue(progress <= 1.0f); + + if (TajoClientUtil.isQueryComplete(status.getState())) break; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java new file mode 100644 index 0000000..ab5375c --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.worker.TajoWorker; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.ResultSet; +import java.util.*; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestTaskStatusUpdate extends QueryTestCaseBase { + + public TestTaskStatusUpdate() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @BeforeClass + public static void setUp() throws Exception { + conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false"); + } + + @Test + public final void case1() throws Exception { + // select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber; + ResultSet res = null; + try { + res = executeQuery(); + + // tpch/lineitem.tbl + long[] expectedNumRows = new long[]{5, 2, 2, 2}; + long[] expectedNumBytes = new long[]{604, 18, 18, 8}; + long[] expectedReadBytes = new long[]{604, 604, 18, 0}; + + assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + @Test + public final void case2() throws Exception { + // ExternalMergeSort + ResultSet res = null; + try { + res = executeQuery(); + + // tpch/lineitem.tbl + long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; + long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; + long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; + + assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + + @Test + public final void case3() throws Exception { + // Partition Scan + ResultSet res = null; + try { + createColumnPartitionedTable(); + + /* + |-eb_1404143727281_0002_000005 + |-eb_1404143727281_0002_000004 (order by) + |-eb_1404143727281_0002_000003 (join) + |-eb_1404143727281_0002_000002 (scan) + |-eb_1404143727281_0002_000001 (scan, filter) + */ + res = executeQuery(); + + String actualResult = resultSetToString(res); + System.out.println(actualResult); + + // in/out * stage(4) + long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; + long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18}; + long[] expectedReadBytes = new long[]{8, 8, 20, 20, 109, 0, 34, 0}; + + assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + private void createColumnPartitionedTable() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("ColumnPartitionedTable"); + ResultSet res = executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); + res.close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, + catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + res = testBase.execute( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem"); + + res.close(); + } + + private void assertStatus(int numStages, + long[] expectedNumRows, + long[] expectedNumBytes, + long[] expectedReadBytes) throws Exception { + List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers(); + Collection<QueryMasterTask> finishedTasks = null; + for (TajoWorker eachWorker: tajoWorkers) { + finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks(); + if (finishedTasks != null && !finishedTasks.isEmpty()) { + break; + } + } + + assertNotNull(finishedTasks); + assertTrue(!finishedTasks.isEmpty()); + + List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks); + + Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() { + @Override + public int compare(QueryMasterTask o1, QueryMasterTask o2) { + return o2.getQueryId().compareTo(o1.getQueryId()); + } + }); + + Query query = finishedTaskList.get(0).getQuery(); + + assertNotNull(query); + + List<Stage> stages = new ArrayList<Stage>(query.getStages()); + assertEquals(numStages, stages.size()); + + Collections.sort(stages, new Comparator<Stage>() { + @Override + public int compare(Stage o1, Stage o2) { + return o1.getId().compareTo(o2.getId()); + } + }); + + int index = 0; + for (Stage eachStage : stages) { + TableStats inputStats = eachStage.getInputStats(); + TableStats resultStats = eachStage.getResultStats(); + + assertNotNull(inputStats); + assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue()); + assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue()); + assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue()); + + index++; + + assertNotNull(resultStats); + assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue()); + assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue()); + assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue()); + + index++; + } + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java deleted file mode 100644 index acd6b71..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.scheduler; - -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; -import org.apache.tajo.client.TajoClientUtil; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ClientProtos; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.sql.ResultSet; - -import static org.junit.Assert.*; - -public class TestFifoScheduler { - private static TajoTestingCluster cluster; - private static TajoConf conf; - private static TajoClient client; - private static String query = - "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; - - @BeforeClass - public static void setUp() throws Exception { - cluster = new TajoTestingCluster(); - cluster.startMiniClusterInLocal(1); - conf = cluster.getConfiguration(); - client = new TajoClientImpl(cluster.getConfiguration()); - File file = TPCH.getDataFile("lineitem"); - client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " - + "using text location 'file://" + file.getAbsolutePath() + "'"); - assertTrue(client.existTable("default.lineitem")); - } - - @AfterClass - public static void tearDown() throws Exception { - if (client != null) client.close(); - if (cluster != null) cluster.shutdownMiniCluster(); - } - - @Test - public final void testKillScheduledQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - - cluster.waitForQueryRunning(queryId); - client.killQuery(queryId2); - assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); - } - - @Test - public final void testForwardedQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); - assertTrue(res.getIsForwarded()); - assertFalse(res2.getIsForwarded()); - - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQueryRunning(queryId); - - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); - ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); - assertNotNull(resSet); - } - - @Test - public final void testScheduledQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query); - - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - QueryId queryId3 = new QueryId(res3.getQueryId()); - QueryId queryId4 = new QueryId(res4.getQueryId()); - - cluster.waitForQueryRunning(queryId); - - assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState())); - - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); - - client.killQuery(queryId4); - client.killQuery(queryId3); - client.killQuery(queryId2); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/util/TestJSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/TestJSPUtil.java b/tajo-core/src/test/java/org/apache/tajo/util/TestJSPUtil.java index 2e00138..f3b6936 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/TestJSPUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/TestJSPUtil.java @@ -23,7 +23,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent; -import org.apache.tajo.master.querymaster.Task; +import org.apache.tajo.querymaster.Task; import org.junit.Test; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java index 632e9c2..45282aa 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -28,7 +28,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.master.querymaster.QueryInfo; +import org.apache.tajo.master.QueryInfo; import org.apache.tajo.util.TajoIdUtils; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index 77aa1d4..9a51422 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -28,7 +28,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.querymaster.QueryInfo; +import org.apache.tajo.master.QueryInfo; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index e6b4479..d469ba9 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -223,10 +223,10 @@ <mainClass>org.apache.hadoop.yarn.state.VisualizeStateMachine</mainClass> <arguments> <argument>Tajo</argument> - <argument>org.apache.tajo.master.querymaster.Query, - org.apache.tajo.master.querymaster.Stage, - org.apache.tajo.master.querymaster.Task, - org.apache.tajo.master.querymaster.TaskAttempt + <argument>org.apache.tajo.querymaster.Query, + org.apache.tajo.querymaster.Stage, + org.apache.tajo.querymaster.Task, + org.apache.tajo.querymaster.TaskAttempt </argument> <argument>Tajo.gv</argument> </arguments>
