Repository: tajo Updated Branches: refs/heads/index_support df2ff2dd7 -> 8e52ed43a
http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java new file mode 100644 index 0000000..5cbed7e --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.serder; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.plan.Target; +import org.apache.tajo.plan.expr.AggregationFunctionCallEval; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.expr.FieldEval; +import org.apache.tajo.plan.expr.WindowFunctionEval; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; + +import java.util.*; + +/** + * It deserializes a list of serialized logical nodes into a logical node tree. + */ +public class LogicalNodeDeserializer { + private static final LogicalNodeDeserializer instance; + + static { + instance = new LogicalNodeDeserializer(); + } + + /** + * Deserialize a list of nodes into a logical node tree. + * + * @param context QueryContext + * @param tree LogicalNodeTree which contains a list of serialized logical nodes. 
+ * @return A logical node tree + */ + public static LogicalNode deserialize(OverridableConf context, PlanProto.LogicalNodeTree tree) { + Map<Integer, LogicalNode> nodeMap = Maps.newHashMap(); + + // sort serialized logical nodes in an ascending order of their sids + List<PlanProto.LogicalNode> nodeList = Lists.newArrayList(tree.getNodesList()); + Collections.sort(nodeList, new Comparator<PlanProto.LogicalNode>() { + @Override + public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode o2) { + return o1.getVisitSeq() - o2.getVisitSeq(); + } + }); + + LogicalNode current = null; + + // The sorted order is the same of a postfix traverse order. + // So, it sequentially transforms each serialized node into a LogicalNode instance in a postfix order of + // the original logical node tree. + + Iterator<PlanProto.LogicalNode> it = nodeList.iterator(); + while (it.hasNext()) { + PlanProto.LogicalNode protoNode = it.next(); + + switch (protoNode.getType()) { + case ROOT: + current = convertRoot(nodeMap, protoNode); + break; + case SET_SESSION: + current = convertSetSession(protoNode); + break; + case EXPRS: + current = convertEvalExpr(context, protoNode); + break; + case PROJECTION: + current = convertProjection(context, nodeMap, protoNode); + break; + case LIMIT: + current = convertLimit(nodeMap, protoNode); + break; + case SORT: + current = convertSort(nodeMap, protoNode); + break; + case WINDOW_AGG: + current = convertWindowAgg(context, nodeMap, protoNode); + break; + case HAVING: + current = convertHaving(context, nodeMap, protoNode); + break; + case GROUP_BY: + current = convertGroupby(context, nodeMap, protoNode); + break; + case DISTINCT_GROUP_BY: + current = convertDistinctGroupby(context, nodeMap, protoNode); + break; + case SELECTION: + current = convertFilter(context, nodeMap, protoNode); + break; + case JOIN: + current = convertJoin(context, nodeMap, protoNode); + break; + case TABLE_SUBQUERY: + current = convertTableSubQuery(context, nodeMap, 
protoNode); + break; + case UNION: + current = convertUnion(nodeMap, protoNode); + break; + case PARTITIONS_SCAN: + current = convertPartitionScan(context, protoNode); + break; + case SCAN: + current = convertScan(context, protoNode); + break; + + case CREATE_TABLE: + current = convertCreateTable(nodeMap, protoNode); + break; + case INSERT: + current = convertInsert(nodeMap, protoNode); + break; + case DROP_TABLE: + current = convertDropTable(protoNode); + break; + + case CREATE_DATABASE: + current = convertCreateDatabase(protoNode); + break; + case DROP_DATABASE: + current = convertDropDatabase(protoNode); + break; + + case ALTER_TABLESPACE: + current = convertAlterTablespace(protoNode); + break; + case ALTER_TABLE: + current = convertAlterTable(protoNode); + break; + case TRUNCATE_TABLE: + current = convertTruncateTable(protoNode); + break; + + default: + throw new RuntimeException("Unknown NodeType: " + protoNode.getType().name()); + } + + nodeMap.put(protoNode.getVisitSeq(), current); + } + + return current; + } + + private static LogicalRootNode convertRoot(Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.RootNode rootProto = protoNode.getRoot(); + + LogicalRootNode root = new LogicalRootNode(protoNode.getNodeId()); + root.setChild(nodeMap.get(rootProto.getChildSeq())); + if (protoNode.hasInSchema()) { + root.setInSchema(convertSchema(protoNode.getInSchema())); + } + if (protoNode.hasOutSchema()) { + root.setOutSchema(convertSchema(protoNode.getOutSchema())); + } + + return root; + } + + private static SetSessionNode convertSetSession(PlanProto.LogicalNode protoNode) { + PlanProto.SetSessionNode setSessionProto = protoNode.getSetSession(); + + SetSessionNode setSession = new SetSessionNode(protoNode.getNodeId()); + setSession.init(setSessionProto.getName(), setSessionProto.hasValue() ? 
setSessionProto.getValue() : null); + + return setSession; + } + + private static EvalExprNode convertEvalExpr(OverridableConf context, PlanProto.LogicalNode protoNode) { + PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval(); + + EvalExprNode evalExpr = new EvalExprNode(protoNode.getNodeId()); + evalExpr.setInSchema(convertSchema(protoNode.getInSchema())); + evalExpr.setTargets(convertTargets(context, evalExprProto.getTargetsList())); + + return evalExpr; + } + + private static ProjectionNode convertProjection(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.ProjectionNode projectionProto = protoNode.getProjection(); + + ProjectionNode projectionNode = new ProjectionNode(protoNode.getNodeId()); + projectionNode.init(projectionProto.getDistinct(), convertTargets(context, projectionProto.getTargetsList())); + projectionNode.setChild(nodeMap.get(projectionProto.getChildSeq())); + projectionNode.setInSchema(convertSchema(protoNode.getInSchema())); + projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema())); + + return projectionNode; + } + + private static LimitNode convertLimit(Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { + PlanProto.LimitNode limitProto = protoNode.getLimit(); + + LimitNode limitNode = new LimitNode(protoNode.getNodeId()); + limitNode.setChild(nodeMap.get(limitProto.getChildSeq())); + limitNode.setInSchema(convertSchema(protoNode.getInSchema())); + limitNode.setOutSchema(convertSchema(protoNode.getOutSchema())); + limitNode.setFetchFirst(limitProto.getFetchFirstNum()); + + return limitNode; + } + + private static SortNode convertSort(Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { + PlanProto.SortNode sortProto = protoNode.getSort(); + + SortNode sortNode = new SortNode(protoNode.getNodeId()); + sortNode.setChild(nodeMap.get(sortProto.getChildSeq())); + sortNode.setInSchema(convertSchema(protoNode.getInSchema())); + 
sortNode.setOutSchema(convertSchema(protoNode.getOutSchema())); + sortNode.setSortSpecs(convertSortSpecs(sortProto.getSortSpecsList())); + + return sortNode; + } + + private static HavingNode convertHaving(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.FilterNode havingProto = protoNode.getFilter(); + + HavingNode having = new HavingNode(protoNode.getNodeId()); + having.setChild(nodeMap.get(havingProto.getChildSeq())); + having.setQual(EvalNodeDeserializer.deserialize(context, havingProto.getQual())); + having.setInSchema(convertSchema(protoNode.getInSchema())); + having.setOutSchema(convertSchema(protoNode.getOutSchema())); + + return having; + } + + private static WindowAggNode convertWindowAgg(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg(); + + WindowAggNode windowAgg = new WindowAggNode(protoNode.getNodeId()); + windowAgg.setChild(nodeMap.get(windowAggProto.getChildSeq())); + + if (windowAggProto.getPartitionKeysCount() > 0) { + windowAgg.setPartitionKeys(convertColumns(windowAggProto.getPartitionKeysList())); + } + + if (windowAggProto.getWindowFunctionsCount() > 0) { + windowAgg.setWindowFunctions(convertWindowFunccEvals(context, windowAggProto.getWindowFunctionsList())); + } + + windowAgg.setDistinct(windowAggProto.getDistinct()); + + if (windowAggProto.getSortSpecsCount() > 0) { + windowAgg.setSortSpecs(convertSortSpecs(windowAggProto.getSortSpecsList())); + } + + if (windowAggProto.getTargetsCount() > 0) { + windowAgg.setTargets(convertTargets(context, windowAggProto.getTargetsList())); + } + + windowAgg.setInSchema(convertSchema(protoNode.getInSchema())); + windowAgg.setOutSchema(convertSchema(protoNode.getOutSchema())); + + return windowAgg; + } + + private static GroupbyNode convertGroupby(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode 
protoNode) { + PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby(); + + GroupbyNode groupby = new GroupbyNode(protoNode.getNodeId()); + groupby.setChild(nodeMap.get(groupbyProto.getChildSeq())); + groupby.setDistinct(groupbyProto.getDistinct()); + + if (groupbyProto.getGroupingKeysCount() > 0) { + groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList())); + } + if (groupbyProto.getAggFunctionsCount() > 0) { + groupby.setAggFunctions(convertAggFuncCallEvals(context, groupbyProto.getAggFunctionsList())); + } + if (groupbyProto.getTargetsCount() > 0) { + groupby.setTargets(convertTargets(context, groupbyProto.getTargetsList())); + } + + groupby.setInSchema(convertSchema(protoNode.getInSchema())); + groupby.setOutSchema(convertSchema(protoNode.getOutSchema())); + + return groupby; + } + + private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.DistinctGroupbyNode distinctGroupbyProto = protoNode.getDistinctGroupby(); + + DistinctGroupbyNode distinctGroupby = new DistinctGroupbyNode(protoNode.getNodeId()); + distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildSeq())); + + if (distinctGroupbyProto.hasGroupbyNode()) { + distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, distinctGroupbyProto.getGroupbyNode())); + } + + if (distinctGroupbyProto.getSubPlansCount() > 0) { + List<GroupbyNode> subPlans = TUtil.newList(); + for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) { + subPlans.add(convertGroupby(context, nodeMap, distinctGroupbyProto.getSubPlans(i))); + } + distinctGroupby.setSubPlans(subPlans); + } + + if (distinctGroupbyProto.getGroupingKeysCount() > 0) { + distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList())); + } + if (distinctGroupbyProto.getAggFunctionsCount() > 0) { + distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
distinctGroupbyProto.getAggFunctionsList())); + } + if (distinctGroupbyProto.getTargetsCount() > 0) { + distinctGroupby.setTargets(convertTargets(context, distinctGroupbyProto.getTargetsList())); + } + int [] resultColumnIds = new int[distinctGroupbyProto.getResultIdCount()]; + for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) { + resultColumnIds[i] = distinctGroupbyProto.getResultId(i); + } + distinctGroupby.setResultColumnIds(resultColumnIds); + + // TODO - in distinct groupby, output and target are not matched to each other. It does not follow the convention. + distinctGroupby.setInSchema(convertSchema(protoNode.getInSchema())); + distinctGroupby.setOutSchema(convertSchema(protoNode.getOutSchema())); + + return distinctGroupby; + } + + private static JoinNode convertJoin(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.JoinNode joinProto = protoNode.getJoin(); + + JoinNode join = new JoinNode(protoNode.getNodeId()); + join.setLeftChild(nodeMap.get(joinProto.getLeftChildSeq())); + join.setRightChild(nodeMap.get(joinProto.getRightChilSeq())); + join.setJoinType(convertJoinType(joinProto.getJoinType())); + join.setInSchema(convertSchema(protoNode.getInSchema())); + join.setOutSchema(convertSchema(protoNode.getOutSchema())); + if (joinProto.hasJoinQual()) { + join.setJoinQual(EvalNodeDeserializer.deserialize(context, joinProto.getJoinQual())); + } + if (joinProto.getExistsTargets()) { + join.setTargets(convertTargets(context, joinProto.getTargetsList())); + } + + return join; + } + + private static SelectionNode convertFilter(OverridableConf context, Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.FilterNode filterProto = protoNode.getFilter(); + + SelectionNode selection = new SelectionNode(protoNode.getNodeId()); + selection.setInSchema(convertSchema(protoNode.getInSchema())); + selection.setOutSchema(convertSchema(protoNode.getOutSchema())); + 
selection.setChild(nodeMap.get(filterProto.getChildSeq())); + selection.setQual(EvalNodeDeserializer.deserialize(context, filterProto.getQual())); + + return selection; + } + + private static UnionNode convertUnion(Map<Integer, LogicalNode> nodeMap, PlanProto.LogicalNode protoNode) { + PlanProto.UnionNode unionProto = protoNode.getUnion(); + + UnionNode union = new UnionNode(protoNode.getNodeId()); + union.setInSchema(convertSchema(protoNode.getInSchema())); + union.setOutSchema(convertSchema(protoNode.getOutSchema())); + union.setLeftChild(nodeMap.get(unionProto.getLeftChildSeq())); + union.setRightChild(nodeMap.get(unionProto.getRightChildSeq())); + + return union; + } + + private static ScanNode convertScan(OverridableConf context, PlanProto.LogicalNode protoNode) { + ScanNode scan = new ScanNode(protoNode.getNodeId()); + fillScanNode(context, protoNode, scan); + + return scan; + } + + private static void fillScanNode(OverridableConf context, PlanProto.LogicalNode protoNode, ScanNode scan) { + PlanProto.ScanNode scanProto = protoNode.getScan(); + if (scanProto.hasAlias()) { + scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias()); + } else { + scan.init(new TableDesc(scanProto.getTable())); + } + + if (scanProto.getExistTargets()) { + scan.setTargets(convertTargets(context, scanProto.getTargetsList())); + } + + if (scanProto.hasQual()) { + scan.setQual(EvalNodeDeserializer.deserialize(context, scanProto.getQual())); + } + + scan.setInSchema(convertSchema(protoNode.getInSchema())); + scan.setOutSchema(convertSchema(protoNode.getOutSchema())); + } + + private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, PlanProto.LogicalNode protoNode) { + PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId()); + fillScanNode(context, protoNode, partitionedScan); + + PlanProto.PartitionScanSpec partitionScanProto = protoNode.getPartitionScan(); + Path [] paths = new 
Path[partitionScanProto.getPathsCount()]; + for (int i = 0; i < partitionScanProto.getPathsCount(); i++) { + paths[i] = new Path(partitionScanProto.getPaths(i)); + } + partitionedScan.setInputPaths(paths); + return partitionedScan; + } + + private static TableSubQueryNode convertTableSubQuery(OverridableConf context, + Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery(); + + TableSubQueryNode tableSubQuery = new TableSubQueryNode(protoNode.getNodeId()); + tableSubQuery.init(proto.getTableName(), nodeMap.get(proto.getChildSeq())); + tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema())); + if (proto.getTargetsCount() > 0) { + tableSubQuery.setTargets(convertTargets(context, proto.getTargetsList())); + } + + return tableSubQuery; + } + + private static CreateTableNode convertCreateTable(Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.PersistentStoreNode persistentStoreProto = protoNode.getPersistentStore(); + PlanProto.StoreTableNodeSpec storeTableNodeSpec = protoNode.getStoreTable(); + PlanProto.CreateTableNodeSpec createTableNodeSpec = protoNode.getCreateTable(); + + CreateTableNode createTable = new CreateTableNode(protoNode.getNodeId()); + if (protoNode.hasInSchema()) { + createTable.setInSchema(convertSchema(protoNode.getInSchema())); + } + if (protoNode.hasOutSchema()) { + createTable.setOutSchema(convertSchema(protoNode.getOutSchema())); + } + createTable.setChild(nodeMap.get(persistentStoreProto.getChildSeq())); + createTable.setStorageType(persistentStoreProto.getStorageType()); + createTable.setOptions(new KeyValueSet(persistentStoreProto.getTableProperties())); + + createTable.setTableName(storeTableNodeSpec.getTableName()); + if (storeTableNodeSpec.hasPartitionMethod()) { + createTable.setPartitionMethod(new PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod())); + } + + 
createTable.setTableSchema(convertSchema(createTableNodeSpec.getSchema())); + createTable.setExternal(createTableNodeSpec.getExternal()); + if (createTableNodeSpec.getExternal() && createTableNodeSpec.hasPath()) { + createTable.setPath(new Path(createTableNodeSpec.getPath())); + } + createTable.setIfNotExists(createTableNodeSpec.getIfNotExists()); + + return createTable; + } + + private static InsertNode convertInsert(Map<Integer, LogicalNode> nodeMap, + PlanProto.LogicalNode protoNode) { + PlanProto.PersistentStoreNode persistentStoreProto = protoNode.getPersistentStore(); + PlanProto.StoreTableNodeSpec storeTableNodeSpec = protoNode.getStoreTable(); + PlanProto.InsertNodeSpec insertNodeSpec = protoNode.getInsert(); + + InsertNode insertNode = new InsertNode(protoNode.getNodeId()); + if (protoNode.hasInSchema()) { + insertNode.setInSchema(convertSchema(protoNode.getInSchema())); + } + if (protoNode.hasOutSchema()) { + insertNode.setOutSchema(convertSchema(protoNode.getOutSchema())); + } + insertNode.setChild(nodeMap.get(persistentStoreProto.getChildSeq())); + insertNode.setStorageType(persistentStoreProto.getStorageType()); + insertNode.setOptions(new KeyValueSet(persistentStoreProto.getTableProperties())); + + if (storeTableNodeSpec.hasTableName()) { + insertNode.setTableName(storeTableNodeSpec.getTableName()); + } + if (storeTableNodeSpec.hasPartitionMethod()) { + insertNode.setPartitionMethod(new PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod())); + } + + insertNode.setOverwrite(insertNodeSpec.getOverwrite()); + insertNode.setTableSchema(convertSchema(insertNodeSpec.getTableSchema())); + if (insertNodeSpec.hasTargetSchema()) { + insertNode.setTargetSchema(convertSchema(insertNodeSpec.getTargetSchema())); + } + if (insertNodeSpec.hasProjectedSchema()) { + insertNode.setProjectedSchema(convertSchema(insertNodeSpec.getProjectedSchema())); + } + if (insertNodeSpec.hasPath()) { + insertNode.setPath(new Path(insertNodeSpec.getPath())); + } + + return 
insertNode; + } + + private static DropTableNode convertDropTable(PlanProto.LogicalNode protoNode) { + DropTableNode dropTable = new DropTableNode(protoNode.getNodeId()); + + PlanProto.DropTableNode dropTableProto = protoNode.getDropTable(); + dropTable.init(dropTableProto.getTableName(), dropTableProto.getIfExists(), dropTableProto.getPurge()); + + return dropTable; + } + + private static CreateDatabaseNode convertCreateDatabase(PlanProto.LogicalNode protoNode) { + CreateDatabaseNode createDatabase = new CreateDatabaseNode(protoNode.getNodeId()); + + PlanProto.CreateDatabaseNode createDatabaseProto = protoNode.getCreateDatabase(); + createDatabase.init(createDatabaseProto.getDbName(), createDatabaseProto.getIfNotExists()); + + return createDatabase; + } + + private static DropDatabaseNode convertDropDatabase(PlanProto.LogicalNode protoNode) { + DropDatabaseNode dropDatabase = new DropDatabaseNode(protoNode.getNodeId()); + + PlanProto.DropDatabaseNode dropDatabaseProto = protoNode.getDropDatabase(); + dropDatabase.init(dropDatabaseProto.getDbName(), dropDatabaseProto.getIfExists()); + + return dropDatabase; + } + + private static AlterTablespaceNode convertAlterTablespace(PlanProto.LogicalNode protoNode) { + AlterTablespaceNode alterTablespace = new AlterTablespaceNode(protoNode.getNodeId()); + + PlanProto.AlterTablespaceNode alterTablespaceProto = protoNode.getAlterTablespace(); + alterTablespace.setTablespaceName(alterTablespaceProto.getTableSpaceName()); + + switch (alterTablespaceProto.getSetType()) { + case LOCATION: + alterTablespace.setLocation(alterTablespaceProto.getSetLocation().getLocation()); + break; + default: + throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + alterTablespaceProto.getSetType().name()); + } + + return alterTablespace; + } + + private static AlterTableNode convertAlterTable(PlanProto.LogicalNode protoNode) { + AlterTableNode alterTable = new AlterTableNode(protoNode.getNodeId()); + + PlanProto.AlterTableNode 
alterTableProto = protoNode.getAlterTable(); + alterTable.setTableName(alterTableProto.getTableName()); + + switch (alterTableProto.getSetType()) { + case RENAME_TABLE: + alterTable.setNewTableName(alterTableProto.getRenameTable().getNewName()); + break; + case ADD_COLUMN: + alterTable.setAddNewColumn(new Column(alterTableProto.getAddColumn().getAddColumn())); + break; + case RENAME_COLUMN: + alterTable.setColumnName(alterTableProto.getRenameColumn().getOldName()); + alterTable.setNewColumnName(alterTableProto.getRenameColumn().getNewName()); + break; + default: + throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name()); + } + + return alterTable; + } + + private static TruncateTableNode convertTruncateTable(PlanProto.LogicalNode protoNode) { + TruncateTableNode truncateTable = new TruncateTableNode(protoNode.getNodeId()); + + PlanProto.TruncateTableNode truncateTableProto = protoNode.getTruncateTableNode(); + truncateTable.setTableNames(truncateTableProto.getTableNamesList()); + + return truncateTable; + } + + private static AggregationFunctionCallEval [] convertAggFuncCallEvals(OverridableConf context, + List<PlanProto.EvalNodeTree> evalTrees) { + AggregationFunctionCallEval [] aggFuncs = new AggregationFunctionCallEval[evalTrees.size()]; + for (int i = 0; i < aggFuncs.length; i++) { + aggFuncs[i] = (AggregationFunctionCallEval) EvalNodeDeserializer.deserialize(context, evalTrees.get(i)); + } + return aggFuncs; + } + + private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf context, + List<PlanProto.EvalNodeTree> evalTrees) { + WindowFunctionEval[] winFuncEvals = new WindowFunctionEval[evalTrees.size()]; + for (int i = 0; i < winFuncEvals.length; i++) { + winFuncEvals[i] = (WindowFunctionEval) EvalNodeDeserializer.deserialize(context, evalTrees.get(i)); + } + return winFuncEvals; + } + + public static Schema convertSchema(CatalogProtos.SchemaProto proto) { + return new Schema(proto); + } + + 
public static Column[] convertColumns(List<CatalogProtos.ColumnProto> columnProtos) { + Column [] columns = new Column[columnProtos.size()]; + for (int i = 0; i < columns.length; i++) { + columns[i] = new Column(columnProtos.get(i)); + } + return columns; + } + + public static Target[] convertTargets(OverridableConf context, List<PlanProto.Target> targetsProto) { + Target [] targets = new Target[targetsProto.size()]; + for (int i = 0; i < targets.length; i++) { + PlanProto.Target targetProto = targetsProto.get(i); + EvalNode evalNode = EvalNodeDeserializer.deserialize(context, targetProto.getExpr()); + if (targetProto.hasAlias()) { + targets[i] = new Target(evalNode, targetProto.getAlias()); + } else { + targets[i] = new Target((FieldEval) evalNode); + } + } + return targets; + } + + public static SortSpec[] convertSortSpecs(List<CatalogProtos.SortSpecProto> sortSpecProtos) { + SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()]; + int i = 0; + for (CatalogProtos.SortSpecProto proto : sortSpecProtos) { + sortSpecs[i++] = new SortSpec(proto); + } + return sortSpecs; + } + + public static JoinType convertJoinType(PlanProto.JoinType type) { + switch (type) { + case CROSS_JOIN: + return JoinType.CROSS; + case INNER_JOIN: + return JoinType.INNER; + case LEFT_OUTER_JOIN: + return JoinType.LEFT_OUTER; + case RIGHT_OUTER_JOIN: + return JoinType.RIGHT_OUTER; + case FULL_OUTER_JOIN: + return JoinType.FULL_OUTER; + case LEFT_SEMI_JOIN: + return JoinType.LEFT_SEMI; + case RIGHT_SEMI_JOIN: + return JoinType.RIGHT_SEMI; + case LEFT_ANTI_JOIN: + return JoinType.LEFT_ANTI; + case RIGHT_ANTI_JOIN: + return JoinType.RIGHT_ANTI; + case UNION_JOIN: + return JoinType.UNION; + default: + throw new RuntimeException("Unknown JoinType: " + type.name()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java ---------------------------------------------------------------------- diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java new file mode 100644 index 0000000..39a13ba --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.plan.serder; + +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.Target; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.AddColumn; +import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameColumn; +import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable; +import org.apache.tajo.plan.serder.PlanProto.AlterTablespaceNode.SetLocation; +import org.apache.tajo.plan.serder.PlanProto.LogicalNodeTree; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.TUtil; + +import java.util.List; +import java.util.Map; +import java.util.Stack; + +/** + * It serializes a logical plan into a protobuf-based serialized bytes. + * + * In detail, it traverses all logical nodes in a postfix order. + * For each visiting node, it serializes the node and adds the serialized bytes into a list. + * Then, a list will contains a list of serialized nodes in a postfix order. + * + * @see org.apache.tajo.plan.serder.LogicalNodeDeserializer + */ +public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSerializer.SerializeContext, + LogicalNode> { + + private static final LogicalNodeSerializer instance; + + static { + instance = new LogicalNodeSerializer(); + } + + /** + * Serialize a logical plan into a protobuf-based serialized bytes. 
+ * + * @param node LogicalNode to be serialized + * @return A list of serialized nodes + */ + public static LogicalNodeTree serialize(LogicalNode node) { + SerializeContext context = new SerializeContext(); + try { + instance.visit(context, null, null, node, new Stack<LogicalNode>()); + } catch (PlanningException e) { + throw new RuntimeException(e); + } + return context.treeBuilder.build(); + } + + private static PlanProto.LogicalNode.Builder createNodeBuilder(SerializeContext context, LogicalNode node) { + int selfId; + if (context.idMap.containsKey(node)) { + selfId = context.idMap.get(node); + } else { + selfId = context.seqId++; + context.idMap.put(node, selfId); + } + + PlanProto.LogicalNode.Builder nodeBuilder = PlanProto.LogicalNode.newBuilder(); + nodeBuilder.setVisitSeq(selfId); + nodeBuilder.setNodeId(node.getPID()); + nodeBuilder.setType(convertType(node.getType())); + + // some DDL statements like DropTable or DropDatabase do not have in/out schemas + if (node.getInSchema() != null) { + nodeBuilder.setInSchema(node.getInSchema().getProto()); + } + if (node.getOutSchema() != null) { + nodeBuilder.setOutSchema(node.getOutSchema().getProto()); + } + return nodeBuilder; + } + + public static class SerializeContext { + private int seqId = 0; + private Map<LogicalNode, Integer> idMap = Maps.newHashMap(); + private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder(); + } + + public LogicalNode visitRoot(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + LogicalRootNode root, Stack<LogicalNode> stack) throws PlanningException { + super.visitRoot(context, plan, block, root, stack); + + int [] childIds = registerGetChildIds(context, root); + + PlanProto.RootNode.Builder rootBuilder = PlanProto.RootNode.newBuilder(); + rootBuilder.setChildSeq(childIds[0]); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, root); + nodeBuilder.setRoot(rootBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + 
return root; + } + + @Override + public LogicalNode visitSetSession(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + SetSessionNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitSetSession(context, plan, block, node, stack); + + PlanProto.SetSessionNode.Builder builder = PlanProto.SetSessionNode.newBuilder(); + builder.setName(node.getName()); + if (node.hasValue()) { + builder.setValue(node.getValue()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setSetSession(builder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + EvalExprNode exprEval, Stack<LogicalNode> stack) throws PlanningException { + PlanProto.EvalExprNode.Builder exprEvalBuilder = PlanProto.EvalExprNode.newBuilder(); + exprEvalBuilder.addAllTargets( + ProtoUtil.<PlanProto.Target>toProtoObjects(exprEval.getTargets())); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, exprEval); + nodeBuilder.setExprEval(exprEvalBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return exprEval; + } + + public LogicalNode visitProjection(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + ProjectionNode projection, Stack<LogicalNode> stack) throws PlanningException { + super.visitProjection(context, plan, block, projection, stack); + + int [] childIds = registerGetChildIds(context, projection); + + PlanProto.ProjectionNode.Builder projectionBuilder = PlanProto.ProjectionNode.newBuilder(); + projectionBuilder.setChildSeq(childIds[0]); + projectionBuilder.addAllTargets( + ProtoUtil.<PlanProto.Target>toProtoObjects(projection.getTargets())); + projectionBuilder.setDistinct(projection.isDistinct()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, projection); + nodeBuilder.setProjection(projectionBuilder); 
+ context.treeBuilder.addNodes(nodeBuilder); + + return projection; + } + + @Override + public LogicalNode visitLimit(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + LimitNode limit, Stack<LogicalNode> stack) throws PlanningException { + super.visitLimit(context, plan, block, limit, stack); + + int [] childIds = registerGetChildIds(context, limit); + + PlanProto.LimitNode.Builder limitBuilder = PlanProto.LimitNode.newBuilder(); + limitBuilder.setChildSeq(childIds[0]); + limitBuilder.setFetchFirstNum(limit.getFetchFirstNum()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, limit); + nodeBuilder.setLimit(limitBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return limit; + } + + public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + WindowAggNode windowAgg, Stack<LogicalNode> stack) throws PlanningException { + super.visitWindowAgg(context, plan, block, windowAgg, stack); + + int [] childIds = registerGetChildIds(context, windowAgg); + + PlanProto.WindowAggNode.Builder windowAggBuilder = PlanProto.WindowAggNode.newBuilder(); + windowAggBuilder.setChildSeq(childIds[0]); + + if (windowAgg.hasPartitionKeys()) { + windowAggBuilder.addAllPartitionKeys( + ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(windowAgg.getPartitionKeys())); + } + + if (windowAgg.hasAggFunctions()) { + windowAggBuilder.addAllWindowFunctions( + ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(windowAgg.getWindowFunctions())); + } + windowAggBuilder.setDistinct(windowAgg.isDistinct()); + + if (windowAgg.hasSortSpecs()) { + windowAggBuilder.addAllSortSpecs( + ProtoUtil.<CatalogProtos.SortSpecProto>toProtoObjects(windowAgg.getSortSpecs())); + } + if (windowAgg.hasTargets()) { + windowAggBuilder.addAllTargets( + ProtoUtil.<PlanProto.Target>toProtoObjects(windowAgg.getTargets())); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, windowAgg); + 
nodeBuilder.setWindowAgg(windowAggBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return windowAgg; + } + + @Override + public LogicalNode visitSort(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + SortNode sort, Stack<LogicalNode> stack) throws PlanningException { + super.visitSort(context, plan, block, sort, stack); + + int [] childIds = registerGetChildIds(context, sort); + + PlanProto.SortNode.Builder sortBuilder = PlanProto.SortNode.newBuilder(); + sortBuilder.setChildSeq(childIds[0]); + for (int i = 0; i < sort.getSortKeys().length; i++) { + sortBuilder.addSortSpecs(sort.getSortKeys()[i].getProto()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, sort); + nodeBuilder.setSort(sortBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return sort; + } + + @Override + public LogicalNode visitHaving(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + HavingNode having, Stack<LogicalNode> stack) throws PlanningException { + super.visitHaving(context, plan, block, having, stack); + + int [] childIds = registerGetChildIds(context, having); + + PlanProto.FilterNode.Builder filterBuilder = PlanProto.FilterNode.newBuilder(); + filterBuilder.setChildSeq(childIds[0]); + filterBuilder.setQual(EvalNodeSerializer.serialize(having.getQual())); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, having); + nodeBuilder.setFilter(filterBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return having; + } + + public LogicalNode visitGroupBy(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitGroupBy(context, plan, block, node, new Stack<LogicalNode>()); + + PlanProto.LogicalNode.Builder nodeBuilder = buildGroupby(context, node); + context.treeBuilder.addNodes(nodeBuilder); + return node; + } + + private PlanProto.LogicalNode.Builder 
buildGroupby(SerializeContext context, GroupbyNode node) + throws PlanningException { + int [] childIds = registerGetChildIds(context, node); + + PlanProto.GroupbyNode.Builder groupbyBuilder = PlanProto.GroupbyNode.newBuilder(); + groupbyBuilder.setChildSeq(childIds[0]); + groupbyBuilder.setDistinct(node.isDistinct()); + + if (node.groupingKeyNum() > 0) { + groupbyBuilder.addAllGroupingKeys( + ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(node.getGroupingColumns())); + } + if (node.hasAggFunctions()) { + groupbyBuilder.addAllAggFunctions( + ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(node.getAggFunctions())); + } + if (node.hasTargets()) { + groupbyBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets())); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setGroupby(groupbyBuilder); + + return nodeBuilder; + } + + public LogicalNode visitDistinctGroupby(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitDistinctGroupby(context, plan, block, node, new Stack<LogicalNode>()); + + int [] childIds = registerGetChildIds(context, node); + + PlanProto.DistinctGroupbyNode.Builder distGroupbyBuilder = PlanProto.DistinctGroupbyNode.newBuilder(); + distGroupbyBuilder.setChildSeq(childIds[0]); + if (node.getGroupbyPlan() != null) { + distGroupbyBuilder.setGroupbyNode(buildGroupby(context, node.getGroupbyPlan())); + } + + for (GroupbyNode subPlan : node.getSubPlans()) { + distGroupbyBuilder.addSubPlans(buildGroupby(context, subPlan)); + } + + if (node.getGroupingColumns().length > 0) { + distGroupbyBuilder.addAllGroupingKeys( + ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(node.getGroupingColumns())); + } + if (node.getAggFunctions().length > 0) { + distGroupbyBuilder.addAllAggFunctions( + ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(node.getAggFunctions())); + } + if 
(node.hasTargets()) { + distGroupbyBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets())); + } + for (int cid : node.getResultColumnIds()) { + distGroupbyBuilder.addResultId(cid); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setDistinctGroupby(distGroupbyBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitFilter(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + SelectionNode filter, Stack<LogicalNode> stack) throws PlanningException { + super.visitFilter(context, plan, block, filter, stack); + + int [] childIds = registerGetChildIds(context, filter); + + PlanProto.FilterNode.Builder filterBuilder = PlanProto.FilterNode.newBuilder(); + filterBuilder.setChildSeq(childIds[0]); + filterBuilder.setQual(EvalNodeSerializer.serialize(filter.getQual())); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, filter); + nodeBuilder.setFilter(filterBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return filter; + } + + public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode join, + Stack<LogicalNode> stack) throws PlanningException { + super.visitJoin(context, plan, block, join, stack); + + int [] childIds = registerGetChildIds(context, join); + + // building itself + PlanProto.JoinNode.Builder joinBuilder = PlanProto.JoinNode.newBuilder(); + joinBuilder.setJoinType(convertJoinType(join.getJoinType())); + joinBuilder.setLeftChildSeq(childIds[0]); + joinBuilder.setRightChilSeq(childIds[1]); + if (join.hasJoinQual()) { + joinBuilder.setJoinQual(EvalNodeSerializer.serialize(join.getJoinQual())); + } + + if (join.hasTargets()) { + joinBuilder.setExistsTargets(true); + joinBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(join.getTargets())); + } else { + joinBuilder.setExistsTargets(false); + } + + 
PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, join); + nodeBuilder.setJoin(joinBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return join; + } + + @Override + public LogicalNode visitUnion(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node, + Stack<LogicalNode> stack) throws PlanningException { + super.visitUnion(context, plan, block, node, stack); + + int [] childIds = registerGetChildIds(context, node); + + PlanProto.UnionNode.Builder unionBuilder = PlanProto.UnionNode.newBuilder(); + unionBuilder.setAll(true); + unionBuilder.setLeftChildSeq(childIds[0]); + unionBuilder.setRightChildSeq(childIds[1]); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setUnion(unionBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitScan(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + ScanNode scan, Stack<LogicalNode> stack) throws PlanningException { + + PlanProto.ScanNode.Builder scanBuilder = buildScanNode(scan); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, scan); + nodeBuilder.setScan(scanBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return scan; + } + + public PlanProto.ScanNode.Builder buildScanNode(ScanNode scan) { + PlanProto.ScanNode.Builder scanBuilder = PlanProto.ScanNode.newBuilder(); + scanBuilder.setTable(scan.getTableDesc().getProto()); + if (scan.hasAlias()) { + scanBuilder.setAlias(scan.getAlias()); + } + + if (scan.hasTargets()) { + scanBuilder.setExistTargets(true); + scanBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(scan.getTargets())); + } else { + scanBuilder.setExistTargets(false); + } + + if (scan.hasQual()) { + scanBuilder.setQual(EvalNodeSerializer.serialize(scan.getQual())); + } + return scanBuilder; + } + + @Override + public LogicalNode visitPartitionedTableScan(SerializeContext 
context, LogicalPlan plan, LogicalPlan.QueryBlock block, + PartitionedTableScanNode node, Stack<LogicalNode> stack) + throws PlanningException { + + PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node); + + PlanProto.PartitionScanSpec.Builder partitionScan = PlanProto.PartitionScanSpec.newBuilder(); + List<String> pathStrs = TUtil.newList(); + if (node.getInputPaths() != null) { + for (Path p : node.getInputPaths()) { + pathStrs.add(p.toString()); + } + partitionScan.addAllPaths(pathStrs); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setScan(scanBuilder); + nodeBuilder.setPartitionScan(partitionScan); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitTableSubQuery(context, plan, block, node, stack); + + int [] childIds = registerGetChildIds(context, node); + + PlanProto.TableSubQueryNode.Builder builder = PlanProto.TableSubQueryNode.newBuilder(); + builder.setChildSeq(childIds[0]); + + builder.setTableName(node.getTableName()); + + if (node.hasTargets()) { + builder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets())); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setTableSubQuery(builder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitCreateTable(context, plan, block, node, stack); + + int [] childIds = registerGetChildIds(context, node); + + PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = buildPersistentStoreBuilder(node, childIds); + PlanProto.StoreTableNodeSpec.Builder 
storeTableBuilder = buildStoreTableNodeSpec(node); + + PlanProto.CreateTableNodeSpec.Builder createTableBuilder = PlanProto.CreateTableNodeSpec.newBuilder(); + createTableBuilder.setSchema(node.getTableSchema().getProto()); + createTableBuilder.setExternal(node.isExternal()); + if (node.isExternal() && node.hasPath()) { + createTableBuilder.setPath(node.getPath().toString()); + } + createTableBuilder.setIfNotExists(node.isIfNotExists()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setPersistentStore(persistentStoreBuilder); + nodeBuilder.setStoreTable(storeTableBuilder); + nodeBuilder.setCreateTable(createTableBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitDropTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DropTableNode node, Stack<LogicalNode> stack) { + PlanProto.DropTableNode.Builder dropTableBuilder = PlanProto.DropTableNode.newBuilder(); + dropTableBuilder.setTableName(node.getTableName()); + dropTableBuilder.setIfExists(node.isIfExists()); + dropTableBuilder.setPurge(node.isPurge()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setDropTable(dropTableBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitAlterTablespace(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + AlterTablespaceNode node, Stack<LogicalNode> stack) throws PlanningException { + PlanProto.AlterTablespaceNode.Builder alterTablespaceBuilder = PlanProto.AlterTablespaceNode.newBuilder(); + alterTablespaceBuilder.setTableSpaceName(node.getTablespaceName()); + + switch (node.getSetType()) { + case LOCATION: + alterTablespaceBuilder.setSetType(PlanProto.AlterTablespaceNode.Type.LOCATION); + alterTablespaceBuilder.setSetLocation(SetLocation.newBuilder().setLocation(node.getLocation())); + break; + + default: + 
throw new UnimplementedException("Unknown SET type in ALTER TABLESPACE: " + node.getSetType().name()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setAlterTablespace(alterTablespaceBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitAlterTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + AlterTableNode node, Stack<LogicalNode> stack) { + PlanProto.AlterTableNode.Builder alterTableBuilder = PlanProto.AlterTableNode.newBuilder(); + alterTableBuilder.setTableName(node.getTableName()); + + switch (node.getAlterTableOpType()) { + case RENAME_TABLE: + alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.RENAME_TABLE); + alterTableBuilder.setRenameTable(RenameTable.newBuilder().setNewName(node.getNewTableName())); + break; + case ADD_COLUMN: + alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.ADD_COLUMN); + alterTableBuilder.setAddColumn(AddColumn.newBuilder().setAddColumn(node.getAddNewColumn().getProto())); + break; + case RENAME_COLUMN: + alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.RENAME_COLUMN); + alterTableBuilder.setRenameColumn(RenameColumn.newBuilder() + .setOldName(node.getColumnName()) + .setNewName(node.getNewColumnName())); + break; + default: + throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + node.getAlterTableOpType().name()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setAlterTable(alterTableBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitTruncateTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + TruncateTableNode node, Stack<LogicalNode> stack) throws PlanningException { + PlanProto.TruncateTableNode.Builder truncateTableBuilder = PlanProto.TruncateTableNode.newBuilder(); + 
truncateTableBuilder.addAllTableNames(node.getTableNames()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setTruncateTableNode(truncateTableBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + public LogicalNode visitInsert(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + InsertNode node, Stack<LogicalNode> stack) throws PlanningException { + super.visitInsert(context, plan, block, node, stack); + + int [] childIds = registerGetChildIds(context, node); + + PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = buildPersistentStoreBuilder(node, childIds); + PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = buildStoreTableNodeSpec(node); + + PlanProto.InsertNodeSpec.Builder insertNodeSpec = PlanProto.InsertNodeSpec.newBuilder(); + insertNodeSpec.setOverwrite(node.isOverwrite()); + insertNodeSpec.setTableSchema(node.getTableSchema().getProto()); + if (node.hasProjectedSchema()) { + insertNodeSpec.setProjectedSchema(node.getProjectedSchema().getProto()); + } + if (node.hasTargetSchema()) { + insertNodeSpec.setTargetSchema(node.getTargetSchema().getProto()); + } + if (node.hasPath()) { + insertNodeSpec.setPath(node.getPath().toString()); + } + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setPersistentStore(persistentStoreBuilder); + nodeBuilder.setStoreTable(storeTableBuilder); + nodeBuilder.setInsert(insertNodeSpec); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + private static PlanProto.PersistentStoreNode.Builder buildPersistentStoreBuilder(PersistentStoreNode node, + int [] childIds) { + PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = PlanProto.PersistentStoreNode.newBuilder(); + persistentStoreBuilder.setChildSeq(childIds[0]); + persistentStoreBuilder.setStorageType(node.getStorageType()); + if (node.hasOptions()) { + 
persistentStoreBuilder.setTableProperties(node.getOptions().getProto()); + } + return persistentStoreBuilder; + } + + private static PlanProto.StoreTableNodeSpec.Builder buildStoreTableNodeSpec(StoreTableNode node) { + PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = PlanProto.StoreTableNodeSpec.newBuilder(); + if (node.hasPartition()) { + storeTableBuilder.setPartitionMethod(node.getPartitionMethod().getProto()); + } + if (node.hasTableName()) { // It will be false if node is for INSERT INTO LOCATION '...' + storeTableBuilder.setTableName(node.getTableName()); + } + return storeTableBuilder; + } + + @Override + public LogicalNode visitCreateDatabase(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + CreateDatabaseNode node, Stack<LogicalNode> stack) throws PlanningException { + PlanProto.CreateDatabaseNode.Builder createDatabaseBuilder = PlanProto.CreateDatabaseNode.newBuilder(); + createDatabaseBuilder.setDbName(node.getDatabaseName()); + createDatabaseBuilder.setIfNotExists(node.isIfNotExists()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setCreateDatabase(createDatabaseBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + @Override + public LogicalNode visitDropDatabase(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DropDatabaseNode node, Stack<LogicalNode> stack) throws PlanningException { + PlanProto.DropDatabaseNode.Builder dropDatabaseBuilder = PlanProto.DropDatabaseNode.newBuilder(); + dropDatabaseBuilder.setDbName(node.getDatabaseName()); + dropDatabaseBuilder.setIfExists(node.isIfExists()); + + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); + nodeBuilder.setDropDatabase(dropDatabaseBuilder); + context.treeBuilder.addNodes(nodeBuilder); + + return node; + } + + public static PlanProto.NodeType convertType(NodeType type) { + return PlanProto.NodeType.valueOf(type.name()); + } + + public 
static PlanProto.JoinType convertJoinType(JoinType type) { + switch (type) { + case CROSS: + return PlanProto.JoinType.CROSS_JOIN; + case INNER: + return PlanProto.JoinType.INNER_JOIN; + case LEFT_OUTER: + return PlanProto.JoinType.LEFT_OUTER_JOIN; + case RIGHT_OUTER: + return PlanProto.JoinType.RIGHT_OUTER_JOIN; + case FULL_OUTER: + return PlanProto.JoinType.FULL_OUTER_JOIN; + case LEFT_SEMI: + return PlanProto.JoinType.LEFT_SEMI_JOIN; + case RIGHT_SEMI: + return PlanProto.JoinType.RIGHT_SEMI_JOIN; + case LEFT_ANTI: + return PlanProto.JoinType.LEFT_ANTI_JOIN; + case RIGHT_ANTI: + return PlanProto.JoinType.RIGHT_ANTI_JOIN; + case UNION: + return PlanProto.JoinType.UNION_JOIN; + default: + throw new RuntimeException("Unknown JoinType: " + type.name()); + } + } + + public static PlanProto.Target convertTarget(Target target) { + PlanProto.Target.Builder targetBuilder = PlanProto.Target.newBuilder(); + targetBuilder.setExpr(EvalNodeSerializer.serialize(target.getEvalTree())); + if (target.hasAlias()) { + targetBuilder.setAlias(target.getAlias()); + } + return targetBuilder.build(); + } + + private int [] registerGetChildIds(SerializeContext context, LogicalNode node) { + int [] childIds = new int[node.childNum()]; + for (int i = 0; i < node.childNum(); i++) { + if (context.idMap.containsKey(node.getChild(i))) { + childIds[i] = context.idMap.get(node.getChild(i)); + } else { + childIds[i] = context.seqId++; + } + } + return childIds; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java new file mode 100644 index 0000000..b148fec --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides (de)serialization API for logical plan and its related parts. + * They employ protocol buffers to (de)serialize logical plans. + */ +package org.apache.tajo.plan.serder; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 4672577..d813432 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -26,10 +26,8 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.*; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.*; import 
org.apache.tajo.plan.logical.*; @@ -48,6 +46,9 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType.TEXTFILE; public class PlannerUtil { + public static final Column [] EMPTY_COLUMNS = new Column[] {}; + public static final AggregationFunctionCallEval [] EMPTY_AGG_FUNCS = new AggregationFunctionCallEval[] {}; + public static boolean checkIfSetSession(LogicalNode node) { LogicalNode baseNode = node; if (node instanceof LogicalRootNode) { @@ -698,7 +699,7 @@ public class PlannerUtil { copy.setPID(plan.newPID()); if (node instanceof DistinctGroupbyNode) { DistinctGroupbyNode dNode = (DistinctGroupbyNode)copy; - for (GroupbyNode eachNode: dNode.getGroupByNodes()) { + for (GroupbyNode eachNode: dNode.getSubPlans()) { eachNode.setPID(plan.newPID()); } } @@ -762,15 +763,6 @@ public class PlannerUtil { return names; } - public static SortSpec[] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) { - SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()]; - int i = 0; - for (CatalogProtos.SortSpecProto proto : sortSpecProtos) { - sortSpecs[i++] = new SortSpec(proto); - } - return sortSpecs; - } - /** * Generate an explain string of a LogicalNode and its descendant nodes. 
* http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java index d09710e..23c834d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java @@ -63,7 +63,8 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi current = visitSetSession(context, plan, block, (SetSessionNode) node, stack); break; case EXPRS: - return null; + current = visitEvalExpr(context, plan, block, (EvalExprNode) node, stack); + break; case PROJECTION: current = visitProjection(context, plan, block, (ProjectionNode) node, stack); break; @@ -83,7 +84,7 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi current = visitWindowAgg(context, plan, block, (WindowAggNode) node, stack); break; case DISTINCT_GROUP_BY: - current = visitDistinct(context, plan, block, (DistinctGroupbyNode) node, stack); + current = visitDistinctGroupby(context, plan, block, (DistinctGroupbyNode) node, stack); break; case SELECTION: current = visitFilter(context, plan, block, (SelectionNode) node, stack); @@ -159,6 +160,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi } @Override + public RESULT visitEvalExpr(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, EvalExprNode node, + Stack<LogicalNode> stack) throws PlanningException { + return null; + } + + @Override public RESULT visitProjection(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException { @@ -213,8 +220,8 @@ public class 
BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi return result; } - public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node, - Stack<LogicalNode> stack) throws PlanningException { + public RESULT visitDistinctGroupby(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException { stack.push(node); RESULT result = visit(context, plan, block, node.getChild(), stack); stack.pop(); @@ -244,10 +251,17 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi public RESULT visitUnion(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node, Stack<LogicalNode> stack) throws PlanningException { stack.push(node); - LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild()); - RESULT result = visit(context, plan, leftBlock, leftBlock.getRoot(), stack); - LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild()); - visit(context, plan, rightBlock, rightBlock.getRoot(), stack); + RESULT result = null; + if (plan != null) { + LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild()); + result = visit(context, plan, leftBlock, leftBlock.getRoot(), stack); + LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild()); + visit(context, plan, rightBlock, rightBlock.getRoot(), stack); + } else { + result = visit(context, plan, null, node.getLeftChild(), stack); + visit(context, plan, null, node.getRightChild(), stack); + } + stack.pop(); return result; } @@ -276,8 +290,13 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi public RESULT visitTableSubQuery(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException { stack.push(node); - LogicalPlan.QueryBlock childBlock = plan.getBlock(node.getSubQuery()); - RESULT 
result = visit(context, plan, childBlock, childBlock.getRoot(), stack); + RESULT result = null; + if (plan != null) { + LogicalPlan.QueryBlock childBlock = plan.getBlock(node.getSubQuery()); + result = visit(context, plan, childBlock, childBlock.getRoot(), stack); + } else { + result = visit(context, plan, null, node.getSubQuery(), stack); + } stack.pop(); return result; } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java index 7065295..52db8eb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java @@ -117,8 +117,9 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo return visitUnaryNode(context, plan, block, node, stack); } - public LogicalNode visitDistinct(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node, - Stack<LogicalNode> stack) throws PlanningException { + public LogicalNode visitDistinctGroupby(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, + Stack<LogicalNode> stack) throws PlanningException { return visitUnaryNode(context, plan, block, node, stack); } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java index 6a0c338..5be2eec 100644 --- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java @@ -32,6 +32,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> { RESULT visitSetSession(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SetSessionNode node, Stack<LogicalNode> stack) throws PlanningException; + RESULT visitEvalExpr(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, EvalExprNode node, + Stack<LogicalNode> stack) throws PlanningException; + RESULT visitProjection(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException; @@ -48,8 +51,8 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> { Stack<LogicalNode> stack) throws PlanningException; RESULT visitWindowAgg(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node, Stack<LogicalNode> stack) throws PlanningException; - RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node, - Stack<LogicalNode> stack) throws PlanningException; + RESULT visitDistinctGroupby(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node, + Stack<LogicalNode> stack) throws PlanningException; RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node, Stack<LogicalNode> stack) throws PlanningException; http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 0f82d87..3e4f07c 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -26,58 +26,280 @@ import "CatalogProtos.proto"; import "DataTypes.proto"; enum NodeType { - BST_INDEX_SCAN = 0; - EXCEPT = 1; + 
SET_SESSION = 0; + + ROOT = 1; EXPRS = 2; - DISTINCT_GROUP_BY = 3; - GROUP_BY = 4; - HAVING = 5; - JOIN = 6; - INSERT = 7; - INTERSECT = 8; - LIMIT = 9; - PARTITIONS_SCAN = 10; - PROJECTION = 11; - ROOT = 12; - SCAN = 13; - SELECTION = 14; - SORT = 15; - STORE = 16; - TABLE_SUBQUERY = 17; - UNION = 18; - WINDOW_AGG = 19; - - CREATE_DATABASE = 20; - DROP_DATABASE = 21; - CREATE_TABLE = 22; - DROP_TABLE = 23; - ALTER_TABLESPACE = 24; - ALTER_TABLE = 25; - TRUNCATE_TABLE = 26; -} - -message LogicalPlan { - required KeyValueSetProto adjacentList = 1; + PROJECTION = 3; + LIMIT = 4; + WINDOW_AGG = 5; + SORT = 6; + HAVING = 7; + GROUP_BY = 8; + DISTINCT_GROUP_BY = 9; + SELECTION = 10; + JOIN = 11; + UNION = 12; + INTERSECT = 13; + EXCEPT = 14; + TABLE_SUBQUERY = 15; + SCAN = 16; + PARTITIONS_SCAN = 17; + BST_INDEX_SCAN = 18; + STORE = 19; + INSERT = 20; + + CREATE_DATABASE = 21; + DROP_DATABASE = 22; + CREATE_TABLE = 23; + DROP_TABLE = 24; + ALTER_TABLESPACE = 25; + ALTER_TABLE = 26; + TRUNCATE_TABLE = 27; } -message LogicalNode { - required int32 pid = 1; - required NodeType type = 2; - required SchemaProto in_schema = 3; - required SchemaProto out_schema = 4; - required NodeSpec spec = 5; +message LogicalNodeTree { + repeated LogicalNode nodes = 1; } -message NodeSpec { - optional ScanNode scan = 1; +message LogicalNode { + required int32 visitSeq = 1; + required int32 nodeId = 2; + required NodeType type = 3; + optional SchemaProto in_schema = 4; + optional SchemaProto out_schema = 5; + + optional ScanNode scan = 6; + optional PartitionScanSpec partitionScan = 7; + optional JoinNode join = 8; + optional FilterNode filter = 9; + optional GroupbyNode groupby = 10; + optional DistinctGroupbyNode distinctGroupby = 11; + optional SortNode sort = 12; + optional LimitNode limit = 13; + optional WindowAggNode windowAgg = 14; + optional ProjectionNode projection = 15; + optional EvalExprNode exprEval = 16; + optional UnionNode union = 17; + optional TableSubQueryNode 
tableSubQuery = 18; + optional PersistentStoreNode persistentStore = 19; + optional StoreTableNodeSpec storeTable = 20; + optional InsertNodeSpec insert = 21; + optional CreateTableNodeSpec createTable = 22; + optional RootNode root = 23; + optional SetSessionNode setSession = 24; + + optional CreateDatabaseNode createDatabase = 25; + optional DropDatabaseNode dropDatabase = 26; + optional DropTableNode dropTable = 27; + + optional AlterTablespaceNode alterTablespace = 28; + optional AlterTableNode alterTable = 29; + optional TruncateTableNode truncateTableNode = 30; } message ScanNode { required TableDescProto table = 1; optional string alias = 2; - required SchemaProto schema = 3; + required bool existTargets = 3; + repeated Target targets = 4; + optional EvalNodeTree qual = 5; +} + +message PartitionScanSpec { + repeated string paths = 1; +} + +message FilterNode { + required int32 childSeq = 1; + required EvalNodeTree qual = 2; +} + +message JoinNode { + required int32 leftChildSeq = 1; + required int32 rightChilSeq = 2; + required JoinType joinType = 3; + optional EvalNodeTree joinQual = 4; + required bool existsTargets = 5; + repeated Target targets = 6; +} + +message GroupbyNode { + required int32 childSeq = 1; + required bool distinct = 2; + repeated ColumnProto groupingKeys = 3; + repeated EvalNodeTree aggFunctions = 4; + repeated Target targets = 5; +} + +message DistinctGroupbyNode { + required int32 childSeq = 1; + optional LogicalNode groupbyNode = 2; + repeated LogicalNode subPlans = 3; + repeated Target targets = 4; + repeated ColumnProto groupingKeys = 5; + repeated int32 resultId = 6; + repeated EvalNodeTree aggFunctions = 7; +} + +message SortNode { + required int32 childSeq = 1; + repeated SortSpecProto sortSpecs = 2; +} + +message LimitNode { + required int32 childSeq = 1; + required int64 fetchFirstNum = 2; +} + +message WindowAggNode { + required int32 childSeq = 1; + repeated ColumnProto partitionKeys = 2; + repeated SortSpecProto sortSpecs = 
3; + repeated EvalNodeTree windowFunctions = 4; + required bool distinct = 5; // if distinct aggregation function is included in window function + repeated Target targets = 6; +} + +message UnionNode { + required int32 leftChildSeq = 1; + required int32 rightChildSeq = 2; + required bool all = 3; +} + +message TableSubQueryNode { + required int32 childSeq = 1; + required string tableName = 2; + repeated Target targets = 3; +} + +message ProjectionNode { + required int32 childSeq = 1; + required bool distinct = 2; + repeated Target targets = 3; +} + +message EvalExprNode { + repeated Target targets = 1; +} + +message RootNode { + required int32 childSeq = 1; +} + +message SetSessionNode { + required string name = 1; + optional string value = 2; +} + +message Target { + required EvalNodeTree expr = 1; + optional string alias = 2; } +enum JoinType { + CROSS_JOIN = 0; + INNER_JOIN = 1; + LEFT_OUTER_JOIN = 2; + RIGHT_OUTER_JOIN = 3; + FULL_OUTER_JOIN = 4; + UNION_JOIN = 5; + LEFT_ANTI_JOIN = 6; + RIGHT_ANTI_JOIN = 7; + LEFT_SEMI_JOIN = 8; + RIGHT_SEMI_JOIN = 9; +} + +message PartitionTableScanSpec { + repeated string paths = 1; +} + +message PersistentStoreNode { + required int32 childSeq = 1; + required StoreType storageType = 2; + required KeyValueSetProto tableProperties = 3; +} + +message StoreTableNodeSpec { // required PersistentStoreSpec + optional string tableName = 1; // 'INSERT INTO LOCATION' does not require 'table name'. 
+ optional PartitionMethodProto partitionMethod = 2; +} + +message InsertNodeSpec { // required PersistentStoreSpec and StoreTableSpec + required bool overwrite = 1; + required SchemaProto tableSchema = 2; + optional SchemaProto targetSchema = 4; + optional SchemaProto projectedSchema = 3; + optional string path = 5; +} + +message CreateTableNodeSpec { // required PersistentStoreSpec and StoreTableNodeSpec + required SchemaProto schema = 1; + required bool external = 2; + required bool ifNotExists = 3; + optional string path = 4; +} + +message DropTableNode { + required string tableName = 1; + required bool ifExists = 2; + required bool purge = 3; +} + +message TruncateTableNode { + repeated string tableNames = 1; +} + +message CreateDatabaseNode { + required string dbName = 1; + required bool ifNotExists = 2; +} + +message DropDatabaseNode { + required string dbName = 1; + required bool ifExists = 2; +} + +message AlterTablespaceNode { + enum Type { + LOCATION = 0; + } + + message SetLocation { + required string location = 1; + } + + required string tableSpaceName = 1; + required Type setType = 2; + optional SetLocation setLocation = 3; +} + +message AlterTableNode { + enum Type { + RENAME_TABLE = 0; + RENAME_COLUMN = 1; + ADD_COLUMN = 2; + } + + message RenameTable { + required string newName = 1; + } + + message RenameColumn { + required string oldName = 1; + required string newName = 2; + } + + message AddColumn { + required ColumnProto addColumn = 1; + } + + required string tableName = 1; + required Type setType = 2; + optional RenameTable renameTable = 3; + optional RenameColumn renameColumn = 4; + optional AddColumn addColumn = 5; +} enum EvalType { NOT = 0; @@ -126,7 +348,7 @@ enum EvalType { CONST = 33; } -message EvalTree { +message EvalNodeTree { repeated EvalNode nodes = 1; } @@ -140,10 +362,13 @@ message EvalNode { optional ConstEval const = 6; optional ColumnProto field = 7; // field eval optional FunctionEval function = 8; - optional RowConstEval 
rowConst = 9; - optional BetweenEval between = 10; - optional CaseWhenEval casewhen = 11; - optional IfCondEval ifCond = 12; + optional AggFunctionEvalSpec aggFunction = 9; + optional WinFunctionEvalSpec winFunction = 10; + optional RowConstEval rowConst = 11; + optional BetweenEval between = 12; + optional CaseWhenEval casewhen = 13; + optional IfCondEval ifCond = 14; + optional PatternMatchEvalSpec patternMatch = 15; } message UnaryEval { @@ -159,6 +384,10 @@ message BinaryEval { optional bool negative = 3 [default = false]; } +message PatternMatchEvalSpec { // requires BinaryEval + optional bool caseSensitive = 1; +} + message BetweenEval { required int32 predicand = 1; required int32 begin = 2; @@ -190,6 +419,50 @@ message FunctionEval { repeated int32 paramIds = 2; } +message AggFunctionEvalSpec { // requires FunctionEval + required bool intermediatePhase = 1; + required bool finalPhase = 2; + optional string alias = 3; +} + +message WinFunctionEvalSpec { + message WindowFrame { + required WindowStartBound startBound = 1; + required WindowEndBound endBound = 2; + optional WindowFrameUnit unit = 3; + } + + enum WindowFrameStartBoundType { + S_UNBOUNDED_PRECEDING = 0; + S_CURRENT_ROW = 1; + S_PRECEDING = 2; + } + + enum WindowFrameEndBoundType { + E_UNBOUNDED_FOLLOWING = 0; + E_CURRENT_ROW = 1; + E_FOLLOWING = 2; + } + + enum WindowFrameUnit { + ROW = 0; + RANGE = 1; + } + + message WindowStartBound { + required WindowFrameStartBoundType boundType = 1; + optional EvalNodeTree number = 2; + } + + message WindowEndBound { + required WindowFrameEndBoundType boundType = 1; + optional EvalNodeTree number = 2; + } + + repeated SortSpecProto sortSpec = 1; + required WindowFrame windowFrame = 2; +} + message Datum { required Type type = 1; optional bool boolean = 2; http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index 609ca20..c9f493d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -38,7 +38,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.TUtil; @@ -601,7 +601,7 @@ public abstract class StorageManager { * @return The list of storage specified rewrite rules * @throws java.io.IOException */ - public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java index 79161cc..e95aeec 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java +++ 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.hbase; +import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -30,12 +31,13 @@ import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.logical.UnaryNode; -import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; -public class AddSortForInsertRewriter implements RewriteRule { +public class AddSortForInsertRewriter implements LogicalPlanRewriteRule { private int[] sortColumnIndexes; private Column[] sortColumns; + public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) { this.sortColumns = sortColumns; this.sortColumnIndexes = new int[sortColumns.length]; @@ -52,13 +54,13 @@ public class AddSortForInsertRewriter implements RewriteRule { } @Override - public boolean isEligible(LogicalPlan plan) { + public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { StoreType storeType = PlannerUtil.getStoreType(plan); return storeType != null; } @Override - public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { + public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); UnaryNode insertNode = rootNode.getChild(); LogicalNode childNode = insertNode.getChild(); http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java index de4b4cb..c606e88 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java @@ -49,7 +49,7 @@ import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.Bytes; @@ -1050,9 +1050,9 @@ public class HBaseStorageManager extends StorageManager { } } - public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { - List<RewriteRule> rules = new ArrayList<RewriteRule>(); + List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>(); rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc))); return rules; } else {
