Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/322#discussion_r22341678
  
    --- Diff: 
tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 ---
    @@ -0,0 +1,678 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.plan.serder;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.tajo.OverridableConf;
    +import org.apache.tajo.algebra.JoinType;
    +import org.apache.tajo.catalog.Column;
    +import org.apache.tajo.catalog.Schema;
    +import org.apache.tajo.catalog.SortSpec;
    +import org.apache.tajo.catalog.TableDesc;
    +import org.apache.tajo.catalog.partition.PartitionMethodDesc;
    +import org.apache.tajo.catalog.proto.CatalogProtos;
    +import org.apache.tajo.exception.UnimplementedException;
    +import org.apache.tajo.plan.Target;
    +import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
    +import org.apache.tajo.plan.expr.EvalNode;
    +import org.apache.tajo.plan.expr.FieldEval;
    +import org.apache.tajo.plan.expr.WindowFunctionEval;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.util.KeyValueSet;
    +import org.apache.tajo.util.TUtil;
    +
    +import java.util.*;
    +
    +/**
    + * It deserializes a list of serialized logical nodes into a logical node 
tree.
    + */
    +public class LogicalNodeDeserializer {
    +  private static final LogicalNodeDeserializer instance;
    +
    +  static {
    +    instance = new LogicalNodeDeserializer();
    +  }
    +
    +  /**
    +   * Deserialize a list of nodes into a logical node tree.
    +   *
    +   * @param context QueryContext
    +   * @param tree LogicalNodeTree which contains a list of serialized 
logical nodes.
    +   * @return A logical node tree
    +   */
    +  public static LogicalNode deserialize(OverridableConf context, 
PlanProto.LogicalNodeTree tree) {
    +    Map<Integer, LogicalNode> nodeMap = Maps.newHashMap();
    +
    +    // sort serialized logical nodes in an ascending order of their sids
    +    List<PlanProto.LogicalNode> nodeList = 
Lists.newArrayList(tree.getNodesList());
    +    Collections.sort(nodeList, new Comparator<PlanProto.LogicalNode>() {
    +      @Override
    +      public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode 
o2) {
    +        return o1.getSid() - o2.getSid();
    +      }
    +    });
    +
    +    LogicalNode current = null;
    +
    +    // The sorted order is the same as a postfix traversal order.
    +    // So, it sequentially transforms each serialized node into a 
LogicalNode instance in a postfix order of
    +    // the original logical node tree.
    +
    +    Iterator<PlanProto.LogicalNode> it = nodeList.iterator();
    +    while (it.hasNext()) {
    +      PlanProto.LogicalNode protoNode = it.next();
    +
    +      switch (protoNode.getType()) {
    +      case ROOT:
    +        current = convertRoot(nodeMap, protoNode);
    +        break;
    +      case SET_SESSION:
    +        current = convertSetSession(protoNode);
    +        break;
    +      case EXPRS:
    +        current = convertEvalExpr(context, protoNode);
    +        break;
    +      case PROJECTION:
    +        current = convertProjection(context, nodeMap, protoNode);
    +        break;
    +      case LIMIT:
    +        current = convertLimit(nodeMap, protoNode);
    +        break;
    +      case SORT:
    +        current = convertSort(nodeMap, protoNode);
    +        break;
    +      case WINDOW_AGG:
    +        current = convertWindowAgg(context, nodeMap, protoNode);
    +        break;
    +      case HAVING:
    +        current = convertHaving(context, nodeMap, protoNode);
    +        break;
    +      case GROUP_BY:
    +        current = convertGroupby(context, nodeMap, protoNode);
    +        break;
    +      case DISTINCT_GROUP_BY:
    +        current = convertDistinctGroupby(context, nodeMap, protoNode);
    +        break;
    +      case SELECTION:
    +        current = convertFilter(context, nodeMap, protoNode);
    +        break;
    +      case JOIN:
    +        current = convertJoin(context, nodeMap, protoNode);
    +        break;
    +      case TABLE_SUBQUERY:
    +        current = convertTableSubQuery(context, nodeMap, protoNode);
    +        break;
    +      case UNION:
    +        current = convertUnion(nodeMap, protoNode);
    +        break;
    +      case PARTITIONS_SCAN:
    +        current = convertPartitionScan(context, protoNode);
    +        break;
    +      case SCAN:
    +        current = convertScan(context, protoNode);
    +        break;
    +
    +      case CREATE_TABLE:
    +        current = convertCreateTable(nodeMap, protoNode);
    +        break;
    +      case INSERT:
    +        current = convertInsert(nodeMap, protoNode);
    +        break;
    +      case DROP_TABLE:
    +        current = convertDropTable(protoNode);
    +        break;
    +
    +      case CREATE_DATABASE:
    +        current = convertCreateDatabase(protoNode);
    +        break;
    +      case DROP_DATABASE:
    +        current = convertDropDatabase(protoNode);
    +        break;
    +
    +      case ALTER_TABLESPACE:
    +        current = convertAlterTablespace(protoNode);
    +        break;
    +      case ALTER_TABLE:
    +        current = convertAlterTable(protoNode);
    +        break;
    +      case TRUNCATE_TABLE:
    +        current = convertTruncateTable(protoNode);
    +        break;
    +
    +      default:
    +        throw new RuntimeException("Unknown NodeType: " + 
protoNode.getType().name());
    +      }
    +
    +      nodeMap.put(protoNode.getSid(), current);
    +    }
    +
    +    return current;
    +  }
    +
    +  private static LogicalRootNode convertRoot(Map<Integer, LogicalNode> 
nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.RootNode rootProto = protoNode.getRoot();
    +
    +    LogicalRootNode root = new LogicalRootNode(protoNode.getPid());
    +    root.setChild(nodeMap.get(rootProto.getChildId()));
    +    if (protoNode.hasInSchema()) {
    +      root.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      root.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +
    +    return root;
    +  }
    +
    +  private static SetSessionNode convertSetSession(PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.SetSessionNode setSessionProto = protoNode.getSetSession();
    +
    +    SetSessionNode setSession = new SetSessionNode(protoNode.getPid());
    +    setSession.init(setSessionProto.getName(), setSessionProto.hasValue() 
? setSessionProto.getValue() : null);
    +
    +    return setSession;
    +  }
    +
    +  private static EvalExprNode convertEvalExpr(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval();
    +
    +    EvalExprNode evalExpr = new EvalExprNode(protoNode.getPid());
    +    evalExpr.setInSchema(convertSchema(protoNode.getInSchema()));
    +    evalExpr.setTargets(convertTargets(context, 
evalExprProto.getTargetsList()));
    +
    +    return evalExpr;
    +  }
    +
    +  private static ProjectionNode convertProjection(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                                 PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.ProjectionNode projectionProto = protoNode.getProjection();
    +
    +    ProjectionNode projectionNode = new ProjectionNode(protoNode.getPid());
    +    projectionNode.init(projectionProto.getDistinct(), 
convertTargets(context, projectionProto.getTargetsList()));
    +    projectionNode.setChild(nodeMap.get(projectionProto.getChildId()));
    +    projectionNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return projectionNode;
    +  }
    +
    +  private static LimitNode convertLimit(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.LimitNode limitProto = protoNode.getLimit();
    +
    +    LimitNode limitNode = new LimitNode(protoNode.getPid());
    +    limitNode.setChild(nodeMap.get(limitProto.getChildId()));
    +    limitNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    limitNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    limitNode.setFetchFirst(limitProto.getFetchFirstNum());
    +
    +    return limitNode;
    +  }
    +
    +  private static SortNode convertSort(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.SortNode sortProto = protoNode.getSort();
    +
    +    SortNode sortNode = new SortNode(protoNode.getPid());
    +    sortNode.setChild(nodeMap.get(sortProto.getChildId()));
    +    sortNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    sortNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    sortNode.setSortSpecs(convertSortSpecs(sortProto.getSortSpecsList()));
    +
    +    return sortNode;
    +  }
    +
    +  private static HavingNode convertHaving(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                         PlanProto.LogicalNode protoNode) {
    +    PlanProto.FilterNode havingProto = protoNode.getFilter();
    +
    +    HavingNode having = new HavingNode(protoNode.getPid());
    +    having.setChild(nodeMap.get(havingProto.getChildId()));
    +    having.setQual(EvalNodeDeserializer.deserialize(context, 
havingProto.getQual()));
    +    having.setInSchema(convertSchema(protoNode.getInSchema()));
    +    having.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return having;
    +  }
    +
    +  private static WindowAggNode convertWindowAgg(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                               PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg();
    +
    +    WindowAggNode windowAgg = new WindowAggNode(protoNode.getPid());
    +    windowAgg.setChild(nodeMap.get(windowAggProto.getChildId()));
    +
    +    if (windowAggProto.getPartitionKeysCount() > 0) {
    +      
windowAgg.setPartitionKeys(convertColumns(windowAggProto.getPartitionKeysList()));
    +    }
    +
    +    if (windowAggProto.getWindowFunctionsCount() > 0) {
    +      windowAgg.setWindowFunctions(convertWindowFunccEvals(context, 
windowAggProto.getWindowFunctionsList()));
    +    }
    +
    +    windowAgg.setDistinct(windowAggProto.getDistinct());
    +
    +    if (windowAggProto.getSortSpecsCount() > 0) {
    +      
windowAgg.setSortSpecs(convertSortSpecs(windowAggProto.getSortSpecsList()));
    +    }
    +
    +    if (windowAggProto.getTargetsCount() > 0) {
    +      windowAgg.setTargets(convertTargets(context, 
windowAggProto.getTargetsList()));
    +    }
    +
    +    windowAgg.setInSchema(convertSchema(protoNode.getInSchema()));
    +    windowAgg.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return windowAgg;
    +  }
    +
    +  private static GroupbyNode convertGroupby(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                           PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby();
    +
    +    GroupbyNode groupby = new GroupbyNode(protoNode.getPid());
    +    groupby.setChild(nodeMap.get(groupbyProto.getChildId()));
    +    groupby.setDistinct(groupbyProto.getDistinct());
    +
    +    if (groupbyProto.getGroupingKeysCount() > 0) {
    +      
groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList()));
    +    }
    +    if (groupbyProto.getAggFunctionsCount() > 0) {
    +      groupby.setAggFunctions(convertAggFuncCallEvals(context, 
groupbyProto.getAggFunctionsList()));
    +    }
    +    if (groupbyProto.getTargetsCount() > 0) {
    +      groupby.setTargets(convertTargets(context, 
groupbyProto.getTargetsList()));
    +    }
    +
    +    groupby.setInSchema(convertSchema(protoNode.getInSchema()));
    +    groupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return groupby;
    +  }
    +
    +  private static DistinctGroupbyNode 
