http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java new file mode 100644 index 0000000..b139645 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java @@ -0,0 +1,135 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master; + +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.benchmark.TPCH; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; +import org.apache.tajo.engine.planner.global.GlobalPlanner; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; + +public class TestExecutionBlockCursor { + private static TajoTestingCluster util; + private static TajoConf conf; + private static CatalogService catalog; + private static GlobalPlanner planner; + private static SQLAnalyzer analyzer; + private static LogicalPlanner logicalPlanner; + private static LogicalOptimizer optimizer; + private static AsyncDispatcher dispatcher; + + @BeforeClass + public static void setUp() throws Exception { + util = new TajoTestingCluster(); + util.startCatalogCluster(); + + conf = util.getConfiguration(); + 
conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false"); + + catalog = util.getMiniCatalogCluster().getCatalog(); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:1234/warehouse"); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + TPCH tpch = new TPCH(); + tpch.loadSchemas(); + tpch.loadOutSchema(); + for (String table : tpch.getTableNames()) { + TableMeta m = CatalogUtil.newTableMeta("TEXT"); + TableDesc d = CatalogUtil.newTableDesc( + CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, table), tpch.getSchema(table), m, CommonTestingUtil.getTestDir()); + TableStats stats = new TableStats(); + stats.setNumBytes(TPCH.tableVolumes.get(table)); + d.setStats(stats); + catalog.createTable(d); + } + + analyzer = new SQLAnalyzer(); + logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog); + + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + planner = new GlobalPlanner(conf, catalog); + } + + @AfterClass + public static void tearDown() { + util.shutdownCatalogCluster(); + if (dispatcher != null) { + dispatcher.stop(); + } + } + + @Test + public void testNextBlock() throws Exception { + Expr context = analyzer.parse( + "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, ps_supplycost, " + + "r_name, p_type, p_size " + + "from region join nation on n_regionkey = r_regionkey and r_name = 'AMERICA' " + + "join supplier on s_nationkey = n_nationkey " + + "join partsupp on s_suppkey = ps_suppkey " + + "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15"); + LogicalPlan logicalPlan = logicalPlanner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context); + optimizer.optimize(logicalPlan); + QueryContext queryContext = new QueryContext(conf); + MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), queryContext, logicalPlan); 
+ planner.build(queryContext, plan); + + ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); + + int count = 0; + for (ExecutionBlock eb : cursor) { + count++; + } + + /* + |-eb_0000000000000_0001_000010 + |-eb_0000000000000_0001_000009 + |-eb_0000000000000_0001_000008 + |-eb_0000000000000_0001_000007 + |-eb_0000000000000_0001_000006 + |-eb_0000000000000_0001_000005 + |-eb_0000000000000_0001_000004 + |-eb_0000000000000_0001_000003 + |-eb_0000000000000_0001_000002 + |-eb_0000000000000_0001_000001 + */ + assertEquals(10, count); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java new file mode 100644 index 0000000..207f64d --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master; + +import org.apache.tajo.QueryTestCaseBase; +import org.junit.Test; + +public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase { + @Test + public void testGetNextRowsForAggregateFunction() throws Exception { + assertQueryStr("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " + + "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'"); + } + + @Test + public void testGetNextRowsForTable() throws Exception { + assertQueryStr("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES " + + "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'"); + } + + @Test + public void testGetClusterDetails() throws Exception { + assertQueryStr("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java new file mode 100644 index 0000000..36942a4 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.QueryId; +import org.apache.tajo.ResourceProtos.FetchProto; +import org.apache.tajo.TestTajoIds; +import org.apache.tajo.querymaster.Repartitioner; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.FetchImpl; +import org.junit.Test; + +import java.net.URI; +import java.util.*; + +import static junit.framework.Assert.assertEquals; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE; +import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class TestRepartitioner { + @Test + public void testCreateHashFetchURL() throws Exception { + QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2); + String hostName = "tajo1"; + int port = 1234; + ExecutionBlockId sid = new ExecutionBlockId(q1, 2); + int numPartition = 10; + + Map<Integer, List<IntermediateEntry>> intermediateEntries = new HashMap<Integer, 
List<IntermediateEntry>>(); + for (int i = 0; i < numPartition; i++) { + intermediateEntries.put(i, new ArrayList<IntermediateEntry>()); + } + for (int i = 0; i < 1000; i++) { + int partitionId = i % numPartition; + IntermediateEntry entry = new IntermediateEntry(i, 0, partitionId, new Task.PullHost(hostName, port)); + entry.setEbId(sid); + entry.setVolume(10); + intermediateEntries.get(partitionId).add(entry); + } + + Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries = + new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>(); + + for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) { + FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE, + sid, eachEntry.getKey(), eachEntry.getValue()); + + fetch.setName(sid.toString()); + + FetchProto proto = fetch.getProto(); + fetch = new FetchImpl(proto); + assertEquals(proto, fetch.getProto()); + + Map<ExecutionBlockId, List<IntermediateEntry>> ebEntries = new HashMap<ExecutionBlockId, List<IntermediateEntry>>(); + ebEntries.put(sid, eachEntry.getValue()); + + hashEntries.put(eachEntry.getKey(), ebEntries); + + List<URI> uris = fetch.getURIs(); + assertEquals(1, uris.size()); // In hash shuffle, the fetcher returns only one URI per partition. 
+ + URI uri = uris.get(0); + final Map<String, List<String>> params = + new QueryStringDecoder(uri).parameters(); + + assertEquals(eachEntry.getKey().toString(), params.get("p").get(0)); + assertEquals("h", params.get("type").get(0)); + assertEquals("" + sid.getId(), params.get("sid").get(0)); + } + + Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries = + Repartitioner.mergeIntermediateByPullHost(hashEntries); + + assertEquals(numPartition, mergedHashEntries.size()); + for (int i = 0; i < numPartition; i++) { + Map<ExecutionBlockId, List<IntermediateEntry>> eachEntry = mergedHashEntries.get(i); // check every partition, not only partition 0 + assertEquals(1, eachEntry.size()); + List<IntermediateEntry> interEntry = eachEntry.get(sid); + assertEquals(1, interEntry.size()); + + assertEquals(1000, interEntry.get(0).getVolume()); + } + } + + @Test + public void testScheduleFetchesByEvenDistributedVolumes() { + Map<Integer, FetchGroupMeta> fetchGroups = Maps.newHashMap(); + String tableName = "test1"; + + + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + FetchImpl [] fetches = new FetchImpl[12]; + for (int i = 0; i < 12; i++) { + fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2); + } + + int [] VOLUMES = {100, 80, 70, 30, 10, 5}; + + for (int i = 0; i < 12; i += 2) { + fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1])); + } + + Pair<Long [], Map<String, List<FetchImpl>>[]> results; + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1); + long expected [] = {100 + 80 + 70 + 30 + 10 + 5}; + assertFetchVolumes(expected, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2); + long expected0 [] = {140, 155}; + assertFetchVolumes(expected0, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + + results = 
Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3); + long expected1 [] = {100, 95, 100}; + assertFetchVolumes(expected1, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4); + long expected2 [] = {100, 80, 70, 45}; + assertFetchVolumes(expected2, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5); + long expected3 [] = {100, 80, 70, 30, 15}; + assertFetchVolumes(expected3, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6); + long expected4 [] = {100, 80, 70, 30, 10, 5}; + assertFetchVolumes(expected4, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); + } + + private static void assertFetchVolumes(long [] expected, Long [] results) { + assertEquals("the lengths of volumes are mismatch", expected.length, results.length); + + for (int i = 0; i < expected.length; i++) { + assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]); + } + } + + @Test + public void testMergeIntermediates() { + //Test Merge + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB + long expectedTotalLength = 0; + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i)); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + 
long splitVolume = 128 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(6, fetches.size()); + + int totalInterms = 0; + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + for (List<FetchImpl> eachFetchList: fetches) { + totalInterms += eachFetchList.size(); + long eachFetchVolume = 0; + for (FetchImpl eachFetch: eachFetchList) { + eachFetchVolume += eachFetch.getLength(); + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + } + assertTrue(eachFetchVolume + " should be smaller than splitVolume", eachFetchVolume < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(eachFetchVolume + " should be great than 100MB", eachFetchVolume >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(23, totalInterms); + assertEquals(20, numZeroPosFetcher); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testSplitIntermediates() { + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = new int[20]; //195MB + for (int i = 0 ; i < pageLengths.length; i++) { + if (i < pageLengths.length - 1) { + pageLengths[i] = 10 * 1024 * 1024; + } else { + pageLengths[i] = 5 * 1024 * 1024; + } + } + + long expectedTotalLength = 0; + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i)); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, 
intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(32, fetches.size()); + + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + Set<String> uniqPullHost = new HashSet<String>(); + + for (List<FetchImpl> eachFetchList: fetches) { + long length = 0; + for (FetchImpl eachFetch: eachFetchList) { + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + length += eachFetch.getLength(); + uniqPullHost.add(eachFetch.getPullHost().toString()); + } + assertTrue(length + " should be smaller than splitVolume", length < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(20, numZeroPosFetcher); + assertEquals(20, uniqPullHost.size()); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testSplitIntermediates2() { + long[][] pageDatas = { + {0, 10538717}, + {10538717, 10515884}, + {21054601, 10514343}, + {31568944, 10493988}, + {42062932, 10560639}, + {52623571, 10548486}, + {63172057, 10537811}, + {73709868, 10571060}, + {84280928, 10515062}, + {94795990, 10502964}, + {105298954, 10514011}, + {115812965, 10532154}, + {126345119, 10534133}, + {136879252, 10549749}, + {147429001, 10566547}, + {157995548, 10543700}, + {168539248, 10490324}, + {179029572, 10500720}, + {189530292, 10505425}, + {200035717, 10548418}, + {210584135, 10562887}, + {221147022, 10554967}, + {231701989, 10507297}, + {242209286, 10515612}, + {252724898, 10491274}, + {263216172, 10512956}, + {273729128, 10490736}, + {284219864, 10501878}, + {294721742, 10564568}, + {305286310, 10488896}, + {315775206, 10516308}, + {326291514, 10517965}, + {336809479, 10487038}, + {347296517, 10603472}, + {357899989, 10507330}, + {368407319, 10549429}, + {378956748, 10533443}, + {389490191, 10530852}, + {400021043, 11036431}, + {411057474, 10541007}, + {421598481, 10600477}, 
+ {432198958, 10519805}, + {442718763, 10500769}, + {453219532, 10507192}, + {463726724, 10540424}, + {474267148, 10509129}, + {484776277, 10527100}, + {495303377, 10720789}, + {506024166, 10568542}, + {516592708, 11046886}, + {527639594, 10580358}, + {538219952, 10508940}, + {548728892, 10523968}, + {559252860, 10580626}, + {569833486, 10539361}, + {580372847, 10496662}, + {590869509, 10505280}, + {601374789, 10564655}, + {611939444, 10505842}, + {622445286, 10523889}, + {632969175, 10553186}, + {643522361, 10535866}, + {654058227, 10501796}, + {664560023, 10530358}, + {675090381, 10585340}, + {685675721, 10602017}, + {696277738, 10546614}, + {706824352, 10511511}, + {717335863, 11019221}, + {728355084, 10558143}, + {738913227, 10516245}, + {749429472, 10502613}, + {759932085, 10522145}, + {770454230, 10489373}, + {780943603, 10520973}, + {791464576, 11021218}, + {802485794, 10496362}, + {812982156, 10502354}, + {823484510, 10515932}, + {834000442, 10591044}, + {844591486, 5523957} + }; + + List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>(); + for (int i = 0; i < 2; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + for (int j = 0; j < pageDatas.length; j++) { + pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1]))); + } + IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000)); + entry.setPages(pages); + + entries.add(entry); + } + + long splitVolume = 256 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume, + 10 * 1024 * 1024); + + + long[][] expected = { + {0,263216172}, + {263216172,264423422}, + {527639594,263824982}, + {791464576,58650867}, + {0,200035717}, + {200035717,263691007}, + {463726724,264628360}, + {728355084,121760359}, + }; + int index = 0; + for (List<FetchImpl> eachFetchList: fetches) { + if (index == 3) { + assertEquals(2, eachFetchList.size()); + } else { + assertEquals(1, 
eachFetchList.size()); + } + for (FetchImpl eachFetch: eachFetchList) { + assertEquals(expected[index][0], eachFetch.getOffset()); + assertEquals(expected[index][1], eachFetch.getLength()); + index++; + } + } + } + + @Test + public void testSplitIntermediatesWithUniqueHost() { + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = new int[20]; //195MB + for (int i = 0 ; i < pageLengths.length; i++) { + if (i < pageLengths.length - 1) { + pageLengths[i] = 10 * 1024 * 1024; + } else { + pageLengths[i] = 5 * 1024 * 1024; + } + } + + long expectedTotalLength = 0; + Task.PullHost pullHost = new Task.PullHost("host", 0); + + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(32, fetches.size()); + + int expectedSize = 0; + Set<FetchImpl> fetchSet = TUtil.newHashSet(); + for(List<FetchImpl> list : fetches){ + expectedSize += list.size(); + fetchSet.addAll(list); + } + assertEquals(expectedSize, fetchSet.size()); + + + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + Set<String> uniqPullHost = new HashSet<String>(); + + for (List<FetchImpl> eachFetchList: fetches) { + long length = 0; + for (FetchImpl eachFetch: eachFetchList) { + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + length += eachFetch.getLength(); + 
uniqPullHost.add(eachFetch.getPullHost().toString()); + } + assertTrue(length + " should be smaller than splitVolume", length < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(20, numZeroPosFetcher); + assertEquals(1, uniqPullHost.size()); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testFetchImpl() { + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + Task.PullHost pullHost = new Task.PullHost("localhost", 0); + + FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); + FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); + assertEquals(expected, fetch2); + fetch2.setOffset(5); + fetch2.setLength(10); + assertNotEquals(expected, fetch2); + } + + private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) { + Set<FetchImpl> expectedURLs = Sets.newHashSet(); + + for (FetchImpl f : expected) { + expectedURLs.add(f); + } + + Set<FetchImpl> resultURLs = Sets.newHashSet(); + + for (Map<String, List<FetchImpl>> e : result) { + for (List<FetchImpl> list : e.values()) { + resultURLs.addAll(list); + } + } + + assertEquals(expectedURLs.size(), resultURLs.size()); + assertEquals(expectedURLs, resultURLs); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java new file mode 100644 index 0000000..dc34620 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/rule/TestMasterRules.java @@ -0,0 +1,156 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rule; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.rule.EvaluationContext; +import org.apache.tajo.rule.EvaluationFailedException; +import org.apache.tajo.rule.EvaluationResult; +import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.rule.SelfDiagnosisRuleEngine; +import org.apache.tajo.rule.SelfDiagnosisRuleSession; +import org.apache.tajo.rule.base.TajoConfValidationRule; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMasterRules { + + private static Path rootFilePath; + + @BeforeClass + public static void setUpClass() throws Exception { + rootFilePath = CommonTestingUtil.getTestDir(); + } + + @AfterClass + public static void tearDownClass() throws 
Exception { + CommonTestingUtil.cleanupTestDir(rootFilePath.toUri().getPath()); + } + + @Test + public void testTajoConfValidationRule() throws Exception { + TajoConf tajoConf = new TajoConf(new YarnConfiguration()); + + EvaluationContext context = new EvaluationContext(); + context.addParameter(TajoConf.class.getName(), tajoConf); + + TajoConfValidationRule validationRule = new TajoConfValidationRule(); + EvaluationResult result = validationRule.evaluate(context); + + assertThat(result, is(notNullValue())); + assertThat(result.getReturnCode(), is(EvaluationResultCode.OK)); + } + + @Test(expected=EvaluationFailedException.class) + public void testTajoConfValidationRuleWithException() throws Exception { + TajoConf tajoConf = new TajoConf(new YarnConfiguration()); + SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); + SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); + + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, "invalid path."); + + EvaluationContext context = new EvaluationContext(); + context.addParameter(TajoConf.class.getName(), tajoConf); + + ruleSession.withRuleNames("TajoConfValidationRule").fireRules(context); + + fail("EvaluationFailedException exception is expected, but it does not happen."); + } + + protected void createTajoDirectories(TajoConf tajoConf) throws Exception { + Path tajoRootDir = new Path(rootFilePath, "tajo-root"); + FileSystem rootFs = tajoRootDir.getFileSystem(tajoConf); + FsPermission defaultPermission = FsPermission.createImmutable((short)0700); + + if (!rootFs.exists(tajoRootDir)) { + rootFs.mkdirs(tajoRootDir, new FsPermission(defaultPermission)); + } + + tajoConf.setVar(ConfVars.ROOT_DIR, tajoRootDir.toUri().toString()); + + Path tajoSystemDir = new Path(tajoRootDir, TajoConstants.SYSTEM_DIR_NAME); + if (!rootFs.exists(tajoSystemDir)) { + rootFs.mkdirs(tajoSystemDir, new FsPermission(defaultPermission)); + } + + Path tajoSystemResourceDir = new Path(tajoSystemDir, 
TajoConstants.SYSTEM_RESOURCE_DIR_NAME); + if (!rootFs.exists(tajoSystemResourceDir)) { + rootFs.mkdirs(tajoSystemResourceDir, new FsPermission(defaultPermission)); + } + + Path tajoWarehouseDir = new Path(tajoRootDir, TajoConstants.WAREHOUSE_DIR_NAME); + if (!rootFs.exists(tajoWarehouseDir)) { + rootFs.mkdirs(tajoWarehouseDir, new FsPermission(defaultPermission)); + } + + Path tajoStagingDir = new Path(tajoRootDir, "staging"); + if (!rootFs.exists(tajoStagingDir)) { + rootFs.mkdirs(tajoStagingDir, new FsPermission(defaultPermission)); + } + tajoConf.setVar(ConfVars.STAGING_ROOT_DIR, tajoStagingDir.toUri().toString()); + } + + @Test + public void testFileSystemRule() throws Exception { + TajoConf tajoConf = new TajoConf(new YarnConfiguration()); + + createTajoDirectories(tajoConf); + + EvaluationContext context = new EvaluationContext(); + context.addParameter(TajoConf.class.getName(), tajoConf); + + FileSystemRule fsRule = new FileSystemRule(); + EvaluationResult result = fsRule.evaluate(context); + + assertThat(result, is(notNullValue())); + assertThat(result.getReturnCode(), is(EvaluationResultCode.OK)); + } + + @Test + public void testFileSystemRuleWithError() throws Exception { + TajoConf tajoConf = new TajoConf(new YarnConfiguration()); + + createTajoDirectories(tajoConf); + Path systemResourceDir = TajoConf.getSystemResourceDir(tajoConf); + FileSystem defaultFs = systemResourceDir.getFileSystem(tajoConf); + if (defaultFs.exists(systemResourceDir)) { + defaultFs.delete(systemResourceDir, true); + } + + EvaluationContext context = new EvaluationContext(); + context.addParameter(TajoConf.class.getName(), tajoConf); + + FileSystemRule fsRule = new FileSystemRule(); + EvaluationResult result = fsRule.evaluate(context); + + assertThat(result, is(notNullValue())); + assertThat(result.getReturnCode(), is(EvaluationResultCode.ERROR)); + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java new file mode 100644 index 0000000..328c281 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.scheduler; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.annotation.NotThreadSafe; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.rm.*; +import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEventType; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.CallFuture; +import org.junit.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.tajo.ResourceProtos.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@NotThreadSafe +public class TestSimpleScheduler { + private CompositeService service; + private SimpleScheduler scheduler; + private TajoRMContext rmContext; + private AsyncDispatcher dispatcher; + private TajoConf conf; + private int workerNum = 3; + private NodeResource nodeResource; + private NodeResource totalResource; + private Semaphore barrier; + private int testDelay = 50; + private static ScheduledExecutorService executorService; + + @BeforeClass + public static void setupClass() { + executorService = Executors.newScheduledThreadPool(10); + } + + @AfterClass + public static void tearDownClass() { + executorService.shutdown(); + } + + @Before + public void setup() { + conf = new TajoConf(); + nodeResource = 
NodeResource.createResource(1500, 2, 3); + service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) { + + @Override + protected void serviceInit(Configuration conf) throws Exception { + dispatcher = new AsyncDispatcher(); + addService(dispatcher); + + rmContext = new TajoRMContext(dispatcher); + rmContext.getDispatcher().register(NodeEventType.class, + new TajoResourceManager.WorkerEventDispatcher(rmContext)); + + barrier = new Semaphore(0); + scheduler = new MySimpleScheduler(rmContext, barrier); + addService(scheduler); + rmContext.getDispatcher().register(SchedulerEventType.class, scheduler); + + for (int i = 0; i < workerNum; i++) { + WorkerConnectionInfo conn = new WorkerConnectionInfo("host" + i, 28091 + i, 28092, 21000, 28093, 28080); + rmContext.getNodes().putIfAbsent(conn.getId(), + new NodeStatus(rmContext, NodeResources.clone(nodeResource), conn)); + rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(conn.getId(), NodeEventType.STARTED)); + } + super.serviceInit(conf); + } + }; + service.init(conf); + service.start(); + + assertEquals(workerNum, rmContext.getNodes().size()); + totalResource = NodeResources.createResource(0); + for(NodeStatus nodeStatus : rmContext.getNodes().values()) { + NodeResources.addTo(totalResource, nodeStatus.getTotalResourceCapability()); + } + } + + @After + public void tearDown() { + service.stop(); + } + + @Test + public void testInitialCapacity() throws InterruptedException { + assertEquals(workerNum, scheduler.getNumClusterNodes()); + assertEquals(0, scheduler.getRunningQuery()); + + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + assertEquals(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.defaultIntVal, + scheduler.getQMMinimumResourceCapability().getMemory()); + + assertEquals(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.defaultIntVal, + scheduler.getMinimumResourceCapability().getMemory()); + } + + 
@Test(timeout = 10000) + public void testSubmitOneQuery() throws InterruptedException { + QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default", + "user", + QueryIdFactory.newQueryId(System.nanoTime(), 0), + 1, + System.currentTimeMillis()); + + assertEquals(0, scheduler.getRunningQuery()); + + scheduler.submitQuery(schedulingInfo); + barrier.acquire(); + assertEquals(1, scheduler.getRunningQuery()); + + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, + NodeResources.add(scheduler.getQMMinimumResourceCapability(), scheduler.getClusterResource())); + } + + @Test(timeout = 10000) + public void testMaximumSubmitQuery() throws InterruptedException { + assertEquals(0, scheduler.getRunningQuery()); + int maximumParallelQuery = scheduler.getResourceCalculator().computeAvailableContainers( + scheduler.getMaximumResourceCapability(), scheduler.getQMMinimumResourceCapability()); + + int testParallelNum = 10; + for (int i = 0; i < testParallelNum; i++) { + QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default", + "user", + QueryIdFactory.newQueryId(System.nanoTime(), 0), + 1, + System.currentTimeMillis()); + scheduler.submitQuery(schedulingInfo); + } + + barrier.acquire(); + // allow 50% parallel running + assertEquals(Math.floor(maximumParallelQuery * 0.5f), (double) scheduler.getRunningQuery(), 1.0f); + assertEquals(testParallelNum, scheduler.getRunningQuery() + scheduler.getQueryQueue().size()); + } + + @Test(timeout = 10000) + public void testReserveResource() throws InterruptedException, ExecutionException { + int requestNum = 3; + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0); + CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); + rmContext.getDispatcher().getEventHandler().handle(new 
ResourceReserveSchedulerEvent( + createResourceRequest(queryId, requestNum, new ArrayList<Integer>()), callBack)); + + NodeResourceResponse responseProto = callBack.get(); + assertEquals(queryId, new QueryId(responseProto.getQueryId())); + assertEquals(requestNum, responseProto.getResourceCount()); + + NodeResource allocations = NodeResources.createResource(0); + for (AllocationResourceProto resourceProto : responseProto.getResourceList()) { + NodeResources.addTo(allocations, new NodeResource(resourceProto.getResource())); + } + + assertEquals(NodeResources.subtract(totalResource, allocations), scheduler.getClusterResource()); + } + + @Test(timeout = 10000) + public void testReserveResourceWithWorkerPriority() throws InterruptedException, ExecutionException { + int requestNum = 2; + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + List<Integer> targetWorkers = Lists.newArrayList(); + Map.Entry<Integer, NodeStatus> workerEntry = rmContext.getNodes().entrySet().iterator().next(); + targetWorkers.add(workerEntry.getKey()); + + NodeResource expectResource = NodeResources.multiply(scheduler.getMinimumResourceCapability(), requestNum); + assertTrue(NodeResources.fitsIn(expectResource, workerEntry.getValue().getAvailableResource())); + + QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0); + NodeResourceRequest requestProto = createResourceRequest(queryId, requestNum, targetWorkers); + CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); + rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent( + requestProto, callBack)); + + NodeResourceResponse responseProto = callBack.get(); + assertEquals(queryId, new QueryId(responseProto.getQueryId())); + assertEquals(requestNum, responseProto.getResourceCount()); + + for (AllocationResourceProto resourceProto : responseProto.getResourceList()) { + 
assertEquals(workerEntry.getKey().intValue(), resourceProto.getWorkerId()); + } + } + + private NodeResourceRequest + createResourceRequest(QueryId queryId, int containerNum, List<Integer> candidateWorkers) { + NodeResourceRequest.Builder request = + NodeResourceRequest.newBuilder(); + request.setCapacity(scheduler.getMinimumResourceCapability().getProto()) + .setNumContainers(containerNum) + .setPriority(1) + .setQueryId(queryId.getProto()) + .setType(ResourceType.LEAF) + .setUserId("test user") + .setRunningTasks(0) + .addAllCandidateNodes(candidateWorkers) + .setQueue("default"); + return request.build(); + } + + class MySimpleScheduler extends SimpleScheduler { + Semaphore barrier; + Map<QueryId, QueryInfo> queryInfoMap = Maps.newHashMap(); + Map<QueryId, AllocationResourceProto> qmAllocationMap = Maps.newHashMap(); + + public MySimpleScheduler(TajoRMContext rmContext, Semaphore barrier) { + super(null, rmContext); + this.barrier = barrier; + } + + @Override + public void submitQuery(QuerySchedulingInfo schedulingInfo) { + queryInfoMap.put(schedulingInfo.getQueryId(), new QueryInfo(schedulingInfo.getQueryId()) { + QueryContext context; + @Override + public QueryContext getQueryContext() { + if(context == null) { + context = new QueryContext(conf); + context.setUser("user"); + } + return context; + } + }); + super.submitQuery(schedulingInfo); + } + + @Override + protected boolean startQuery(final QueryId queryId, final AllocationResourceProto allocation) { + executorService.schedule(new Runnable() { + @Override + public void run() { + barrier.release(); + qmAllocationMap.put(queryId, allocation); + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); + } + }, testDelay, TimeUnit.MILLISECONDS); + return true; + } + + @Override + public void handle(SchedulerEvent event) { + super.handle(event); + barrier.release(); + } + + @Override + protected QueryInfo getQueryInfo(QueryId queryId) { + return 
queryInfoMap.get(queryId); + } + + @Override + public void stopQuery(QueryId queryId) { + queryInfoMap.remove(queryId); + AllocationResourceProto allocationResourceProto = qmAllocationMap.remove(queryId); + NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getAvailableResource(), + new NodeResource(allocationResourceProto.getResource())); + super.stopQuery(queryId); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java new file mode 100644 index 0000000..237fb32 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.util.Pair; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestIntermediateEntry { + @Test + public void testPage() { + Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null); + + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + pages.add(new Pair(0L, 1441275)); + pages.add(new Pair(1441275L, 1447446)); + pages.add(new Pair(2888721L, 1442507)); + + interm.setPages(pages); + + long splitBytes = 3 * 1024 * 1024; + + List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes); + assertEquals(2, splits.size()); + + long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} }; + for (int i = 0; i < 2; i++) { + Pair<Long, Long> eachSplit = splits.get(i); + assertEquals(expected[i][0], eachSplit.getFirst().longValue()); + assertEquals(expected[i][1], eachSplit.getSecond().longValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java new file mode 100644 index 0000000..0e3e63e --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.querymaster;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tajo.*;
import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.event.StageEvent;
import org.apache.tajo.master.event.StageEventType;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.worker.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

/**
 * Tests killing a query through its {@link QueryMasterTask} and killing a
 * single task before it runs. The query tests run against a local mini
 * cluster with two external TPC-H tables.
 */
public class TestKillQuery {
  private static TajoTestingCluster cluster;
  private static TajoConf conf;
  private static TajoClient client;
  private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " +
      "from lineitem t1 join customer t2 " +
      "on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey";

  /** Starts a one-worker local cluster and registers the lineitem and customer tables. */
  @BeforeClass
  public static void setUp() throws Exception {
    cluster = new TajoTestingCluster();
    cluster.startMiniClusterInLocal(1);
    conf = cluster.getConfiguration();
    client = cluster.newTajoClient();

    File lineitemFile = TPCH.getDataFile("lineitem");
    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " +
        "using text location 'file://" + lineitemFile.getAbsolutePath() + "'");
    assertTrue(client.existTable("default.lineitem"));

    File customerFile = TPCH.getDataFile("customer");
    client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) " +
        "using text location 'file://" + customerFile.getAbsolutePath() + "'");
    assertTrue(client.existTable("default.customer"));
  }

  @AfterClass
  public static void tearDown() throws IOException {
    if (client != null) {
      client.close();
    }
    if (cluster != null) {
      cluster.shutdownMiniCluster();
    }
  }

  /**
   * Waits up to five seconds for the latch. The result of the wait is not
   * inspected; only an interruption fails the test, reporting the query state.
   */
  private static void awaitEvent(CountDownLatch latch, QueryMasterTask task) {
    try {
      latch.await(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      fail("Query state : " + task.getQuery().getSynchronizedState());
    }
  }

  /**
   * Starts a query, waits for the dispatcher to see an SQ_INIT stage event,
   * fires a KILL event and expects the query to reach QUERY_KILLED.
   */
  @Test
  public final void testKillQueryFromInitState() throws Exception {
    SQLAnalyzer sqlAnalyzer = new SQLAnalyzer();
    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
    Session session = LocalTajoTestingUtility.createDummySession();
    CatalogService catalog = cluster.getMaster().getCatalog();

    LogicalPlanner logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
    LogicalOptimizer logicalOptimizer = new LogicalOptimizer(conf, catalog);
    Expr expr = sqlAnalyzer.parse(queryStr);
    LogicalPlan logicalPlan = logicalPlanner.createPlan(defaultContext, expr);

    logicalOptimizer.optimize(logicalPlan);

    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
    QueryContext queryContext = new QueryContext(conf);
    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
    globalPlanner.build(queryContext, masterPlan);

    CountDownLatch eventLatch = new CountDownLatch(1);
    MockAsyncDispatch mockDispatcher = new MockAsyncDispatch(eventLatch, StageEventType.SQ_INIT);

    QueryMaster queryMaster = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
    QueryMasterTask qmTask = new QueryMasterTask(queryMaster.getContext(),
        queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), mockDispatcher);

    qmTask.init(conf);
    qmTask.getQueryTaskContext().getDispatcher().start();
    qmTask.startQuery();

    awaitEvent(eventLatch, qmTask);

    Stage stage = qmTask.getQuery().getStages().iterator().next();
    assertNotNull(stage);

    // fire kill event
    qmTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));

    try {
      cluster.waitForQueryState(qmTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
      assertEquals(TajoProtos.QueryState.QUERY_KILLED, qmTask.getQuery().getSynchronizedState());
    } catch (Exception e) {
      e.printStackTrace();
      if (stage != null) {
        // Dump the stage counters to help diagnose why the kill did not complete.
        String stageSummary = String.format(
            "Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)",
            stage.getId().toString(),
            stage.getTotalScheduledObjectsCount(),
            stage.getCompletedTaskCount(),
            stage.getSucceededObjectCount(),
            stage.getKilledObjectCount(),
            stage.getFailedObjectCount());
        System.err.println(stageSummary);
      }
      throw e;
    } finally {
      qmTask.stop();
    }
  }

  /**
   * Kills a started query, expects its last stage to be KILLED, and then pushes
   * further stage events through that stage's state machine.
   */
  @Test
  public final void testIgnoreStageStateFromKilled() throws Exception {
    SQLAnalyzer sqlAnalyzer = new SQLAnalyzer();
    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
    Session session = LocalTajoTestingUtility.createDummySession();
    CatalogService catalog = cluster.getMaster().getCatalog();

    LogicalPlanner logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
    LogicalOptimizer logicalOptimizer = new LogicalOptimizer(conf, catalog);
    Expr expr = sqlAnalyzer.parse(queryStr);
    LogicalPlan logicalPlan = logicalPlanner.createPlan(defaultContext, expr);

    logicalOptimizer.optimize(logicalPlan);

    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
    QueryContext queryContext = new QueryContext(conf);
    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
    globalPlanner.build(queryContext, masterPlan);

    // NOTE(review): the latch is keyed on a QueryState value, while MockAsyncDispatch
    // compares it with event.getType(). Whether any dispatched event reports a
    // QueryState as its type is not visible here; if none does, the wait below
    // simply runs out its timeout. TODO confirm the intended event type.
    CountDownLatch eventLatch = new CountDownLatch(1);
    MockAsyncDispatch mockDispatcher = new MockAsyncDispatch(eventLatch, TajoProtos.QueryState.QUERY_RUNNING);

    QueryMaster queryMaster = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
    QueryMasterTask qmTask = new QueryMasterTask(queryMaster.getContext(),
        queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), mockDispatcher);

    qmTask.init(conf);
    qmTask.getQueryTaskContext().getDispatcher().start();
    qmTask.startQuery();

    awaitEvent(eventLatch, qmTask);

    Stage stage = qmTask.getQuery().getStages().iterator().next();
    assertNotNull(stage);

    // fire kill event
    qmTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));

    try {
      cluster.waitForQueryState(qmTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
      assertEquals(TajoProtos.QueryState.QUERY_KILLED, qmTask.getQuery().getSynchronizedState());
    } finally {
      qmTask.stop();
    }

    List<Stage> stages = Lists.newArrayList(qmTask.getQuery().getStages());
    Stage lastStage = stages.get(stages.size() - 1);

    assertEquals(StageState.KILLED, lastStage.getSynchronizedState());

    // Feed these events, in this order, to the already-killed stage.
    StageEventType[] lateEvents = {
        StageEventType.SQ_START,
        StageEventType.SQ_KILL,
        StageEventType.SQ_SHUFFLE_REPORT,
        StageEventType.SQ_STAGE_COMPLETED,
        StageEventType.SQ_FAILED
    };
    for (StageEventType lateEvent : lateEvents) {
      lastStage.getStateMachine().doTransition(lateEvent,
          new StageEvent(lastStage.getId(), lateEvent));
    }
  }

  /**
   * Kills a task before running it and checks that it reports TA_KILLED both
   * right after the kill and after {@code run()}, whether or not {@code run()}
   * throws an exception.
   */
  @Test
  public void testKillTask() throws Throwable {
    QueryId qid = LocalTajoTestingUtility.newQueryId();
    ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
    TaskId tid = QueryIdFactory.newTaskId(eid);
    final TajoConf taskConf = new TajoConf();
    TaskRequestImpl taskRequest = new TaskRequestImpl();
    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);

    TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
    taskRequest.set(attemptId, new ArrayList<CatalogProtos.FragmentProto>(),
        null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(taskConf),
        null, null, queryMaster.getHostAndQMPort());
    taskRequest.setInterQuery();

    ExecutionBlockContextResponse.Builder requestProtoBuilder =
        ExecutionBlockContextResponse.newBuilder();
    requestProtoBuilder.setExecutionBlockId(eid.getProto())
        .setPlanJson("test")
        .setQueryContext(new QueryContext(taskConf).getProto())
        .setQueryOutputPath("testpath")
        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);

    // Worker context stub: only the configuration is provided.
    TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
      @Override
      public TajoConf getConf() {
        return taskConf;
      }

      @Override
      public TaskManager getTaskManager() {
        return null;
      }

      @Override
      public TaskExecutor getTaskExecuor() {
        return null;
      }

      @Override
      public NodeResourceManager getNodeResourceManager() {
        return null;
      }
    };

    ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
      @Override
      public Path createBaseDir() throws IOException {
        return new Path("test");
      }
    };

    org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context);
    task.kill();
    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());

    try {
      task.run();
    } catch (Exception ignored) {
      // An exception from run() is tolerated; the state check below applies either way.
    }
    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
  }

  /**
   * Dispatcher that counts down a latch whenever it dispatches an event whose
   * type is the configured enum constant, then dispatches the event normally.
   */
  static class MockAsyncDispatch extends AsyncDispatcher {
    private CountDownLatch latch;
    private Enum eventType;

    MockAsyncDispatch(CountDownLatch latch, Enum eventType) {
      super();
      this.latch = latch;
      this.eventType = eventType;
    }

    @Override
    protected void dispatch(Event event) {
      if (event.getType() == eventType) {
        latch.countDown();
      }
      super.dispatch(event);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java new file mode 100644 index 0000000..f9ed367 --- 
/dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryProgress.java @@ -0,0 +1,77 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.querymaster;

import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Checks that the progress reported for a running query never decreases and
 * never exceeds 1.0.
 */
@Category(IntegrationTest.class)
public class TestQueryProgress {
  private static TajoTestingCluster cluster;
  private static TajoConf conf;
  private static TajoClient client;

  @BeforeClass
  public static void setUp() throws Exception {
    cluster = TpchTestBase.getInstance().getTestingCluster();
    conf = cluster.getConfiguration();
    client = cluster.newTajoClient();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    // client stays null when setUp() failed before creating it; closing it
    // unconditionally would hide that failure behind a NullPointerException.
    if (client != null) {
      client.close();
    }
  }

  /**
   * Submits a group-by query and polls its status until it completes, failing
   * as soon as a reported progress value is lower than the previous one.
   */
  @Test(timeout = 10000)
  public final void testQueryProgress() throws Exception {
    ClientProtos.SubmitQueryResponse res = client.executeQuery("select l_orderkey from lineitem group by l_orderkey");
    QueryId queryId = new QueryId(res.getQueryId());

    float prevProgress = 0;
    while (true) {
      QueryStatus status = client.getQueryStatus(queryId);
      // No status available yet: poll again. The test timeout bounds this loop.
      if (status == null) continue;

      float progress = status.getProgress();

      if (prevProgress > progress) {
        fail("Previous progress: " + prevProgress + ", Current progress : " + progress);
      }
      prevProgress = progress;
      assertTrue(progress <= 1.0f);

      if (TajoClientUtil.isQueryComplete(status.getState())) break;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java new file mode 100644 index 0000000..978d709 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.*; +import org.apache.tajo.annotation.NotThreadSafe; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.ipc.ClientProtos; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +@NotThreadSafe +public class TestQueryState { + private static TajoTestingCluster cluster; + private static TajoClient client; + + @BeforeClass + public static void setUpClass() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + client = cluster.newTajoClient(); + } + + @AfterClass + public static void tearDownClass() { + client.close(); + } + + @Test(timeout = 10000) + public void testSucceededState() throws Exception { + String queryStr = "select l_orderkey from lineitem group by l_orderkey order by l_orderkey"; + /* + ======================================================= + Block Id: eb_1429886996479_0001_000001 [LEAF] HASH_SHUFFLE + Block Id: eb_1429886996479_0001_000002 [INTERMEDIATE] RANGE_SHUFFLE + Block Id: eb_1429886996479_0001_000003 [ROOT] NONE_SHUFFLE + Block Id: eb_1429886996479_0001_000004 [TERMINAL] + ======================================================= + + The order of execution: + + 1: eb_1429886996479_0001_000001 + 2: eb_1429886996479_0001_000002 + 3: eb_1429886996479_0001_000003 + 4: eb_1429886996479_0001_000004 + */ + + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + + QueryStatus queryState = client.getQueryStatus(queryId); + while (!TajoClientUtil.isQueryComplete(queryState.getState())) { + try { + Thread.sleep(50); + } catch 
(InterruptedException e) { + fail("Query state : " + queryState); + } + queryState = client.getQueryStatus(queryId); + } + + QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); + Query query = qmt.getQuery(); + + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState()); + + assertFalse(query.getStages().isEmpty()); + for (Stage stage : query.getStages()) { + assertEquals(StageState.SUCCEEDED, stage.getSynchronizedState()); + assertEquals(StageState.SUCCEEDED, stage.getState()); + } + + /* get status from TajoMaster */ + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java new file mode 100644 index 0000000..b468e37 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.worker.TajoWorker; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.ResultSet; +import java.util.*; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestTaskStatusUpdate extends QueryTestCaseBase { + + public TestTaskStatusUpdate() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @BeforeClass + public static void setUp() throws Exception { + conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false"); + } + + @Test + public final void case1() throws Exception { + // select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber; + ResultSet res = null; + try { + res = executeQuery(); + + // tpch/lineitem.tbl + long[] expectedNumRows = new long[]{5, 2, 2, 2}; + long[] expectedNumBytes = new long[]{604, 18, 18, 8}; + long[] expectedReadBytes = new long[]{604, 604, 18, 0}; + + assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + @Test + public final void case2() throws Exception { + // ExternalMergeSort + ResultSet res = null; + try { + res = 
executeQuery(); + + // tpch/lineitem.tbl + long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; + long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; + long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; + + assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + + @Test + public final void case3() throws Exception { + // Partition Scan + ResultSet res = null; + try { + createColumnPartitionedTable(); + + /* + |-eb_1404143727281_0002_000005 + |-eb_1404143727281_0002_000004 (order by) + |-eb_1404143727281_0002_000003 (join) + |-eb_1404143727281_0002_000002 (scan, filter) + |-eb_1404143727281_0002_000001 (scan) + */ + res = executeQuery(); + + // in/out * stage(4) + long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2}; + long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18}; + long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0}; + + assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); + } finally { + cleanupQuery(res); + } + } + + private void createColumnPartitionedTable() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("ColumnPartitionedTable"); + ResultSet res = executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); + res.close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, + catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + res = testBase.execute( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem"); + + res.close(); + } + + private void assertStatus(int numStages, + long[] expectedNumRows, + long[] expectedNumBytes, + long[] expectedReadBytes) throws Exception { + List<TajoWorker> tajoWorkers = 
testingCluster.getTajoWorkers(); + Collection<QueryMasterTask> finishedTasks = null; + for (TajoWorker eachWorker: tajoWorkers) { + finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks(); + if (finishedTasks != null && !finishedTasks.isEmpty()) { + break; + } + } + + assertNotNull(finishedTasks); + assertTrue(!finishedTasks.isEmpty()); + + List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks); + + Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() { + @Override + public int compare(QueryMasterTask o1, QueryMasterTask o2) { + return o2.getQueryId().compareTo(o1.getQueryId()); + } + }); + + Query query = finishedTaskList.get(0).getQuery(); + + assertNotNull(query); + + List<Stage> stages = new ArrayList<Stage>(query.getStages()); + assertEquals(numStages, stages.size()); + + Collections.sort(stages, new Comparator<Stage>() { + @Override + public int compare(Stage o1, Stage o2) { + return o1.getId().compareTo(o2.getId()); + } + }); + + int index = 0; + for (Stage eachStage : stages) { + TableStats inputStats = eachStage.getInputStats(); + TableStats resultStats = eachStage.getResultStats(); + + assertNotNull(inputStats); + assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue()); + assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue()); + assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue()); + + index++; + + assertNotNull(resultStats); + assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue()); + assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue()); + assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue()); + + index++; + } + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java ---------------------------------------------------------------------- diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java new file mode 100644 index 0000000..eb0d732 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/resource/TestResources.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.resource; + +import org.junit.Test; + +import static org.apache.tajo.resource.NodeResources.componentwiseMin; +import static org.apache.tajo.resource.NodeResources.createResource; +import static org.apache.tajo.resource.NodeResources.fitsIn; +import static org.junit.Assert.*; + +public class TestResources { + @Test + public void testFitsIn() { + assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1))); + assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1))); + assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1))); + assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1))); + assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1))); + assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1))); + } + + @Test + public void testComponentwiseMin() { + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 1), createResource(2, 2))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(2, 2), createResource(1, 1))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 2), createResource(2, 1))); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java new file mode 100644 index 0000000..d0ab1c0 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.SortedSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestFileFragment { + private Path path; + + @Before + public final void setUp() throws Exception { + path = CommonTestingUtil.getTestDir(); + } + + @Test + public final void testGetAndSetFields() { + FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); + + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0"), fragment1.getPath()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testGetProtoAndRestore() { + FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); + + FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto()); + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0"), fragment1.getPath()); + assertTrue(0 == fragment1.getStartKey()); + 
assertTrue(500 == fragment1.getLength()); + } + + @Test + public final void testCompareTo() { + final int num = 10; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500); + } + + Arrays.sort(tablets); + + for(int i = 0; i < num; i++) { + assertEquals("tablet1_"+i, tablets[i].getTableName()); + } + } + + @Test + public final void testCompareTo2() { + final int num = 1860; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500); + } + + SortedSet sortedSet = Sets.newTreeSet(); + for (FileFragment frag : tablets) { + sortedSet.add(frag); + } + assertEquals(num, sortedSet.size()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java new file mode 100644 index 0000000..e45dd75 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.TpchTestBase; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.TableProto; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class TestRowFile { + private static final Log LOG = LogFactory.getLog(TestRowFile.class); + + private TajoTestingCluster cluster; + private TajoConf conf; + + @Before + public void setup() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + conf = cluster.getConfiguration(); + } + + @After + public void teardown() throws Exception { + } + + @Test + public void test() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + 
schema.addColumn("age", Type.INT8); + schema.addColumn("description", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta("ROWFILE"); + + FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()).get(); + + Path tablePath = new Path("/test"); + Path metaPath = new Path(tablePath, ".meta"); + Path dataPath = new Path(tablePath, "test.tbl"); + FileSystem fs = sm.getFileSystem(); + fs.mkdirs(tablePath); + + FileUtil.writeProto(fs, metaPath, meta.getProto()); + + Appender appender = sm.getAppender(meta, schema, dataPath); + appender.enableStats(); + appender.init(); + + int tupleNum = 200; + Tuple tuple; + Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz"); + Set<Integer> idSet = Sets.newHashSet(); + + tuple = new VTuple(3); + long start = System.currentTimeMillis(); + for(int i = 0; i < tupleNum; i++) { + tuple.put(0, DatumFactory.createInt4(i + 1)); + tuple.put(1, DatumFactory.createInt8(25l)); + tuple.put(2, stringDatum); + appender.addTuple(tuple); + idSet.add(i+1); + } + appender.close(); + + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus file = fs.getFileStatus(dataPath); + TableProto proto = (TableProto) FileUtil.loadProto( + cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance()); + meta = new TableMeta(proto); + FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen()); + + int tupleCnt = 0; + start = System.currentTimeMillis(); + Scanner scanner = sm.getScanner(meta, schema, fragment); + scanner.init(); + while ((tuple=scanner.next()) != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + tupleCnt = 0; + long fileStart = 0; + long fileLen = file.getLen()/13; + + for (int i = 0; i < 13; i++) { + fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen); + scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment); + scanner.init(); 
+ while ((tuple=scanner.next()) != null) { + if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) { + LOG.debug("duplicated! " + tuple.getInt4(0)); + } + tupleCnt++; + } + scanner.close(); + fileStart += fileLen; + if (i == 11) { + fileLen = file.getLen() - fileStart; + } + } + assertEquals(tupleNum, tupleCnt); + } +}
