Updated Branches: refs/heads/master 0652815f5 -> 2923f4e3b
TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik) Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2923f4e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2923f4e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2923f4e3 Branch: refs/heads/master Commit: 2923f4e3b5812f25e73e26c2af0add4262c6a07e Parents: 0652815 Author: Hyunsik Choi <[email protected]> Authored: Thu May 9 16:34:29 2013 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu May 9 16:35:31 2013 +0900 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../src/main/java/tajo/master/GlobalPlanner.java | 538 +-------------- .../engine/plan/global/TestGlobalQueryPlanner.java | 143 ---- 3 files changed, 8 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cba4f8f..7dd10de 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,8 @@ Release 0.2.0 - unreleased NEW FEATURES + TAJO-57: Recognize Parser and Catalog Standard SQL data types. (hyunsik) + TAJO-33: Implement a basic query progress indicator. (hyunsik) IMPROVEMENTS @@ -37,6 +39,8 @@ Release 0.2.0 - unreleased BUG FIXES + TAJO-58: Remove obsolete methods in GlobalPlanner. (hyunsik) + TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik) TAJO-47: RowFile has the duplicated initialization problem and unflipped http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java index 376f15d..2176b8e 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java @@ -18,39 +18,28 @@ package tajo.master; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.EventHandler; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; import tajo.QueryId; import tajo.QueryIdFactory; -import tajo.QueryUnitAttemptId; import tajo.SubQueryId; import tajo.catalog.*; import tajo.catalog.proto.CatalogProtos.StoreType; -import tajo.common.exception.NotImplementedException; import tajo.conf.TajoConf; import tajo.engine.parser.QueryBlock.FromTable; import tajo.engine.planner.PlannerUtil; import tajo.engine.planner.global.MasterPlan; import tajo.engine.planner.logical.*; -import tajo.engine.utils.TupleUtil; import tajo.master.ExecutionBlock.PartitionType; -import tajo.storage.Fragment; import tajo.storage.StorageManager; -import tajo.storage.TupleRange; import tajo.util.TajoIdUtils; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.util.*; -import java.util.Map.Entry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class GlobalPlanner { private static Log LOG = LogFactory.getLog(GlobalPlanner.class); @@ -690,75 +679,6 @@ public class GlobalPlanner { prevOutputType); } } - - @VisibleForTesting - public ExecutionBlock createMultilevelGroupby( - ExecutionBlock firstPhaseGroupby, Column[] keys) - throws CloneNotSupportedException, IOException { - ExecutionBlock secondPhaseGroupby = firstPhaseGroupby.getParentBlock(); - Preconditions.checkState(secondPhaseGroupby.getScanNodes().length == 1); - - ScanNode secondScan = secondPhaseGroupby.getScanNodes()[0]; - GroupbyNode secondGroupby = (GroupbyNode) secondPhaseGroupby. - getStoreTableNode().getSubNode(); - ExecutionBlock newPhaseGroupby = new ExecutionBlock( - QueryIdFactory.newSubQueryId(firstPhaseGroupby.getId().getQueryId())); - LogicalNode tmp = PlannerUtil.findTopParentNode( - firstPhaseGroupby.getPlan(), ExprType.GROUP_BY); - GroupbyNode firstGroupby; - if (tmp instanceof UnaryNode) { - firstGroupby = (GroupbyNode) ((UnaryNode)tmp).getSubNode(); - GroupbyNode newFirstGroupby = GlobalPlannerUtils.newGroupbyPlan( - firstGroupby.getInSchema(), - firstGroupby.getOutSchema(), - keys, - firstGroupby.getHavingCondition(), - firstGroupby.getTargets() - ); - newFirstGroupby.setSubNode(firstGroupby.getSubNode()); - ((UnaryNode) tmp).setSubNode(newFirstGroupby); - } - - // create a new SubQuery containing the group by plan - StoreTableNode newStore = GlobalPlannerUtils.newStorePlan( - secondScan.getInSchema(), - newPhaseGroupby.getId().toString()); - newStore.setLocal(true); - ScanNode newScan = GlobalPlannerUtils.newScanPlan( - firstPhaseGroupby.getOutputSchema(), - firstPhaseGroupby.getOutputName(), - sm.getTablePath(firstPhaseGroupby.getOutputName())); - newScan.setLocal(true); - GroupbyNode newGroupby = GlobalPlannerUtils.newGroupbyPlan( - newScan.getOutSchema(), - newStore.getInSchema(), - keys, - secondGroupby.getHavingCondition(), - secondGroupby.getTargets()); - newGroupby.setSubNode(newScan); - newStore.setSubNode(newGroupby); - newPhaseGroupby.setPlan(newStore); - - secondPhaseGroupby.removeChildBlock(secondScan); - - // update the scan node of last phase - secondScan = GlobalPlannerUtils.newScanPlan(secondScan.getInSchema(), - newPhaseGroupby.getOutputName(), - sm.getTablePath(newPhaseGroupby.getOutputName())); - secondScan.setLocal(true); - secondGroupby.setSubNode(secondScan); - secondPhaseGroupby.setPlan(secondPhaseGroupby.getPlan()); - - // insert the new SubQuery - // between the first phase and the second phase - secondPhaseGroupby.addChildBlock(secondScan, newPhaseGroupby); - newPhaseGroupby.addChildBlock(newPhaseGroupby.getScanNodes()[0], - firstPhaseGroupby); - newPhaseGroupby.setParentBlock(secondPhaseGroupby); - firstPhaseGroupby.setParentBlock(newPhaseGroupby); - - return newPhaseGroupby; - } private LogicalNode insertOuterScan(BinaryNode parent, String tableId, TableMeta meta) throws IOException { @@ -798,454 +718,4 @@ public class GlobalPlanner { } return new MasterPlan(root); } - - /** - * 2ê°ì scanì ê°ì§ QueryUnitë¤ì fragmentì fetch를 í ë¹ - * - * @param subQuery - * @param n - * @param fragMap - * @param fetchMap - * @return - */ - private List<QueryUnit> makeBinaryQueryUnit(SubQuery subQuery, final int n, - Map<ScanNode, List<Fragment>> fragMap, - Map<ScanNode, List<URI>> fetchMap) { - ExecutionBlock execBlock = subQuery.getBlock(); - ScanNode[] scans = execBlock.getScanNodes(); - List<QueryUnit> queryUnits = new ArrayList<QueryUnit>(); - final int maxQueryUnitNum = n; - - if (execBlock.hasChildBlock()) { - ExecutionBlock prev = execBlock.getChildBlock(scans[0]); - switch (prev.getPartitionType()) { - case BROADCAST: - throw new NotImplementedException(); - case HASH: - if (scans[0].isLocal()) { - queryUnits = assignFetchesToBinaryByHash(subQuery, queryUnits, - fetchMap, maxQueryUnitNum); - queryUnits = assignEqualFragment(queryUnits, fragMap); - } else { - throw new NotImplementedException(); - } - break; - case LIST: - throw new NotImplementedException(); - } - } else { - queryUnits = makeQueryUnitsForBinaryPlan(subQuery, - queryUnits, fragMap); - } - - return queryUnits; - } - - public List<QueryUnit> makeQueryUnitsForBinaryPlan( - SubQuery subQuery, List<QueryUnit> queryUnits, - Map<ScanNode, List<Fragment>> fragmentMap) { - ExecutionBlock execBlock = subQuery.getBlock(); - QueryUnit queryUnit; - if (execBlock.hasJoin()) { - // make query units for every composition of fragments of each scan - Preconditions.checkArgument(fragmentMap.size()==2); - - ScanNode [] scanNodes = execBlock.getScanNodes(); - String innerId = null, outerId = null; - - // If one relation is set to broadcast, it meaning that the relation - // is less than one block size. That is, the relation has only - // one fragment. If this assumption is kept, the below code is always - // correct. - if (scanNodes[0].isBroadcast() || scanNodes[1].isBroadcast()) { - List<Fragment> broadcastFrag = null; - List<Fragment> baseFrag = null; - if (scanNodes[0].isBroadcast()) { - broadcastFrag = fragmentMap.get(scanNodes[0]); - baseFrag = fragmentMap.get(scanNodes[1]); - - innerId = scanNodes[0].getTableId(); - outerId = scanNodes[1].getTableId(); - } else if (scanNodes[1].isBroadcast()) { - broadcastFrag = fragmentMap.get(scanNodes[1]); - baseFrag = fragmentMap.get(scanNodes[0]); - - innerId = scanNodes[1].getTableId(); - outerId = scanNodes[0].getTableId(); - } - - for (Fragment outer : baseFrag) { - queryUnit = newQueryUnit(subQuery); - queryUnit.setFragment(outerId, outer); - for (Fragment inner : broadcastFrag) { - queryUnit.setFragment(innerId, inner); - } - queryUnits.add(queryUnit); - } - } else { - List<Fragment> innerFrags, outerFrags; - Iterator<Entry<ScanNode, List<Fragment>>> it = - fragmentMap.entrySet().iterator(); - Entry<ScanNode, List<Fragment>> e = it.next(); - innerId = e.getKey().getTableId(); - innerFrags = e.getValue(); - e = it.next(); - outerId = e.getKey().getTableId(); - outerFrags = e.getValue(); - for (Fragment outer : outerFrags) { - for (Fragment inner : innerFrags) { - queryUnit = newQueryUnit(subQuery); - queryUnit.setFragment(innerId, inner); - queryUnit.setFragment(outerId, outer); - queryUnits.add(queryUnit); - } - } - } - } - - return queryUnits; - } - - private List<QueryUnit> makeQueryUnitsForEachFragment( - SubQuery subQuery, List<QueryUnit> queryUnits, - ScanNode scan, List<Fragment> fragments) { - QueryUnit queryUnit; - for (Fragment fragment : fragments) { - queryUnit = newQueryUnit(subQuery); - queryUnit.setFragment(scan.getTableId(), fragment); - queryUnits.add(queryUnit); - } - return queryUnits; - } - - private QueryUnit newQueryUnit(SubQuery subQuery) { - ExecutionBlock execBlock = subQuery.getBlock(); - QueryUnit unit = new QueryUnit( - QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(), - subQuery.getEventHandler()); - unit.setLogicalPlan(execBlock.getPlan()); - return unit; - } - - /** - * Binary QueryUnitë¤ì hash íí°ì ë fetch를 í ë¹ - * - * @param subQuery - * @param unitList - * @param fetchMap - * @param n - * @return - */ - private List<QueryUnit> assignFetchesToBinaryByHash(SubQuery subQuery, - List<QueryUnit> unitList, Map<ScanNode, List<URI>> fetchMap, final int n) { - QueryUnit unit; - int i = 0; - Map<String, Map<ScanNode, List<URI>>> hashed = hashFetches(fetchMap); - Iterator<Entry<String, Map<ScanNode, List<URI>>>> it = - hashed.entrySet().iterator(); - Entry<String, Map<ScanNode, List<URI>>> e; - while (it.hasNext()) { - e = it.next(); - if (e.getValue().size() == 2) { // only if two matched partitions - if (unitList.size() == n) { - unit = unitList.get(i++); - if (i == unitList.size()) { - i = 0; - } - } else { - unit = newQueryUnit(subQuery); - unitList.add(unit); - } - Map<ScanNode, List<URI>> m = e.getValue(); - for (ScanNode scan : m.keySet()) { - for (URI uri : m.get(scan)) { - unit.addFetch(scan.getTableId(), uri); - } - } - } - } - - return unitList; - } - - /** - * Unary QueryUnitë¤ì hash íí°ì ë fetch를 í ë¹ - * - * @param subQuery - * @param unitList - * @param scan - * @param uriList - * @param n - * @return - */ - private List<QueryUnit> assignFetchesToUnaryByHash(SubQuery subQuery, - List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) { - Map<String, List<URI>> hashed = hashFetches(subQuery.getId(), uriList); // hash key, uri - QueryUnit unit; - int i = 0; - // TODO: unitsì hashed í ë¹ - Iterator<Entry<String, List<URI>>> it = hashed.entrySet().iterator(); - Entry<String, List<URI>> e; - while (it.hasNext()) { - e = it.next(); - if (unitList.size() == n) { - unit = unitList.get(i++); - if (i == unitList.size()) { - i = 0; - } - } else { - unit = newQueryUnit(subQuery); - unitList.add(unit); - } - unit.addFetches(scan.getTableId(), e.getValue()); - } - return unitList; - } - - private List<QueryUnit> assignFetchesByRange(SubQuery subQuery, - List<QueryUnit> unitList, - ScanNode scan, - List<URI> uriList, - int n, - Schema rangeSchema, - boolean ascendingFirstKey) - throws UnsupportedEncodingException { - Map<TupleRange, Set<URI>> partitions = - rangeFetches(rangeSchema, uriList, ascendingFirstKey); - // grouping urls by range - QueryUnit unit; - int i = 0; - Iterator<Entry<TupleRange, Set<URI>>> it = partitions.entrySet().iterator(); - Entry<TupleRange, Set<URI>> e; - while (it.hasNext()) { - e = it.next(); - if (unitList.size() == n) { - unit = unitList.get(i++); - if (i == unitList.size()) { - i = 0; - } - } else { - unit = newQueryUnit(subQuery); - unitList.add(unit); - } - unit.addFetches(scan.getTableId(), e.getValue()); - } - return unitList; - } - - /** - * Unary QueryUnitë¤ì list íí°ì ë fetch를 í ë¹ - * - * @param subQuery - * @param unitList - * @param scan - * @param uriList - * @param n - * @return - */ - private List<QueryUnit> assignFetchesByRoundRobin(SubQuery subQuery, - List<QueryUnit> unitList, ScanNode scan, List<URI> uriList, int n) { - QueryUnit unit; - int i = 0; - for (URI uri : uriList) { - if (unitList.size() < n) { - unit = newQueryUnit(subQuery); - unitList.add(unit); - } else { - unit = unitList.get(i++); - if (i == unitList.size()) { - i = 0; - } - } - unit.addFetch(scan.getTableId(), uri); - } - return unitList; - } - - @VisibleForTesting - public static Map<String, Map<ScanNode, List<URI>>> hashFetches(Map<ScanNode, List<URI>> uriMap) { - SortedMap<String, Map<ScanNode, List<URI>>> hashed = - new TreeMap<String, Map<ScanNode, List<URI>>>(); - String uriPath, key; - Map<ScanNode, List<URI>> m; - List<URI> uriList; - for (Entry<ScanNode, List<URI>> e : uriMap.entrySet()) { - for (URI uri : e.getValue()) { - uriPath = uri.toString(); - key = uriPath.substring(uriPath.lastIndexOf("=")+1); - if (hashed.containsKey(key)) { - m = hashed.get(key); - } else { - m = new HashMap<ScanNode, List<URI>>(); - } - if (m.containsKey(e.getKey())) { - uriList = m.get(e.getKey()); - } else { - uriList = new ArrayList<URI>(); - } - uriList.add(uri); - m.put(e.getKey(), uriList); - hashed.put(key, m); - } - } - - SortedMap<String, Map<ScanNode, List<URI>>> finalHashed = new TreeMap<String, Map<ScanNode,List<URI>>>(); - for (Entry<String, Map<ScanNode, List<URI>>> entry : hashed.entrySet()) { - finalHashed.put(entry.getKey(), combineURIByHostForBinary(entry.getValue())); - } - - return finalHashed; - } - - private static Map<ScanNode, List<URI>> combineURIByHostForBinary(Map<ScanNode, List<URI>> hashed) { - Map<ScanNode, List<URI>> finalHashed = Maps.newHashMap(); - for (Entry<ScanNode, List<URI>> urisByKey : hashed.entrySet()) { - Map<String, List<String>> param = new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters(); - QueryUnitAttemptId quid = new QueryUnitAttemptId( - new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0)); - SubQueryId sid = quid.getSubQueryId(); - String fn = param.get("fn").get(0); - Map<String, List<String>> quidByHost = Maps.newHashMap(); - for(URI uri : urisByKey.getValue()) { - final Map<String,List<String>> params = - new QueryStringDecoder(uri).getParameters(); - if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) { - quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0)); - } else { - quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid"))); - } - } - - finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), fn)); - } - - return finalHashed; - } - - - @VisibleForTesting - public static Map<String, List<URI>> hashFetches(SubQueryId sid, List<URI> uriList) { - SortedMap<String, List<URI>> hashed = new TreeMap<String, List<URI>>(); - String uriPath, key; - for (URI uri : uriList) { - // TODO - uriPath = uri.toString(); - key = uriPath.substring(uriPath.lastIndexOf("=")+1); - if (hashed.containsKey(key)) { - hashed.get(key).add(uri); - } else { - List<URI> list = new ArrayList<URI>(); - list.add(uri); - hashed.put(key, list); - } - } - - return combineURIByHost(hashed); - } - - private static Map<String, List<URI>> combineURIByHost(Map<String, List<URI>> hashed) { - Map<String, List<URI>> finalHashed = Maps.newTreeMap(); - for (Entry<String, List<URI>> urisByKey : hashed.entrySet()) { - QueryUnitAttemptId quid = new QueryUnitAttemptId( - new QueryStringDecoder(urisByKey.getValue().get(0)).getParameters().get("qid").get(0)); - SubQueryId sid = quid.getSubQueryId(); - Map<String,List<String>> quidByHost = Maps.newHashMap(); - for(URI uri : urisByKey.getValue()) { - final Map<String,List<String>> params = - new QueryStringDecoder(uri).getParameters(); - if (quidByHost.containsKey(uri.getHost() + ":" + uri.getPort())) { - quidByHost.get(uri.getHost() + ":" + uri.getPort()).add(params.get("qid").get(0)); - } else { - quidByHost.put(uri.getHost() + ":" + uri.getPort(), Lists.newArrayList(params.get("qid"))); - } - } - finalHashed.put(urisByKey.getKey(), mergeURI(quidByHost, sid.toString(), urisByKey.getKey())); - } - return finalHashed; - } - - private static List<URI> mergeURI(Map<String, List<String>> quidByKey, String sid, String fn) { - List<URI> uris = Lists.newArrayList(); - for (Entry<String, List<String>> quidByHost : quidByKey.entrySet()) { - StringBuilder sb = new StringBuilder("http://") - .append(quidByHost.getKey()).append("/").append("?fn="+fn).append("&sid="+sid); - sb.append("&qid="); - boolean first = true; - for (String qid : quidByHost.getValue()) { - if (first) { - first = false; - } else { - sb.append(","); - } - - QueryUnitAttemptId quid = new QueryUnitAttemptId(qid); - sb.append(quid.getQueryUnitId().getId() + "_"); - sb.append(quid.getId()); - } - uris.add(URI.create(sb.toString())); - } - return uris; - } - - private Map<TupleRange, Set<URI>> rangeFetches(final Schema schema, - final List<URI> uriList, - final boolean ascendingFirstKey) - throws UnsupportedEncodingException { - SortedMap<TupleRange, Set<URI>> map; - if (ascendingFirstKey) { - map = new TreeMap<TupleRange, Set<URI>>(); - } else { - map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator()); - } - TupleRange range; - Set<URI> uris; - for (URI uri : uriList) { - // URI.getQuery() returns a url-decoded query string. - range = TupleUtil.queryToRange(schema, uri.getQuery()); - if (map.containsKey(range)) { - uris = map.get(range); - uris.add(uri); - } else { - uris = Sets.newHashSet(); - uris.add(uri); - map.put(range, uris); - } - } - - return map; - } - - /** - * Unary QueryUnitë¤ì ëíì¬ ëì¼í fragment를 í ë¹ - * - * @param unitList - * @param scan - * @param frag - * @return - */ - private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList, - ScanNode scan, Fragment frag) { - for (int i = 0; i < unitList.size(); i++) { - unitList.get(i).setFragment(scan.getTableId(), frag); - } - - return unitList; - } - - /** - * Binary QueryUnitë¤ì ëíì¬ scanë³ë¡ ëì¼í fragment를 í ë¹ - * @param unitList - * @param fragMap - * @return - */ - private List<QueryUnit> assignEqualFragment(List<QueryUnit> unitList, - Map<ScanNode, List<Fragment>> fragMap) { - for (int i = 0; i < unitList.size(); i++) { - for (ScanNode scan : fragMap.keySet()) { - unitList.get(i).setFragment(scan.getTableId(), - fragMap.get(scan).get(0)); - } - } - return unitList; - } } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2923f4e3/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java index ebd9d07..efe0562 100644 --- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java @@ -18,13 +18,10 @@ package tajo.engine.plan.global; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.zookeeper.KeeperException; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -40,7 +37,6 @@ import tajo.datum.Datum; import tajo.datum.DatumFactory; import tajo.engine.eval.TestEvalTree.TestSum; import tajo.engine.parser.QueryAnalyzer; -import tajo.engine.parser.QueryBlock; import tajo.engine.planner.LogicalOptimizer; import tajo.engine.planner.LogicalPlanner; import tajo.engine.planner.PlanningContext; @@ -53,10 +49,7 @@ import tajo.master.TajoMaster; import tajo.storage.*; import java.io.IOException; -import java.net.URI; import java.util.Iterator; -import java.util.List; -import java.util.Map; import static org.junit.Assert.*; @@ -389,140 +382,4 @@ public class TestGlobalQueryPlanner { assertEquals(ExprType.SCAN, groupby.getSubNode().getType()); } } - - @Test - public void testHashFetches() { - URI[] uris = { - URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"), - URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1") - }; - - Map<String, List<URI>> hashed = GlobalPlanner.hashFetches(null, Lists.newArrayList(uris)); - assertEquals(2, hashed.size()); - List<URI> res = hashed.get("0"); - assertEquals(2, res.size()); - res = hashed.get("1"); - assertEquals(1, res.size()); - QueryStringDecoder decoder = new QueryStringDecoder(res.get(0)); - Map<String, List<String>> params = decoder.getParameters(); - String [] qids = params.get("qid").get(0).split(","); - assertEquals(2, qids.length); - assertEquals("104_0", qids[0]); - assertEquals("1059_0", qids[1]); - } - - @Test - public void testHashFetchesForBinary() { - URI[] urisOuter = { - URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_000835_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001064_00&fn=0"), - URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_003_001059_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_000104_00&fn=1"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_003_001059_00&fn=1") - }; - - URI[] urisInner = { - URI.create("http://192.168.0.21:35385/?qid=query_20120726371_000_001_004_000111_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000123_00&fn=0"), - URI.create("http://192.168.0.17:35385/?qid=query_20120726371_000_001_004_00134_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000155_00&fn=0"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_000255_00&fn=1"), - URI.create("http://192.168.0.8:55205/?qid=query_20120726371_000_001_004_001356_00&fn=1") - }; - - Schema schema1 = new Schema(); - schema1.addColumn("col1", Type.INT4); - TableMeta meta1 = new TableMetaImpl(schema1, StoreType.CSV, Options.create()); - TableDesc desc1 = new TableDescImpl("table1", meta1, new Path("/")); - TableDesc desc2 = new TableDescImpl("table2", meta1, new Path("/")); - - QueryBlock.FromTable table1 = new QueryBlock.FromTable(desc1); - QueryBlock.FromTable table2 = new QueryBlock.FromTable(desc2); - ScanNode scan1 = new ScanNode(table1); - ScanNode scan2 = new ScanNode(table2); - - Map<ScanNode, List<URI>> uris = Maps.newHashMap(); - uris.put(scan1, Lists.newArrayList(urisOuter)); - uris.put(scan2, Lists.newArrayList(urisInner)); - - Map<String, Map<ScanNode, List<URI>>> hashed = GlobalPlanner.hashFetches(uris); - assertEquals(2, hashed.size()); - assertTrue(hashed.keySet().contains("0")); - assertTrue(hashed.keySet().contains("1")); - - assertTrue(hashed.get("0").containsKey(scan1)); - assertTrue(hashed.get("0").containsKey(scan2)); - - assertEquals(2, hashed.get("0").get(scan1).size()); - assertEquals(3, hashed.get("0").get(scan2).size()); - - QueryStringDecoder decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(0)); - Map<String, List<String>> params = decoder.getParameters(); - String [] qids = params.get("qid").get(0).split(","); - assertEquals(2, qids.length); - assertEquals("1064_0", qids[0]); - assertEquals("104_0", qids[1]); - - decoder = new QueryStringDecoder(hashed.get("0").get(scan1).get(1)); - params = decoder.getParameters(); - qids = params.get("qid").get(0).split(","); - assertEquals(2, qids.length); - assertEquals("835_0", qids[0]); - assertEquals("1059_0", qids[1]); - } - - @Test - public void testCreateMultilevelGroupby() - throws IOException, CloneNotSupportedException { - PlanningContext context = analyzer.parse( - "create table store1 as select age, sumtest(salary) from table0 group by age"); - LogicalNode plan = logicalPlanner.createPlan(context); - plan = LogicalOptimizer.optimize(context, plan); - - MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - - ExecutionBlock second, first, mid; - ScanNode secondScan, firstScan, midScan; - - second = globalPlan.getRoot(); - assertTrue(second.getScanNodes().length == 1); - - first = second.getChildBlock(second.getScanNodes()[0]); - - GroupbyNode firstGroupby, secondGroupby, midGroupby; - secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode(); - - Column[] originKeys = secondGroupby.getGroupingColumns(); - Column[] newKeys = new Column[2]; - newKeys[0] = new Column("age", Type.INT4); - newKeys[1] = new Column("name", Type.TEXT); - - mid = planner.createMultilevelGroupby(first, newKeys); - midGroupby = (GroupbyNode) mid.getStoreTableNode().getSubNode(); - firstGroupby = (GroupbyNode) first.getStoreTableNode().getSubNode(); - - secondScan = second.getScanNodes()[0]; - midScan = mid.getScanNodes()[0]; - firstScan = first.getScanNodes()[0]; - - assertTrue(first.getParentBlock().equals(mid)); - assertTrue(mid.getParentBlock().equals(second)); - assertTrue(second.getChildBlock(secondScan).equals(mid)); - assertTrue(mid.getChildBlock(midScan).equals(first)); - assertEquals(first.getOutputName(), midScan.getTableId()); - assertEquals(first.getOutputSchema(), midScan.getInSchema()); - assertEquals(mid.getOutputName(), secondScan.getTableId()); - assertEquals(mid.getOutputSchema(), secondScan.getOutSchema()); - assertArrayEquals(newKeys, firstGroupby.getGroupingColumns()); - assertArrayEquals(newKeys, midGroupby.getGroupingColumns()); - assertArrayEquals(originKeys, secondGroupby.getGroupingColumns()); - assertFalse(firstScan.isLocal()); - assertTrue(midScan.isLocal()); - assertTrue(secondScan.isLocal()); - } }