convertDistinctGroupby(OverridableConf context, Map<Integer, LogicalNode> 
nodeMap,
    +                                           PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.DistinctGroupbyNode distinctGroupbyProto = 
protoNode.getDistinctGroupby();
    +
    +    DistinctGroupbyNode distinctGroupby = new 
DistinctGroupbyNode(protoNode.getPid());
    +    
distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildId()));
    +
    +    if (distinctGroupbyProto.hasGroupbyNode()) {
    +      distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getGroupbyNode()));
    +    }
    +
    +    if (distinctGroupbyProto.getSubPlansCount() > 0) {
    +      List<GroupbyNode> subPlans = TUtil.newList();
    +      for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) {
    +        subPlans.add(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getSubPlans(i)));
    +      }
    +      distinctGroupby.setSubPlans(subPlans);
    +    }
    +
    +    if (distinctGroupbyProto.getGroupingKeysCount() > 0) {
    +      
distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList()));
    +    }
    +    if (distinctGroupbyProto.getAggFunctionsCount() > 0) {
    +      distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
distinctGroupbyProto.getAggFunctionsList()));
    +    }
    +    if (distinctGroupbyProto.getTargetsCount() > 0) {
    +      distinctGroupby.setTargets(convertTargets(context, 
distinctGroupbyProto.getTargetsList()));
    +    }
    +    int [] resultColumnIds = new 
