http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index d9712c9..67f782a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -1362,6 +1362,15 @@ public class GlobalPlanner { } @Override + public LogicalNode visitIndexScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + IndexScanNode node, Stack<LogicalNode> stack) throws TajoException { + ExecutionBlock newBlock = context.plan.newExecutionBlock(); + newBlock.setPlan(node); + context.execBlockMap.put(node.getPID(), newBlock); + return node; + } + + @Override public LogicalNode visitPartitionedTableScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack)throws TajoException { @@ -1407,5 +1416,19 @@ public class GlobalPlanner { return node; } + + @Override + public LogicalNode visitCreateIndex(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock, + CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException { + LogicalNode child = super.visitCreateIndex(context, plan, queryBlock, node, stack); + + // Don't separate execution block. CreateIndex is pushed to the first execution block. + ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID()); + node.setChild(childBlock.getPlan()); + childBlock.setPlan(node); + context.execBlockMap.put(node.getPID(), childBlock); + + return node; + } } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 28622d7..a59960f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -18,23 +18,32 @@ package org.apache.tajo.engine.planner.physical; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.expr.EvalTreeUtil; +import org.apache.tajo.plan.logical.IndexScanNode; +import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate; import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; public class BSTIndexScanExec extends PhysicalExec { - private ScanNode scanNode; + private IndexScanNode plan; private SeekableScanner fileScanner; private EvalNode qual; @@ -48,31 +57,122 @@ public class BSTIndexScanExec extends PhysicalExec { private Tuple indexLookupKey; - public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode , - FileFragment fragment, Path fileName , Schema keySchema, - TupleComparator comparator , Datum[] datum) throws IOException { - super(context, scanNode.getInSchema(), scanNode.getOutSchema()); - this.scanNode = scanNode; - this.qual = scanNode.getQual(); - indexLookupKey = new VTuple(datum); - - this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(), - scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema); - this.fileScanner.init(); - this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); - - FileSystem fs = fileName.getFileSystem(context.getConf()); - this.reader = new BSTIndex(fs.getConf()). - getIndexReader(fileName, keySchema, comparator); + private TableStats inputStats; + + private CatalogProtos.FragmentProto fragment; + + private Schema keySchema; + + public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan, + CatalogProtos.FragmentProto fragment, URI indexPrefix , Schema keySchema, + SimplePredicate [] predicates) throws IOException { + super(context, plan.getInSchema(), plan.getOutSchema()); + this.plan = plan; + this.qual = plan.getQual(); + this.fragment = fragment; + this.keySchema = keySchema; + + SortSpec[] keySortSpecs = new SortSpec[predicates.length]; + Datum[] values = new Datum[predicates.length]; + for (int i = 0; i < predicates.length; i++) { + keySortSpecs[i] = predicates[i].getKeySortSpec(); + values[i] = predicates[i].getValue(); + } + indexLookupKey = new VTuple(values); + + TupleComparator comparator = new BaseTupleComparator(keySchema, + keySortSpecs); + + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); + + Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments()); + this.reader = new BSTIndex(context.getConf()). + getIndexReader(indexPath, keySchema, comparator); this.reader.open(); } + private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) { + Schema mergedSchema = new Schema(); + Set<Column> qualAndTargets = TUtil.newHashSet(); + qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual)); + for (Target target : targets) { + qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree())); + } + for (Column column : originalSchema.getRootColumns()) { + if (subSchema.contains(column) + || qualAndTargets.contains(column) + || qualAndTargets.contains(column)) { + mergedSchema.addColumn(column); + } + } + return mergedSchema; + } + @Override public void init() throws IOException { + Schema projected; + + // in the case where projected column or expression are given + // the target can be an empty list. + if (plan.hasTargets()) { + projected = new Schema(); + Set<Column> columnSet = new HashSet<Column>(); + + if (plan.hasQual()) { + columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual)); + } + + for (Target t : plan.getTargets()) { + columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree())); + } + + for (Column column : inSchema.getAllColumns()) { + if (columnSet.contains(column)) { + projected.addColumn(column); + } + } + + } else { + // no any projected columns, meaning that all columns should be projected. + // TODO - this implicit rule makes code readability bad. So, we should remove it later + projected = outSchema; + } + + initScanner(projected); super.init(); progress = 0.0f; - if (qual != null) { - qual.bind(context.getEvalContext(), inSchema); + + if (plan.hasQual()) { + if (fileScanner.isProjectable()) { + qual.bind(context.getEvalContext(), projected); + } else { + qual.bind(context.getEvalContext(), inSchema); + } + } + } + + private void initScanner(Schema projected) throws IOException { + + // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422 + if (fragment != null) { + + Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual); + + this.fileScanner = OldStorageManager.getStorageManager(context.getConf(), + plan.getTableDesc().getMeta().getStoreType()) + .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema); + this.fileScanner.init(); + + // See Scanner.isProjectable() method Depending on the result of isProjectable(), + // the width of retrieved tuple is changed. + // + // If TRUE, the retrieved tuple will contain only projected fields. + // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields. + if (fileScanner.isProjectable()) { + this.projector = new Projector(context, projected, outSchema, plan.getTargets()); + } else { + this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); + } } } @@ -102,8 +202,9 @@ public class BSTIndexScanExec extends PhysicalExec { fileScanner.seek(offset); } } + Tuple tuple; - if (!scanNode.hasQual()) { + if (!plan.hasQual()) { if ((tuple = fileScanner.next()) != null) { return projector.eval(tuple); } else { @@ -115,8 +216,11 @@ public class BSTIndexScanExec extends PhysicalExec { return projector.eval(tuple); } else { long offset = reader.next(); - if (offset == -1) return null; + if (offset == -1) { + return null; + } else fileScanner.seek(offset); + return null; } } } @@ -131,9 +235,19 @@ public class BSTIndexScanExec extends PhysicalExec { @Override public void close() throws IOException { IOUtils.cleanup(null, reader, fileScanner); + if (fileScanner != null) { + try { + TableStats stats = fileScanner.getInputStats(); + if (stats != null) { + inputStats = (TableStats) stats.clone(); + } + } catch (CloneNotSupportedException e) { + e.printStackTrace(); + } + } reader = null; fileScanner = null; - scanNode = null; + plan = null; qual = null; projector = null; indexLookupKey = null; @@ -143,4 +257,13 @@ public class BSTIndexScanExec extends PhysicalExec { public float getProgress() { return progress; } + + @Override + public TableStats getInputStats() { + if (fileScanner != null) { + return fileScanner.getInputStats(); + } else { + return inputStats; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java index 8a79005..488c3ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java @@ -54,7 +54,9 @@ public class ProjectionExec extends UnaryPhysicalExec { return null; } - return projector.eval(tuple); + Tuple outTuple = projector.eval(tuple); + outTuple.setOffset(tuple.getOffset()); + return outTuple; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java new file mode 100644 index 0000000..f9db842 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.logical.CreateIndexNode; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.Arrays; + +public class StoreIndexExec extends UnaryPhysicalExec { + private static final Log LOG = LogFactory.getLog(StoreIndexExec.class); + private BSTIndexWriter indexWriter; + private final CreateIndexNode logicalPlan; + private int[] indexKeys = null; + private Schema keySchema; + private TupleComparator comparator; + + public StoreIndexExec(final TaskAttemptContext context, final CreateIndexNode logicalPlan, + final PhysicalExec child) { + super(context, logicalPlan.getInSchema(), logicalPlan.getOutSchema(), child); + this.logicalPlan = logicalPlan; + } + + @Override + public void init() throws IOException { + super.init(); + + SortSpec[] sortSpecs = logicalPlan.getKeySortSpecs(); + indexKeys = new int[sortSpecs.length]; + keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); + + Column col; + for (int i = 0 ; i < sortSpecs.length; i++) { + col = sortSpecs[i].getSortKey(); + indexKeys[i] = inSchema.getColumnId(col.getQualifiedName()); + } + + TajoConf conf = context.getConf(); + Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); + // TODO: Create factory using reflection + BSTIndex bst = new BSTIndex(conf); + this.comparator = new BaseTupleComparator(keySchema, sortSpecs); + this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator); + this.indexWriter.setLoadNum(100); + this.indexWriter.open(); + } + + @Override + public Tuple next() throws IOException { + Tuple tuple; + Tuple keyTuple; + long offset; + + while((tuple = child.next()) != null) { + offset = tuple.getOffset(); + keyTuple = new VTuple(keySchema.size()); + RowStoreUtil.project(tuple, keyTuple, indexKeys); + indexWriter.write(keyTuple, offset); + } + return null; + } + + @Override + public void close() throws IOException { + super.close(); + + indexWriter.flush(); + IOUtils.cleanup(LOG, indexWriter); + + indexWriter = null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java index 6582513..54b6c5e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java @@ -18,11 +18,10 @@ package org.apache.tajo.engine.utils.test; -import org.apache.tajo.OverridableConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; @SuppressWarnings("unused") public class ErrorInjectionRewriter implements LogicalPlanRewriteRule { @@ -32,12 +31,12 @@ public class ErrorInjectionRewriter implements LogicalPlanRewriteRule { } @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { + public boolean isEligible(LogicalPlanRewriteRuleContext context) { return true; } @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException { + public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { throw new NullPointerException(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 9cd20b4..2ad45ba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -96,7 +96,8 @@ public class GlobalEngine extends AbstractService { analyzer = new SQLAnalyzer(); preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(context.getConf()); + // Access path rewriter is enabled only in QueryMasterTask + optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog()); annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog()); } catch (Throwable t) { LOG.error(t.getMessage(), t); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 78fc0f5..a597d32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -36,8 +36,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.UndefinedDatabaseException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.FunctionListResponse; -import org.apache.tajo.catalog.proto.CatalogProtos.TableResponse; +import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; @@ -65,7 +64,10 @@ import org.apache.tajo.util.ProtoUtil; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError; @@ -311,7 +313,6 @@ public class TajoMasterClientService extends AbstractService { .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()) .setUserName(context.getConf().getVar(ConfVars.USERNAME)) .build(); - } } @@ -959,5 +960,200 @@ public class TajoMasterClientService extends AbstractService { .build(); } } + + @Override + public IndexResponse getIndexWithName(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + IndexDescProto indexProto = catalog.getIndexByName(databaseName, indexName).getProto(); + return IndexResponse.newBuilder() + .setState(OK) + .setIndexDesc(indexProto) + .build(); + + } catch (Throwable t) { + return IndexResponse.newBuilder() + .setState(returnError(t)) + .build(); + } + } + + @Override + public ReturnState existIndexWithName(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + + if (catalog.existIndexByName(databaseName, indexName)) { + return OK; + } else { + return errUndefinedIndexName(indexName); + } + } catch (Throwable t) { + return returnError(t); + } + } + + @Override + public IndexListResponse getIndexesForTable(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getValue(); + } + + IndexListResponse.Builder builder = IndexListResponse.newBuilder().setState(OK); + for (IndexDesc index : catalog.getAllIndexesByTable(databaseName, tableName)) { + builder.addIndexDesc(index.getProto()); + } + return builder.build(); + } catch (Throwable t) { + return IndexListResponse.newBuilder() + .setState(returnError(t)) + .build(); + } + } + + @Override + public ReturnState existIndexesForTable(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getValue(); + } + if (catalog.existIndexesByTable(databaseName, tableName)) { + return OK; + } else { + return errUndefinedIndex(tableName); + } + } catch (Throwable t) { + return returnError(t); + } + } + + @Override + public IndexResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getTableName())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getTableName()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getTableName(); + } + String[] columnNames = new String[request.getColumnNamesCount()]; + columnNames = request.getColumnNamesList().toArray(columnNames); + + return IndexResponse.newBuilder() + .setState(OK) + .setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto()) + .build(); + + } catch (Throwable t) { + return IndexResponse.newBuilder() + .setState(returnError(t)) + .build(); + } + } + + @Override + public ReturnState existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getTableName())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getTableName()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getTableName(); + } + String[] columnNames = new String[request.getColumnNamesCount()]; + columnNames = request.getColumnNamesList().toArray(columnNames); + if (catalog.existIndexByColumnNames(databaseName, tableName, columnNames)) { + return OK; + } else { + return errUndefinedIndex(tableName, request.getColumnNamesList()); + } + } catch (Throwable t) { + return returnError(t); + } + } + + @Override + public ReturnState dropIndex(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + QueryContext queryContext = new QueryContext(conf, session); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + catalog.dropIndex(databaseName, indexName); + + return OK; + } catch (Throwable t) { + return returnError(t); + } + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index f6bb4f7..a535f94 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -72,45 +72,123 @@ public class DDLExecutor { switch (root.getType()) { - case ALTER_TABLESPACE: - AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; - alterTablespace(context, queryContext, alterTablespace); - return true; + case ALTER_TABLESPACE: + AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; + alterTablespace(context, queryContext, alterTablespace); + return true; - case CREATE_DATABASE: - CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; - createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); - return true; - case DROP_DATABASE: - DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; - dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); - return true; + case CREATE_DATABASE: + CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; + createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); + return true; + case DROP_DATABASE: + DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; + dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); + return true; - case CREATE_TABLE: - CreateTableNode createTable = (CreateTableNode) root; - createTable(queryContext, createTable, createTable.isIfNotExists()); - return true; - case DROP_TABLE: - DropTableNode dropTable = (DropTableNode) root; - dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); - return true; - case TRUNCATE_TABLE: - TruncateTableNode truncateTable = (TruncateTableNode) root; - truncateTable(queryContext, truncateTable); - return true; + case CREATE_TABLE: + CreateTableNode createTable = (CreateTableNode) root; + createTable(queryContext, createTable, createTable.isIfNotExists()); + return true; + case DROP_TABLE: + DropTableNode dropTable = (DropTableNode) root; + dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); + return true; + case TRUNCATE_TABLE: + TruncateTableNode truncateTable = (TruncateTableNode) root; + truncateTable(queryContext, truncateTable); + return true; - case ALTER_TABLE: - AlterTableNode alterTable = (AlterTableNode) root; - alterTable(context, queryContext, alterTable); - return true; + case ALTER_TABLE: + AlterTableNode alterTable = (AlterTableNode) root; + alterTable(context, queryContext, alterTable); + return true; + + case CREATE_INDEX: + CreateIndexNode createIndex = (CreateIndexNode) root; + createIndex(queryContext, createIndex); + return true; + + case DROP_INDEX: + DropIndexNode dropIndexNode = (DropIndexNode) root; + dropIndex(queryContext, dropIndexNode); + return true; default: throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson()); } } + public void createIndex(final QueryContext queryContext, final CreateIndexNode createIndexNode) { + String databaseName, simpleIndexName, qualifiedIndexName; + if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + qualifiedIndexName = createIndexNode.getIndexName(); + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = createIndexNode.getIndexName(); + qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName); + } + + if (catalog.existIndexByName(databaseName, simpleIndexName)) { + throw new DuplicateIndexException(simpleIndexName); + } + + ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN); + if (scanNode == null) { + throw new InternalError("Cannot find the table of the relation"); + } + + IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()), + simpleIndexName, createIndexNode.getIndexPath(), + createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(), + createIndexNode.isUnique(), false, scanNode.getLogicalSchema()); + + if (catalog.createIndex(indexDesc)) { + LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + "."); + } else { + LOG.info("Index creation " + qualifiedIndexName + " is failed."); + throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\"."); + } + } + + public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) { + String databaseName, simpleIndexName; + if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = dropIndexNode.getIndexName(); + } + + if (!catalog.existIndexByName(databaseName, simpleIndexName)) { + throw new UndefinedIndexException(simpleIndexName); + } + + IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName); + + if (!catalog.dropIndex(databaseName, simpleIndexName)) { + LOG.info("Cannot drop index \"" + simpleIndexName + "\"."); + throw new TajoInternalError("Cannot drop index \"" + simpleIndexName + "\"."); + } + + Path indexPath = new Path(desc.getIndexPath()); + try { + FileSystem fs = indexPath.getFileSystem(context.getConf()); + fs.delete(indexPath, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + + LOG.info("Index " + simpleIndexName + " is dropped."); + } + /** * Alter a given table */ http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 228c0b8..b348265 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; @@ -296,43 +300,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult return tuples; } - + private List<Tuple> getIndexes(Schema outSchema) { - List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes(); + List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes(); List<Tuple> tuples = new ArrayList<Tuple>(indexList.size()); List<Column> columns = outSchema.getRootColumns(); Tuple aTuple; - for (IndexProto index: indexList) { + for (IndexDescProto index: indexList) { aTuple = new VTuple(outSchema.size()); - + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { Column column = columns.get(fieldId); - + if ("db_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId())); + aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId())); } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getTId())); + aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid())); } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createText(index.getIndexName())); - } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getColumnName())); - } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getDataType())); - } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getIndexType())); - } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique())); - } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered())); - } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending())); + } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name())); + } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath())); } } - + tuples.add(aTuple); } - + return tuples; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 42e5f61..e7fc4d2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -25,10 +25,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.*; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.DuplicateIndexException; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; @@ -71,8 +69,8 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; -import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase; import static org.apache.tajo.exception.ReturnStateUtil.OK; +import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase; public class QueryExecutor { private static final Log LOG = LogFactory.getLog(QueryExecutor.class); @@ -105,10 +103,17 @@ public class QueryExecutor { } else if (PlannerUtil.checkIfDDLPlan(rootNode)) { - ddlExecutor.execute(queryContext, plan); - response.setState(OK); - response.setResultType(ResultType.NO_RESULT); + if (PlannerUtil.isDistExecDDL(rootNode)) { + if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) { + checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild()); + } + executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response); + } else { + ddlExecutor.execute(queryContext, plan); + response.setState(OK); + response.setResultType(ResultType.NO_RESULT); + } } else if (plan.isExplain()) { // explain query execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response); @@ -135,7 +140,7 @@ public class QueryExecutor { public void execSetSession(Session session, LogicalPlan plan, SubmitQueryResponse.Builder response) { - SetSessionNode setSessionNode = ((LogicalRootNode)plan.getRootBlock().getRoot()).getChild(); + SetSessionNode setSessionNode = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild(); final String varName = setSessionNode.getName(); @@ -523,6 +528,25 @@ public class QueryExecutor { " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); } + private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode) + throws IOException { + String databaseName, simpleIndexName, qualifiedIndexName; + if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) { + String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + qualifiedIndexName = createIndexNode.getIndexName(); + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = createIndexNode.getIndexName(); + qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName); + } + + if (catalog.existIndexByName(databaseName, simpleIndexName)) { + throw new DuplicateIndexException(qualifiedIndexName); + } + } + public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner) throws Exception { http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 6fc4ea9..e3629c7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -33,6 +33,8 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; @@ -43,6 +45,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.logical.*; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; @@ -513,6 +516,7 @@ public class Query implements EventHandler<QueryEvent> { hookList.add(new MaterializedResultHook()); hookList.add(new CreateTableHook()); hookList.add(new InsertTableHook()); + hookList.add(new CreateIndexHook()); } public void execute(QueryContext queryContext, Query query, @@ -526,6 +530,48 @@ public class Query implements EventHandler<QueryEvent> { } } + private static class CreateIndexHook implements QueryHook { + + @Override + public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) { + Stage lastStage = query.getStage(finalExecBlockId); + return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX; + } + + @Override + public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { + CatalogService catalog = context.getWorkerContext().getCatalog(); + Stage lastStage = query.getStage(finalExecBlockId); + + CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan(); + String databaseName, simpleIndexName, qualifiedIndexName; + if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + qualifiedIndexName = createIndexNode.getIndexName(); + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = createIndexNode.getIndexName(); + qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName); + } + ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN); + if (scanNode == null) { + throw new IOException("Cannot find the table of the relation"); + } + IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()), + simpleIndexName, createIndexNode.getIndexPath(), + createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(), + createIndexNode.isUnique(), false, scanNode.getLogicalSchema()); + if (catalog.createIndex(indexDesc)) { + LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + "."); + } else { + LOG.info("Index creation " + qualifiedIndexName + " is failed."); + throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\"."); + } + } + } + private static class MaterializedResultHook implements QueryHook { @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 611560d..ac1bab5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -313,11 +313,10 @@ public class QueryMasterTask extends CompositeService { LOG.warn("Query already started"); return; } - - + LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED)); CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); + LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM @@ -346,6 +345,14 @@ public class QueryMasterTask extends CompositeService { tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); } } + + scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } } MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); queryMasterContext.getGlobalPlanner().build(queryContext, masterPlan); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java deleted file mode 100644 index 23d245b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.util; - -import com.google.gson.Gson; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.expr.*; -import org.apache.tajo.plan.logical.IndexScanNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map.Entry; - -public class IndexUtil { - public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) { - StringBuilder builder = new StringBuilder(); - builder.append(fragment.getPath().getName() + "_"); - builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_"); - for(int i = 0 ; i < keys.length ; i ++) { - builder.append(keys[i].getSortKey().getSimpleName()+"_"); - } - builder.append("_index"); - return builder.toString(); - - } - - public static String getIndexName(String indexName , SortSpec[] keys) { - StringBuilder builder = new StringBuilder(); - builder.append(indexName + "_"); - for(int i = 0 ; i < keys.length ; i ++) { - builder.append(keys[i].getSortKey().getSimpleName() + "_"); - } - return builder.toString(); - } - - public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode, - Iterator<Entry<String, String>> iter ) { - - EvalNode qual = scanNode.getQual(); - Gson gson = CoreGsonHelper.getInstance(); - - FieldAndValueFinder nodeFinder = new FieldAndValueFinder(); - qual.preOrder(nodeFinder); - LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList(); - - int maxSize = Integer.MIN_VALUE; - SortSpec[] maxIndex = null; - - String json; - while(iter.hasNext()) { - Entry<String , String> entry = iter.next(); - json = entry.getValue(); - SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class); - if(sortKey.length > nodeList.size()) { - /* If the number of the sort key is greater than where condition, - * this index cannot be used - * */ - continue; - } else { - boolean[] equal = new boolean[sortKey.length]; - for(int i = 0 ; i < sortKey.length ; i ++) { - for(int j = 0 ; j < nodeList.size() ; j ++) { - Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef(); - if(col.equals(sortKey[i].getSortKey())) { - equal[i] = true; - } - } - } - boolean chk = true; - for(int i = 0 ; i < equal.length ; i ++) { - chk = chk && equal[i]; - } - if(chk) { - if(maxSize < sortKey.length) { - maxSize = sortKey.length; - maxIndex = sortKey; - } - } - } - } - if(maxIndex == null) { - return null; - } else { - Schema keySchema = new Schema(); - for(int i = 0 ; i < maxIndex.length ; i ++ ) { - keySchema.addColumn(maxIndex[i].getSortKey()); - } - Datum[] datum = new Datum[nodeList.size()]; - for(int i = 0 ; i < nodeList.size() ; i ++ ) { - datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue(); - } - - return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex); - } - - } - - - private static class FieldAndValueFinder implements EvalNodeVisitor { - private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>(); - - public LinkedList<BinaryEval> getNodeList () { - return this.nodeList; - } - - @Override - public void visit(EvalNode node) { - BinaryEval binaryEval = (BinaryEval) node; - switch(node.getType()) { - case AND: - break; - case EQUAL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST ) { - nodeList.add(binaryEval); - } - break; - case IS_NULL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST) { - nodeList.add(binaryEval); - } - break; - default: - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index edb5703..281e23e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -26,8 +26,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.QueryId; -import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; import org.apache.tajo.ipc.QueryMasterClientProtocol; http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index d020639..5d7a53a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -367,7 +367,18 @@ public class TaskAttemptContext { } return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]); } - + + public String getUniqueKeyFromFragments() { + StringBuilder sb = new StringBuilder(); + for (List<FragmentProto> fragments : fragmentMap.values()) { + for (FragmentProto f : fragments) { + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f); + sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); + } + } + return sb.toString(); + } + public int hashCode() { return Objects.hashCode(taskId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 82ea479..92c682c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -125,21 +125,9 @@ public class TaskImpl implements Task { public void initPlan() throws IOException { plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); - LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - if (scanNode != null) { - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); - if (partitionScanNode != null) { - for (LogicalNode node : partitionScanNode) { - PartitionedTableScanNode scan = (PartitionedTableScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } + updateDescsForScanNodes(NodeType.SCAN); + updateDescsForScanNodes(NodeType.PARTITIONS_SCAN); + updateDescsForScanNodes(NodeType.INDEX_SCAN); interQuery = request.getProto().getInterQuery(); if (interQuery) { @@ -179,6 +167,17 @@ public class TaskImpl implements Task { LOG.info("=================================="); } + private void updateDescsForScanNodes(NodeType nodeType) { + assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN; + LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType); + if (scanNodes != null) { + for (LogicalNode node : scanNodes) { + ScanNode scanNode = (ScanNode) node; + descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + } + private void startScriptExecutors() throws IOException { for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { executor.start(systemConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java index f91288d..eb67167 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java @@ -18,13 +18,11 @@ package org.apache.tajo.ws.rs.resources; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.exception.ReturnStateUtil; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.QueryInfo; @@ -263,7 +261,7 @@ public class QueryResource { return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid."); } - SubmitQueryResponse response = + SubmitQueryResponse response = masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); if (ReturnStateUtil.isError(response.getState())) { return ResourcesUtil.createExceptionResponse(LOG, response.getState().getMessage()); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 3fcb15e..b5e464b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -901,6 +901,9 @@ public class QueryTestCaseBase { if (isLocalTable) { createdTableGlobalSet.remove(tableName); } + } else if (expr.getType() == OpType.CreateIndex) { + // TODO: index existence check + client.executeQuery(compiled); } else { assertTrue(ddlFilePath + " is not a Create or Drop Table statement", false); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java index b371be2..aa8070e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java @@ -18,13 +18,22 @@ package org.apache.tajo.cli.tools; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.auth.UserRoleInfo; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.FileUtil; import org.junit.Test; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.PrintWriter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestTajoDump extends QueryTestCaseBase { @Test @@ -33,16 +42,18 @@ public class TestTajoDump extends QueryTestCaseBase { executeString("CREATE TABLE \"" + getCurrentDatabase() + "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)"); - UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintWriter printWriter = new PrintWriter(bos); - TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); - printWriter.flush(); - printWriter.close(); - assertStrings(new String(bos.toByteArray())); - bos.close(); - - executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\""); + try { + UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintWriter printWriter = new PrintWriter(bos); + TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); + printWriter.flush(); + printWriter.close(); + assertStrings(new String(bos.toByteArray())); + bos.close(); + } finally { + executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\""); + } } } @@ -52,16 +63,62 @@ public class TestTajoDump extends QueryTestCaseBase { executeString("CREATE TABLE \"" + getCurrentDatabase() + "\".\"TableName2\" (\"Age\" int, \"Name\" Record (\"FirstName\" TEXT, lastname TEXT))"); - UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintWriter printWriter = new PrintWriter(bos); - TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); - printWriter.flush(); - printWriter.close(); - assertStrings(new String(bos.toByteArray())); - bos.close(); + try { + UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintWriter printWriter = new PrintWriter(bos); + TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); + printWriter.flush(); + printWriter.close(); + assertStrings(new String(bos.toByteArray())); + bos.close(); + } finally { + executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\""); + } + } + } + + @Test + public void testDump3() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + executeString("CREATE TABLE \"" + getCurrentDatabase() + + "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)"); + + executeString("CREATE INDEX test_idx on \"" + getCurrentDatabase() + + "\".\"TableName1\" ( \"Age\" asc null first, \"FirstName\" desc null last )"); + + try { + UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintWriter printWriter = new PrintWriter(bos); + TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); + printWriter.flush(); + printWriter.close(); + assertOutputResult("testDump3.result", new String(bos.toByteArray()), new String[]{"${index.path}"}, + new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "test_idx").toString()}); + bos.close(); + } finally { + executeString("DROP INDEX test_idx"); + executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\""); + } + } + } + + private void assertOutputResult(String expectedResultFile, String actual, String[] paramKeys, String[] paramValues) + throws Exception { + FileSystem fs = currentResultPath.getFileSystem(testBase.getTestingCluster().getConfiguration()); + Path resultFile = StorageUtil.concatPath(currentResultPath, expectedResultFile); + assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile)); + + String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri())); - executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\""); + if (paramKeys != null) { + for (int i = 0; i < paramKeys.length; i++) { + if (i < paramValues.length) { + expectedResult = expectedResult.replace(paramKeys[i], paramValues[i]); + } + } } + assertEquals(expectedResult.trim(), actual.trim()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 7629711..abd0973 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -35,8 +35,8 @@ import org.apache.tajo.engine.codegen.TajoClassLoader; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.exec.QueryExecutor; import org.apache.tajo.plan.*; @@ -44,9 +44,6 @@ import org.apache.tajo.plan.expr.EvalContext; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.serder.EvalNodeDeserializer; import org.apache.tajo.plan.serder.EvalNodeSerializer; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; @@ -60,6 +57,7 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.datetime.DateTimeUtil; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.apache.tajo.plan.serder.PlanProto; import java.io.IOException; import java.util.List; @@ -68,9 +66,7 @@ import java.util.TimeZone; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class ExprTestBase { private static TajoTestingCluster util; @@ -106,7 +102,7 @@ public class ExprTestBase { analyzer = new SQLAnalyzer(); preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat); planner = new LogicalPlanner(cat, TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), cat); annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java index eb4a3f4..368e89f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java @@ -126,7 +126,7 @@ public class TestJoinOrderAlgorithm { sqlAnalyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java index 10e7e37..640d88b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java @@ -104,9 +104,10 @@ public class TestLogicalOptimizer { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 097e232..02e921a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -40,7 +40,6 @@ import org.apache.tajo.engine.function.builtin.SumInt; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.plan.*; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; @@ -151,14 +150,14 @@ public class TestLogicalPlanner { "select name, empid, e.deptname, manager from employee as e, dept as dp", // 1 "select name, empid, e.deptname, manager, score from employee as e, dept, score", // 2 "select p.deptname, sumtest(score) from dept as p, score group by p.deptName having sumtest(score) > 30", // 3 - "select p.deptname, score from dept as p, score order by score asc", // 4 + "select p.deptname, score*200 from dept as p, score order by score*10 asc", // 4 "select name from employee where empId = 100", // 5 "select name, score from employee, score", // 6 "select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 7 "create table store1 as select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 8 "select deptName, sumtest(score) from score group by deptName having sumtest(score) > 30", // 9 "select 7 + 8 as res1, 8 * 9 as res2, 10 * 10 as res3", // 10 - "create index idx_employee on employee using bitmap (name null first, empId desc) with ('fillfactor' = 70)", // 11 + "create index idx_employee on employee using bitmap_idx (name null first, empId desc) where empid > 100", // 11 "select name, score from employee, score order by score limit 3", // 12 "select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13 }; @@ -512,7 +511,7 @@ public class TestLogicalPlanner { Schema expected = tpch.getOutSchema("q2"); assertSchema(expected, node.getOutSchema()); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN); @@ -551,7 +550,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN); @@ -592,7 +591,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN); @@ -639,7 +638,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap(); @@ -896,6 +895,29 @@ public class TestLogicalPlanner { } @Test + public final void testCreateIndexNode() throws TajoException { + QueryContext qc = new QueryContext(util.getConfiguration(), session); + Expr expr = sqlAnalyzer.parse(QUERIES[11]); + LogicalPlan rootNode = planner.createPlan(qc, expr); + LogicalNode plan = rootNode.getRootBlock().getRoot(); + testJsonSerDerObject(plan); + + LogicalRootNode root = (LogicalRootNode) plan; + assertEquals(NodeType.CREATE_INDEX, root.getChild().getType()); + CreateIndexNode createIndexNode = root.getChild(); + + assertEquals(NodeType.PROJECTION, createIndexNode.getChild().getType()); + ProjectionNode projNode = createIndexNode.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + assertEquals(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), scanNode.getTableName()); + } + + @Test public final void testAsterisk() throws CloneNotSupportedException, TajoException { QueryContext qc = createQueryContext(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java deleted file mode 100644 index 6fb7a45..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.catalog.*; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.engine.planner.PhysicalPlannerImpl; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.worker.TaskAttemptContext; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.Stack; - -import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.junit.Assert.assertEquals; - -public class TestBSTIndexExec { - - private TajoConf conf; - private Path idxPath; - private CatalogService catalog; - private SQLAnalyzer analyzer; - private LogicalPlanner planner; - private LogicalOptimizer optimizer; - private FileTablespace sm; - private Schema idxSchema; - private BaseTupleComparator comp; - private BSTIndex.BSTIndexWriter writer; - private HashMap<Integer , Integer> randomValues ; - private int rndKey = -1; - private FileSystem fs; - private TableMeta meta; - private Path tablePath; - - private Random rnd = new Random(System.currentTimeMillis()); - - private TajoTestingCluster util; - - @Before - public void setup() throws Exception { - this.randomValues = new HashMap<Integer, Integer>(); - this.conf = new TajoConf(); - conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); - util = new TajoTestingCluster(); - util.startCatalogCluster(); - catalog = util.getMiniCatalogCluster().getCatalog(); - - Path workDir = CommonTestingUtil.getTestDir(); - catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString()); - catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = TablespaceManager.getLocalFs(); - - idxPath = new Path(workDir, "test.idx"); - - Schema schema = new Schema(); - schema.addColumn("managerid", Type.INT4); - schema.addColumn("empid", Type.INT4); - schema.addColumn("deptname", Type.TEXT); - - this.idxSchema = new Schema(); - idxSchema.addColumn("managerid", Type.INT4); - SortSpec[] sortKeys = new SortSpec[1]; - sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false); - this.comp = new BaseTupleComparator(idxSchema, sortKeys); - - this.writer = new BSTIndex(conf).getIndexWriter(idxPath, - BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp); - writer.setLoadNum(100); - writer.open(); - long offset; - - meta = CatalogUtil.newTableMeta("TEXT"); - tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv"); - fs = tablePath.getFileSystem(conf); - fs.mkdirs(tablePath.getParent()); - - FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath); - appender.init(); - VTuple tuple = new VTuple(schema.size()); - for (int i = 0; i < 10000; i++) { - - VTuple key = new VTuple(this.idxSchema.size()); - int rndKey = rnd.nextInt(250); - if(this.randomValues.containsKey(rndKey)) { - int t = this.randomValues.remove(rndKey) + 1; - this.randomValues.put(rndKey, t); - } else { - this.randomValues.put(rndKey, 1); - } - - key.put(new Datum[] { DatumFactory.createInt4(rndKey) }); - tuple.put(new Datum[] { DatumFactory.createInt4(rndKey), - DatumFactory.createInt4(rnd.nextInt(10)), - DatumFactory.createText("dept_" + rnd.nextInt(10)) }); - offset = appender.getOffset(); - appender.addTuple(tuple); - writer.write(key, offset); - } - appender.flush(); - appender.close(); - writer.close(); - - TableDesc desc = new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, - tablePath.toUri()); - catalog.createTable(desc); - - analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(conf); - } - - @After - public void tearDown() { - util.shutdownCatalogCluster(); - } - - @Test - public void testEqual() throws Exception { - this.rndKey = rnd.nextInt(250); - final String QUERY = "select * from employee where managerId = " + rndKey; - - FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE); - Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testEqual"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); - Expr expr = analyzer.parse(QUERY); - LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = optimizer.optimize(plan); - - TmpPlanner phyPlanner = new TmpPlanner(conf); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - int tupleCount = this.randomValues.get(rndKey); - int counter = 0; - exec.init(); - while (exec.next() != null) { - counter ++; - } - exec.close(); - assertEquals(tupleCount , counter); - } - - private class TmpPlanner extends PhysicalPlannerImpl { - public TmpPlanner(TajoConf conf) { - super(conf); - } - - @Override - public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack) - throws IOException { - Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()), - "Error: There is no table matched to %s", scanNode.getTableName()); - - List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName())); - - Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)}; - - return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum); - - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java index 292c414..b9ba2de 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -62,6 +62,7 @@ public class TestHashAntiJoinExec { private LogicalPlanner planner; private LogicalOptimizer optimizer; private Path testDir; + private QueryContext queryContext; private TableDesc employee; private TableDesc people; @@ -126,11 +127,12 @@ public class TestHashAntiJoinExec { appender.flush(); appender.close(); + queryContext = new QueryContext(conf); people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); - optimizer = new LogicalOptimizer(conf); + optimizer = new LogicalOptimizer(conf, catalog); } @After @@ -157,7 +159,7 @@ public class TestHashAntiJoinExec { FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashAntiJoin"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[0]);