int[distinctGroupbyProto.getResultIdCount()];
    +    for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) {
    +      resultColumnIds[i] = distinctGroupbyProto.getResultId(i);
    +    }
    +    distinctGroupby.setResultColumnIds(resultColumnIds);
    +
    +    // TODO - in distinct groupby, output and target are not matched to 
each other. It does not follow the convention.
    +    distinctGroupby.setInSchema(convertSchema(protoNode.getInSchema()));
    +    distinctGroupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +
    +    return distinctGroupby;
    +  }
    +
    +  private static JoinNode convertJoin(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                     PlanProto.LogicalNode protoNode) {
    +    PlanProto.JoinNode joinProto = protoNode.getJoin();
    +
    +    JoinNode join = new JoinNode(protoNode.getPid());
    +    join.setLeftChild(nodeMap.get(joinProto.getLeftChildId()));
    +    join.setRightChild(nodeMap.get(joinProto.getRightChildId()));
    +    join.setJoinType(convertJoinType(joinProto.getJoinType()));
    +    join.setInSchema(convertSchema(protoNode.getInSchema()));
    +    join.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    if (joinProto.hasJoinQual()) {
    +      join.setJoinQual(EvalNodeDeserializer.deserialize(context, 
joinProto.getJoinQual()));
    +    }
    +    if (joinProto.getExistsTargets()) {
    +      join.setTargets(convertTargets(context, joinProto.getTargetsList()));
    +    }
    +
    +    return join;
    +  }
    +
    +  private static SelectionNode convertFilter(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.FilterNode filterProto = protoNode.getFilter();
    +
    +    SelectionNode selection = new SelectionNode(protoNode.getPid());
    +    selection.setInSchema(convertSchema(protoNode.getInSchema()));
    +    selection.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    selection.setChild(nodeMap.get(filterProto.getChildId()));
    +    selection.setQual(EvalNodeDeserializer.deserialize(context, 
filterProto.getQual()));
    +
    +    return selection;
    +  }
    +
    +  private static UnionNode convertUnion(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.UnionNode unionProto = protoNode.getUnion();
    +
    +    UnionNode union = new UnionNode(protoNode.getPid());
    +    union.setInSchema(convertSchema(protoNode.getInSchema()));
    +    union.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    union.setLeftChild(nodeMap.get(unionProto.getLeftChildId()));
    +    union.setRightChild(nodeMap.get(unionProto.getRightChildId()));
    +
    +    return union;
    +  }
    +
    +  private static ScanNode convertScan(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
    +    ScanNode scan = new ScanNode(protoNode.getPid());
    +    fillScanNode(context, protoNode, scan);
    +
    +    return scan;
    +  }
    +
    +  private static void fillScanNode(OverridableConf context, 
PlanProto.LogicalNode protoNode, ScanNode scan) {
    +    PlanProto.ScanNode scanProto = protoNode.getScan();
    +    if (scanProto.hasAlias()) {
    +      scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias());
    +    } else {
    +      scan.init(new TableDesc(scanProto.getTable()));
    +    }
    +
    +    if (scanProto.getExistTargets()) {
    +      scan.setTargets(convertTargets(context, scanProto.getTargetsList()));
    +    }
    +
    +    if (scanProto.hasQual()) {
    +      scan.setQual(EvalNodeDeserializer.deserialize(context, 
scanProto.getQual()));
    +    }
    +
    +    scan.setInSchema(convertSchema(protoNode.getInSchema()));
    +    scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +  }
    +
    +  private static PartitionedTableScanNode 
convertPartitionScan(OverridableConf context, PlanProto.LogicalNode protoNode) {
    +    PartitionedTableScanNode partitionedScan = new 
PartitionedTableScanNode(protoNode.getPid());
    +    fillScanNode(context, protoNode, partitionedScan);
    +
    +    PlanProto.PartitionScanSpec partitionScanProto = 
protoNode.getPartitionScan();
    +    Path [] paths = new Path[partitionScanProto.getPathsCount()];
    +    for (int i = 0; i < partitionScanProto.getPathsCount(); i++) {
    +      paths[i] = new Path(partitionScanProto.getPaths(i));
    +    }
    +    partitionedScan.setInputPaths(paths);
    +    return partitionedScan;
    +  }
    +
    +  private static TableSubQueryNode convertTableSubQuery(OverridableConf 
context,
    +                                                                 
Map<Integer, LogicalNode> nodeMap,
    +                                                                 
PlanProto.LogicalNode protoNode) {
    +    PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery();
    +
    +    TableSubQueryNode tableSubQuery = new 
TableSubQueryNode(protoNode.getPid());
    +    tableSubQuery.init(proto.getTableName(), 
nodeMap.get(proto.getChildId()));
    +    tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema()));
    +    if (proto.getTargetsCount() > 0) {
    +      tableSubQuery.setTargets(convertTargets(context, 
proto.getTargetsList()));
    +    }
    +
    +    return tableSubQuery;
    +  }
    +
    +  private static CreateTableNode convertCreateTable(Map<Integer, 
LogicalNode> nodeMap,
    +                                            PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
    +    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
    +    PlanProto.CreateTableNodeSpec createTableNodeSpec = 
protoNode.getCreateTable();
    +
    +    CreateTableNode createTable = new CreateTableNode(protoNode.getPid());
    +    if (protoNode.hasInSchema()) {
    +      createTable.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      createTable.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +    createTable.setChild(nodeMap.get(persistentStoreProto.getChildId()));
    +    createTable.setStorageType(persistentStoreProto.getStorageType());
    +    createTable.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
    +
    +    createTable.setTableName(storeTableNodeSpec.getTableName());
    +    if (storeTableNodeSpec.hasPartitionMethod()) {
    +      createTable.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
    +    }
    +
    +    
createTable.setTableSchema(convertSchema(createTableNodeSpec.getSchema()));
    +    createTable.setExternal(createTableNodeSpec.getExternal());
    +    if (createTableNodeSpec.getExternal() && 
createTableNodeSpec.hasPath()) {
    +      createTable.setPath(new Path(createTableNodeSpec.getPath()));
    +    }
    +    createTable.setIfNotExists(createTableNodeSpec.getIfNotExists());
    +
    +    return createTable;
    +  }
    +
    +  private static InsertNode convertInsert(Map<Integer, LogicalNode> 
nodeMap,
    +                                                   PlanProto.LogicalNode 
protoNode) {
    +    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
    +    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
    +    PlanProto.InsertNodeSpec insertNodeSpec = protoNode.getInsert();
    +
    +    InsertNode insertNode = new InsertNode(protoNode.getPid());
    +    if (protoNode.hasInSchema()) {
    +      insertNode.setInSchema(convertSchema(protoNode.getInSchema()));
    +    }
    +    if (protoNode.hasOutSchema()) {
    +      insertNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
    +    }
    +    insertNode.setChild(nodeMap.get(persistentStoreProto.getChildId()));
    +    insertNode.setStorageType(persistentStoreProto.getStorageType());
    +    insertNode.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
    +
    +    if (storeTableNodeSpec.hasTableName()) {
    +      insertNode.setTableName(storeTableNodeSpec.getTableName());
    +    }
    +    if (storeTableNodeSpec.hasPartitionMethod()) {
    +      insertNode.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
    +    }
    +
    +    insertNode.setOverwrite(insertNodeSpec.getOverwrite());
    +    
insertNode.setTableSchema(convertSchema(insertNodeSpec.getTableSchema()));
    +    if (insertNodeSpec.hasTargetSchema()) {
    +      
insertNode.setTargetSchema(convertSchema(insertNodeSpec.getTargetSchema()));
    +    }
    +    if (insertNodeSpec.hasProjectedSchema()) {
    +      
insertNode.setProjectedSchema(convertSchema(insertNodeSpec.getProjectedSchema()));
    +    }
    +    if (insertNodeSpec.hasPath()) {
    +      insertNode.setPath(new Path(insertNodeSpec.getPath()));
    +    }
    +
    +    return insertNode;
    +  }
    +
    +  private static DropTableNode convertDropTable(PlanProto.LogicalNode 
protoNode) {
    +    DropTableNode dropTable = new DropTableNode(protoNode.getPid());
    +
    +    PlanProto.DropTableNode dropTableProto = protoNode.getDropTable();
    +    dropTable.init(dropTableProto.getTableName(), 
dropTableProto.getIfExists(), dropTableProto.getPurge());
    +
    +    return dropTable;
    +  }
    +
    +  private static CreateDatabaseNode 
convertCreateDatabase(PlanProto.LogicalNode protoNode) {
    +    CreateDatabaseNode createDatabase = new 
CreateDatabaseNode(protoNode.getPid());
    +
    +    PlanProto.CreateDatabaseNode createDatabaseProto = 
protoNode.getCreateDatabase();
    +    createDatabase.init(createDatabaseProto.getDbName(), 
createDatabaseProto.getIfNotExists());
    +
    +    return createDatabase;
    +  }
    +
    +  private static DropDatabaseNode 
convertDropDatabase(PlanProto.LogicalNode protoNode) {
    +    DropDatabaseNode dropDatabase = new 
DropDatabaseNode(protoNode.getPid());
    +
    +    PlanProto.DropDatabaseNode dropDatabaseProto = 
protoNode.getDropDatabase();
    +    dropDatabase.init(dropDatabaseProto.getDbName(), 
dropDatabaseProto.getIfExists());
    +
    +    return dropDatabase;
    +  }
    +
    +  private static AlterTablespaceNode 
convertAlterTablespace(PlanProto.LogicalNode protoNode) {
    +    AlterTablespaceNode alterTablespace = new 
AlterTablespaceNode(protoNode.getPid());
    +
    +    PlanProto.AlterTablespaceNode alterTablespaceProto = 
protoNode.getAlterTablespace();
    +    
alterTablespace.setTablespaceName(alterTablespaceProto.getTableSpaceName());
    +
    +    switch (alterTablespaceProto.getSetType()) {
    +    case LOCATION:
    +      
alterTablespace.setLocation(alterTablespaceProto.getSetLocation().getLocation());
    +      break;
    +    default:
    +      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " 
+ alterTablespaceProto.getSetType().name());
    +    }
    +
    +    return alterTablespace;
    +  }
    +
    +  private static AlterTableNode convertAlterTable(PlanProto.LogicalNode 
protoNode) {
    +    AlterTableNode alterTable = new AlterTableNode(protoNode.getPid());
    +
    +    PlanProto.AlterTableNode alterTableProto = protoNode.getAlterTable();
    +    alterTable.setTableName(alterTableProto.getTableName());
    +
    +    switch (alterTableProto.getSetType()) {
    +    case RENAME_TABLE:
    +      
alterTable.setNewTableName(alterTableProto.getRenameTable().getNewName());
    +      break;
    +    case ADD_COLUMN:
    +      alterTable.setAddNewColumn(new 
Column(alterTableProto.getAddColumn().getAddColumn()));
    +      break;
    +    case RENAME_COLUMN:
    +      
alterTable.setColumnName(alterTableProto.getRenameColumn().getOldName());
    +      
alterTable.setNewColumnName(alterTableProto.getRenameColumn().getNewName());
    +      break;
    +    default:
    +      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " 
+ alterTableProto.getSetType().name());
    +    }
    +
    +    return alterTable;
    +  }
    +
    +  private static TruncateTableNode 
convertTruncateTable(PlanProto.LogicalNode protoNode) {
    +    TruncateTableNode truncateTable = new 
TruncateTableNode(protoNode.getPid());
    +
    +    PlanProto.TruncateTableNode truncateTableProto = 
protoNode.getTruncateTableNode();
    +    truncateTable.setTableNames(truncateTableProto.getTableNamesList());
    +
    +    return truncateTable;
    +  }
    +
    +  private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context,
    +                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
    +    AggregationFunctionCallEval [] aggFuncs = new 
AggregationFunctionCallEval[evalTrees.size()];
    +    for (int i = 0; i < aggFuncs.length; i++) {
    +      aggFuncs[i] = (AggregationFunctionCallEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
    +    }
    +    return aggFuncs;
    +  }
    +
    +  private static WindowFunctionEval[] 
convertWindowFunccEvals(OverridableConf context,
    +                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
    +    WindowFunctionEval[] winFuncEvals = new 
WindowFunctionEval[evalTrees.size()];
    +    for (int i = 0; i < winFuncEvals.length; i++) {
    +      winFuncEvals[i] = (WindowFunctionEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
    +    }
    +    return winFuncEvals;
    +  }
    +
    +  public static Schema convertSchema(CatalogProtos.SchemaProto proto) {
    +    return new Schema(proto);
    +  }
    +
    +  public static Column[] convertColumns(List<CatalogProtos.ColumnProto> 
columnProtos) {
    +    Column [] columns = new Column[columnProtos.size()];
    +    for (int i = 0; i < columns.length; i++) {
    +      columns[i] = new Column(columnProtos.get(i));
    +    }
    +    return columns;
    +  }
    +
    +  public static Target[] convertTargets(OverridableConf context, 
List<PlanProto.Target> targetsProto) {
    +    Target [] targets = new Target[targetsProto.size()];
    +    for (int i = 0; i < targets.length; i++) {
    +      PlanProto.Target targetProto = targetsProto.get(i);
    +      EvalNode evalNode = EvalNodeDeserializer.deserialize(context, 
targetProto.getExpr());
    +      if (targetProto.hasAlias()) {
    +        targets[i] = new Target(evalNode, targetProto.getAlias());
    +      } else {
    +        targets[i] = new Target((FieldEval) evalNode);
    +      }
    +    }
    +    return targets;
    +  }
    +
    +  public static SortSpec[] 
convertSortSpecs(List<CatalogProtos.SortSpecProto> sortSpecProtos) {
    +    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
    +    int i = 0;
    +    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
    +      sortSpecs[i++] = new SortSpec(proto);
    +    }
    +    return sortSpecs;
    +  }
    +
    +  public static JoinType convertJoinType(PlanProto.JoinType type) {
    +    switch (type) {
    +    case CROSS_JOIN:
    --- End diff --
    
    That's a good idea. Actually, I plan to use protobuf-based serialized 
plans in a C++ implementation. Some enum constant names are not available in C++. 
Please see this protobuf issue 
(https://code.google.com/p/protobuf/issues/detail?id=515). I think that it 
would be good to keep the separation for a while.
    
    So, for now, it would be good to keep them separate. Later, we can use a 
single enum type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to