Repository: tajo
Updated Branches:
  refs/heads/index_support df2ff2dd7 -> 8e52ed43a


http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
new file mode 100644
index 0000000..5cbed7e
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -0,0 +1,678 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.serder;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.FieldEval;
+import org.apache.tajo.plan.expr.WindowFunctionEval;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * It deserializes a list of serialized logical nodes into a logical node tree.
+ */
+public class LogicalNodeDeserializer {
+  private static final LogicalNodeDeserializer instance;
+
+  static {
+    instance = new LogicalNodeDeserializer();
+  }
+
+  /**
+   * Deserialize a list of nodes into a logical node tree.
+   *
+   * @param context QueryContext
+   * @param tree LogicalNodeTree which contains a list of serialized logical 
nodes.
+   * @return A logical node tree
+   */
+  public static LogicalNode deserialize(OverridableConf context, 
PlanProto.LogicalNodeTree tree) {
+    Map<Integer, LogicalNode> nodeMap = Maps.newHashMap();
+
+    // sort serialized logical nodes in an ascending order of their sids
+    List<PlanProto.LogicalNode> nodeList = 
Lists.newArrayList(tree.getNodesList());
+    Collections.sort(nodeList, new Comparator<PlanProto.LogicalNode>() {
+      @Override
+      public int compare(PlanProto.LogicalNode o1, PlanProto.LogicalNode o2) {
+        return o1.getVisitSeq() - o2.getVisitSeq();
+      }
+    });
+
+    LogicalNode current = null;
+
+    // The sorted order is the same of a postfix traverse order.
+    // So, it sequentially transforms each serialized node into a LogicalNode 
instance in a postfix order of
+    // the original logical node tree.
+
+    Iterator<PlanProto.LogicalNode> it = nodeList.iterator();
+    while (it.hasNext()) {
+      PlanProto.LogicalNode protoNode = it.next();
+
+      switch (protoNode.getType()) {
+      case ROOT:
+        current = convertRoot(nodeMap, protoNode);
+        break;
+      case SET_SESSION:
+        current = convertSetSession(protoNode);
+        break;
+      case EXPRS:
+        current = convertEvalExpr(context, protoNode);
+        break;
+      case PROJECTION:
+        current = convertProjection(context, nodeMap, protoNode);
+        break;
+      case LIMIT:
+        current = convertLimit(nodeMap, protoNode);
+        break;
+      case SORT:
+        current = convertSort(nodeMap, protoNode);
+        break;
+      case WINDOW_AGG:
+        current = convertWindowAgg(context, nodeMap, protoNode);
+        break;
+      case HAVING:
+        current = convertHaving(context, nodeMap, protoNode);
+        break;
+      case GROUP_BY:
+        current = convertGroupby(context, nodeMap, protoNode);
+        break;
+      case DISTINCT_GROUP_BY:
+        current = convertDistinctGroupby(context, nodeMap, protoNode);
+        break;
+      case SELECTION:
+        current = convertFilter(context, nodeMap, protoNode);
+        break;
+      case JOIN:
+        current = convertJoin(context, nodeMap, protoNode);
+        break;
+      case TABLE_SUBQUERY:
+        current = convertTableSubQuery(context, nodeMap, protoNode);
+        break;
+      case UNION:
+        current = convertUnion(nodeMap, protoNode);
+        break;
+      case PARTITIONS_SCAN:
+        current = convertPartitionScan(context, protoNode);
+        break;
+      case SCAN:
+        current = convertScan(context, protoNode);
+        break;
+
+      case CREATE_TABLE:
+        current = convertCreateTable(nodeMap, protoNode);
+        break;
+      case INSERT:
+        current = convertInsert(nodeMap, protoNode);
+        break;
+      case DROP_TABLE:
+        current = convertDropTable(protoNode);
+        break;
+
+      case CREATE_DATABASE:
+        current = convertCreateDatabase(protoNode);
+        break;
+      case DROP_DATABASE:
+        current = convertDropDatabase(protoNode);
+        break;
+
+      case ALTER_TABLESPACE:
+        current = convertAlterTablespace(protoNode);
+        break;
+      case ALTER_TABLE:
+        current = convertAlterTable(protoNode);
+        break;
+      case TRUNCATE_TABLE:
+        current = convertTruncateTable(protoNode);
+        break;
+
+      default:
+        throw new RuntimeException("Unknown NodeType: " + 
protoNode.getType().name());
+      }
+
+      nodeMap.put(protoNode.getVisitSeq(), current);
+    }
+
+    return current;
+  }
+
+  private static LogicalRootNode convertRoot(Map<Integer, LogicalNode> nodeMap,
+                                            PlanProto.LogicalNode protoNode) {
+    PlanProto.RootNode rootProto = protoNode.getRoot();
+
+    LogicalRootNode root = new LogicalRootNode(protoNode.getNodeId());
+    root.setChild(nodeMap.get(rootProto.getChildSeq()));
+    if (protoNode.hasInSchema()) {
+      root.setInSchema(convertSchema(protoNode.getInSchema()));
+    }
+    if (protoNode.hasOutSchema()) {
+      root.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    }
+
+    return root;
+  }
+
+  private static SetSessionNode convertSetSession(PlanProto.LogicalNode 
protoNode) {
+    PlanProto.SetSessionNode setSessionProto = protoNode.getSetSession();
+
+    SetSessionNode setSession = new SetSessionNode(protoNode.getNodeId());
+    setSession.init(setSessionProto.getName(), setSessionProto.hasValue() ? 
setSessionProto.getValue() : null);
+
+    return setSession;
+  }
+
+  private static EvalExprNode convertEvalExpr(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
+    PlanProto.EvalExprNode evalExprProto = protoNode.getExprEval();
+
+    EvalExprNode evalExpr = new EvalExprNode(protoNode.getNodeId());
+    evalExpr.setInSchema(convertSchema(protoNode.getInSchema()));
+    evalExpr.setTargets(convertTargets(context, 
evalExprProto.getTargetsList()));
+
+    return evalExpr;
+  }
+
+  private static ProjectionNode convertProjection(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
+                                                 PlanProto.LogicalNode 
protoNode) {
+    PlanProto.ProjectionNode projectionProto = protoNode.getProjection();
+
+    ProjectionNode projectionNode = new ProjectionNode(protoNode.getNodeId());
+    projectionNode.init(projectionProto.getDistinct(), convertTargets(context, 
projectionProto.getTargetsList()));
+    projectionNode.setChild(nodeMap.get(projectionProto.getChildSeq()));
+    projectionNode.setInSchema(convertSchema(protoNode.getInSchema()));
+    projectionNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return projectionNode;
+  }
+
+  private static LimitNode convertLimit(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
+    PlanProto.LimitNode limitProto = protoNode.getLimit();
+
+    LimitNode limitNode = new LimitNode(protoNode.getNodeId());
+    limitNode.setChild(nodeMap.get(limitProto.getChildSeq()));
+    limitNode.setInSchema(convertSchema(protoNode.getInSchema()));
+    limitNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    limitNode.setFetchFirst(limitProto.getFetchFirstNum());
+
+    return limitNode;
+  }
+
+  private static SortNode convertSort(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
+    PlanProto.SortNode sortProto = protoNode.getSort();
+
+    SortNode sortNode = new SortNode(protoNode.getNodeId());
+    sortNode.setChild(nodeMap.get(sortProto.getChildSeq()));
+    sortNode.setInSchema(convertSchema(protoNode.getInSchema()));
+    sortNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    sortNode.setSortSpecs(convertSortSpecs(sortProto.getSortSpecsList()));
+
+    return sortNode;
+  }
+
+  private static HavingNode convertHaving(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
+                                         PlanProto.LogicalNode protoNode) {
+    PlanProto.FilterNode havingProto = protoNode.getFilter();
+
+    HavingNode having = new HavingNode(protoNode.getNodeId());
+    having.setChild(nodeMap.get(havingProto.getChildSeq()));
+    having.setQual(EvalNodeDeserializer.deserialize(context, 
havingProto.getQual()));
+    having.setInSchema(convertSchema(protoNode.getInSchema()));
+    having.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return having;
+  }
+
+  private static WindowAggNode convertWindowAgg(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
+                                               PlanProto.LogicalNode 
protoNode) {
+    PlanProto.WindowAggNode windowAggProto = protoNode.getWindowAgg();
+
+    WindowAggNode windowAgg = new WindowAggNode(protoNode.getNodeId());
+    windowAgg.setChild(nodeMap.get(windowAggProto.getChildSeq()));
+
+    if (windowAggProto.getPartitionKeysCount() > 0) {
+      
windowAgg.setPartitionKeys(convertColumns(windowAggProto.getPartitionKeysList()));
+    }
+
+    if (windowAggProto.getWindowFunctionsCount() > 0) {
+      windowAgg.setWindowFunctions(convertWindowFunccEvals(context, 
windowAggProto.getWindowFunctionsList()));
+    }
+
+    windowAgg.setDistinct(windowAggProto.getDistinct());
+
+    if (windowAggProto.getSortSpecsCount() > 0) {
+      
windowAgg.setSortSpecs(convertSortSpecs(windowAggProto.getSortSpecsList()));
+    }
+
+    if (windowAggProto.getTargetsCount() > 0) {
+      windowAgg.setTargets(convertTargets(context, 
windowAggProto.getTargetsList()));
+    }
+
+    windowAgg.setInSchema(convertSchema(protoNode.getInSchema()));
+    windowAgg.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return windowAgg;
+  }
+
+  private static GroupbyNode convertGroupby(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
+                                           PlanProto.LogicalNode protoNode) {
+    PlanProto.GroupbyNode groupbyProto = protoNode.getGroupby();
+
+    GroupbyNode groupby = new GroupbyNode(protoNode.getNodeId());
+    groupby.setChild(nodeMap.get(groupbyProto.getChildSeq()));
+    groupby.setDistinct(groupbyProto.getDistinct());
+
+    if (groupbyProto.getGroupingKeysCount() > 0) {
+      
groupby.setGroupingColumns(convertColumns(groupbyProto.getGroupingKeysList()));
+    }
+    if (groupbyProto.getAggFunctionsCount() > 0) {
+      groupby.setAggFunctions(convertAggFuncCallEvals(context, 
groupbyProto.getAggFunctionsList()));
+    }
+    if (groupbyProto.getTargetsCount() > 0) {
+      groupby.setTargets(convertTargets(context, 
groupbyProto.getTargetsList()));
+    }
+
+    groupby.setInSchema(convertSchema(protoNode.getInSchema()));
+    groupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return groupby;
+  }
+
+  private static DistinctGroupbyNode convertDistinctGroupby(OverridableConf 
context, Map<Integer, LogicalNode> nodeMap,
+                                           PlanProto.LogicalNode protoNode) {
+    PlanProto.DistinctGroupbyNode distinctGroupbyProto = 
protoNode.getDistinctGroupby();
+
+    DistinctGroupbyNode distinctGroupby = new 
DistinctGroupbyNode(protoNode.getNodeId());
+    distinctGroupby.setChild(nodeMap.get(distinctGroupbyProto.getChildSeq()));
+
+    if (distinctGroupbyProto.hasGroupbyNode()) {
+      distinctGroupby.setGroupbyPlan(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getGroupbyNode()));
+    }
+
+    if (distinctGroupbyProto.getSubPlansCount() > 0) {
+      List<GroupbyNode> subPlans = TUtil.newList();
+      for (int i = 0; i < distinctGroupbyProto.getSubPlansCount(); i++) {
+        subPlans.add(convertGroupby(context, nodeMap, 
distinctGroupbyProto.getSubPlans(i)));
+      }
+      distinctGroupby.setSubPlans(subPlans);
+    }
+
+    if (distinctGroupbyProto.getGroupingKeysCount() > 0) {
+      
distinctGroupby.setGroupingColumns(convertColumns(distinctGroupbyProto.getGroupingKeysList()));
+    }
+    if (distinctGroupbyProto.getAggFunctionsCount() > 0) {
+      distinctGroupby.setAggFunctions(convertAggFuncCallEvals(context, 
distinctGroupbyProto.getAggFunctionsList()));
+    }
+    if (distinctGroupbyProto.getTargetsCount() > 0) {
+      distinctGroupby.setTargets(convertTargets(context, 
distinctGroupbyProto.getTargetsList()));
+    }
+    int [] resultColumnIds = new int[distinctGroupbyProto.getResultIdCount()];
+    for (int i = 0; i < distinctGroupbyProto.getResultIdCount(); i++) {
+      resultColumnIds[i] = distinctGroupbyProto.getResultId(i);
+    }
+    distinctGroupby.setResultColumnIds(resultColumnIds);
+
+    // TODO - in distinct groupby, output and target are not matched to each 
other. It does not follow the convention.
+    distinctGroupby.setInSchema(convertSchema(protoNode.getInSchema()));
+    distinctGroupby.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return distinctGroupby;
+  }
+
+  private static JoinNode convertJoin(OverridableConf context, Map<Integer, 
LogicalNode> nodeMap,
+                                     PlanProto.LogicalNode protoNode) {
+    PlanProto.JoinNode joinProto = protoNode.getJoin();
+
+    JoinNode join = new JoinNode(protoNode.getNodeId());
+    join.setLeftChild(nodeMap.get(joinProto.getLeftChildSeq()));
+    join.setRightChild(nodeMap.get(joinProto.getRightChilSeq()));
+    join.setJoinType(convertJoinType(joinProto.getJoinType()));
+    join.setInSchema(convertSchema(protoNode.getInSchema()));
+    join.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    if (joinProto.hasJoinQual()) {
+      join.setJoinQual(EvalNodeDeserializer.deserialize(context, 
joinProto.getJoinQual()));
+    }
+    if (joinProto.getExistsTargets()) {
+      join.setTargets(convertTargets(context, joinProto.getTargetsList()));
+    }
+
+    return join;
+  }
+
+  private static SelectionNode convertFilter(OverridableConf context, 
Map<Integer, LogicalNode> nodeMap,
+                                            PlanProto.LogicalNode protoNode) {
+    PlanProto.FilterNode filterProto = protoNode.getFilter();
+
+    SelectionNode selection = new SelectionNode(protoNode.getNodeId());
+    selection.setInSchema(convertSchema(protoNode.getInSchema()));
+    selection.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    selection.setChild(nodeMap.get(filterProto.getChildSeq()));
+    selection.setQual(EvalNodeDeserializer.deserialize(context, 
filterProto.getQual()));
+
+    return selection;
+  }
+
+  private static UnionNode convertUnion(Map<Integer, LogicalNode> nodeMap, 
PlanProto.LogicalNode protoNode) {
+    PlanProto.UnionNode unionProto = protoNode.getUnion();
+
+    UnionNode union = new UnionNode(protoNode.getNodeId());
+    union.setInSchema(convertSchema(protoNode.getInSchema()));
+    union.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    union.setLeftChild(nodeMap.get(unionProto.getLeftChildSeq()));
+    union.setRightChild(nodeMap.get(unionProto.getRightChildSeq()));
+
+    return union;
+  }
+
+  private static ScanNode convertScan(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
+    ScanNode scan = new ScanNode(protoNode.getNodeId());
+    fillScanNode(context, protoNode, scan);
+
+    return scan;
+  }
+
+  private static void fillScanNode(OverridableConf context, 
PlanProto.LogicalNode protoNode, ScanNode scan) {
+    PlanProto.ScanNode scanProto = protoNode.getScan();
+    if (scanProto.hasAlias()) {
+      scan.init(new TableDesc(scanProto.getTable()), scanProto.getAlias());
+    } else {
+      scan.init(new TableDesc(scanProto.getTable()));
+    }
+
+    if (scanProto.getExistTargets()) {
+      scan.setTargets(convertTargets(context, scanProto.getTargetsList()));
+    }
+
+    if (scanProto.hasQual()) {
+      scan.setQual(EvalNodeDeserializer.deserialize(context, 
scanProto.getQual()));
+    }
+
+    scan.setInSchema(convertSchema(protoNode.getInSchema()));
+    scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
+  }
+
+  private static PartitionedTableScanNode convertPartitionScan(OverridableConf 
context, PlanProto.LogicalNode protoNode) {
+    PartitionedTableScanNode partitionedScan = new 
PartitionedTableScanNode(protoNode.getNodeId());
+    fillScanNode(context, protoNode, partitionedScan);
+
+    PlanProto.PartitionScanSpec partitionScanProto = 
protoNode.getPartitionScan();
+    Path [] paths = new Path[partitionScanProto.getPathsCount()];
+    for (int i = 0; i < partitionScanProto.getPathsCount(); i++) {
+      paths[i] = new Path(partitionScanProto.getPaths(i));
+    }
+    partitionedScan.setInputPaths(paths);
+    return partitionedScan;
+  }
+
+  private static TableSubQueryNode convertTableSubQuery(OverridableConf 
context,
+                                                                 Map<Integer, 
LogicalNode> nodeMap,
+                                                                 
PlanProto.LogicalNode protoNode) {
+    PlanProto.TableSubQueryNode proto = protoNode.getTableSubQuery();
+
+    TableSubQueryNode tableSubQuery = new 
TableSubQueryNode(protoNode.getNodeId());
+    tableSubQuery.init(proto.getTableName(), nodeMap.get(proto.getChildSeq()));
+    tableSubQuery.setInSchema(convertSchema(protoNode.getInSchema()));
+    if (proto.getTargetsCount() > 0) {
+      tableSubQuery.setTargets(convertTargets(context, 
proto.getTargetsList()));
+    }
+
+    return tableSubQuery;
+  }
+
+  private static CreateTableNode convertCreateTable(Map<Integer, LogicalNode> 
nodeMap,
+                                            PlanProto.LogicalNode protoNode) {
+    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
+    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
+    PlanProto.CreateTableNodeSpec createTableNodeSpec = 
protoNode.getCreateTable();
+
+    CreateTableNode createTable = new CreateTableNode(protoNode.getNodeId());
+    if (protoNode.hasInSchema()) {
+      createTable.setInSchema(convertSchema(protoNode.getInSchema()));
+    }
+    if (protoNode.hasOutSchema()) {
+      createTable.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    }
+    createTable.setChild(nodeMap.get(persistentStoreProto.getChildSeq()));
+    createTable.setStorageType(persistentStoreProto.getStorageType());
+    createTable.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
+
+    createTable.setTableName(storeTableNodeSpec.getTableName());
+    if (storeTableNodeSpec.hasPartitionMethod()) {
+      createTable.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
+    }
+
+    createTable.setTableSchema(convertSchema(createTableNodeSpec.getSchema()));
+    createTable.setExternal(createTableNodeSpec.getExternal());
+    if (createTableNodeSpec.getExternal() && createTableNodeSpec.hasPath()) {
+      createTable.setPath(new Path(createTableNodeSpec.getPath()));
+    }
+    createTable.setIfNotExists(createTableNodeSpec.getIfNotExists());
+
+    return createTable;
+  }
+
+  private static InsertNode convertInsert(Map<Integer, LogicalNode> nodeMap,
+                                                   PlanProto.LogicalNode 
protoNode) {
+    PlanProto.PersistentStoreNode persistentStoreProto = 
protoNode.getPersistentStore();
+    PlanProto.StoreTableNodeSpec storeTableNodeSpec = 
protoNode.getStoreTable();
+    PlanProto.InsertNodeSpec insertNodeSpec = protoNode.getInsert();
+
+    InsertNode insertNode = new InsertNode(protoNode.getNodeId());
+    if (protoNode.hasInSchema()) {
+      insertNode.setInSchema(convertSchema(protoNode.getInSchema()));
+    }
+    if (protoNode.hasOutSchema()) {
+      insertNode.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    }
+    insertNode.setChild(nodeMap.get(persistentStoreProto.getChildSeq()));
+    insertNode.setStorageType(persistentStoreProto.getStorageType());
+    insertNode.setOptions(new 
KeyValueSet(persistentStoreProto.getTableProperties()));
+
+    if (storeTableNodeSpec.hasTableName()) {
+      insertNode.setTableName(storeTableNodeSpec.getTableName());
+    }
+    if (storeTableNodeSpec.hasPartitionMethod()) {
+      insertNode.setPartitionMethod(new 
PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
+    }
+
+    insertNode.setOverwrite(insertNodeSpec.getOverwrite());
+    insertNode.setTableSchema(convertSchema(insertNodeSpec.getTableSchema()));
+    if (insertNodeSpec.hasTargetSchema()) {
+      
insertNode.setTargetSchema(convertSchema(insertNodeSpec.getTargetSchema()));
+    }
+    if (insertNodeSpec.hasProjectedSchema()) {
+      
insertNode.setProjectedSchema(convertSchema(insertNodeSpec.getProjectedSchema()));
+    }
+    if (insertNodeSpec.hasPath()) {
+      insertNode.setPath(new Path(insertNodeSpec.getPath()));
+    }
+
+    return insertNode;
+  }
+
+  private static DropTableNode convertDropTable(PlanProto.LogicalNode 
protoNode) {
+    DropTableNode dropTable = new DropTableNode(protoNode.getNodeId());
+
+    PlanProto.DropTableNode dropTableProto = protoNode.getDropTable();
+    dropTable.init(dropTableProto.getTableName(), 
dropTableProto.getIfExists(), dropTableProto.getPurge());
+
+    return dropTable;
+  }
+
+  private static CreateDatabaseNode 
convertCreateDatabase(PlanProto.LogicalNode protoNode) {
+    CreateDatabaseNode createDatabase = new 
CreateDatabaseNode(protoNode.getNodeId());
+
+    PlanProto.CreateDatabaseNode createDatabaseProto = 
protoNode.getCreateDatabase();
+    createDatabase.init(createDatabaseProto.getDbName(), 
createDatabaseProto.getIfNotExists());
+
+    return createDatabase;
+  }
+
+  private static DropDatabaseNode convertDropDatabase(PlanProto.LogicalNode 
protoNode) {
+    DropDatabaseNode dropDatabase = new 
DropDatabaseNode(protoNode.getNodeId());
+
+    PlanProto.DropDatabaseNode dropDatabaseProto = protoNode.getDropDatabase();
+    dropDatabase.init(dropDatabaseProto.getDbName(), 
dropDatabaseProto.getIfExists());
+
+    return dropDatabase;
+  }
+
+  private static AlterTablespaceNode 
convertAlterTablespace(PlanProto.LogicalNode protoNode) {
+    AlterTablespaceNode alterTablespace = new 
AlterTablespaceNode(protoNode.getNodeId());
+
+    PlanProto.AlterTablespaceNode alterTablespaceProto = 
protoNode.getAlterTablespace();
+    
alterTablespace.setTablespaceName(alterTablespaceProto.getTableSpaceName());
+
+    switch (alterTablespaceProto.getSetType()) {
+    case LOCATION:
+      
alterTablespace.setLocation(alterTablespaceProto.getSetLocation().getLocation());
+      break;
+    default:
+      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + 
alterTablespaceProto.getSetType().name());
+    }
+
+    return alterTablespace;
+  }
+
+  private static AlterTableNode convertAlterTable(PlanProto.LogicalNode 
protoNode) {
+    AlterTableNode alterTable = new AlterTableNode(protoNode.getNodeId());
+
+    PlanProto.AlterTableNode alterTableProto = protoNode.getAlterTable();
+    alterTable.setTableName(alterTableProto.getTableName());
+
+    switch (alterTableProto.getSetType()) {
+    case RENAME_TABLE:
+      
alterTable.setNewTableName(alterTableProto.getRenameTable().getNewName());
+      break;
+    case ADD_COLUMN:
+      alterTable.setAddNewColumn(new 
Column(alterTableProto.getAddColumn().getAddColumn()));
+      break;
+    case RENAME_COLUMN:
+      alterTable.setColumnName(alterTableProto.getRenameColumn().getOldName());
+      
alterTable.setNewColumnName(alterTableProto.getRenameColumn().getNewName());
+      break;
+    default:
+      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + 
alterTableProto.getSetType().name());
+    }
+
+    return alterTable;
+  }
+
+  private static TruncateTableNode convertTruncateTable(PlanProto.LogicalNode 
protoNode) {
+    TruncateTableNode truncateTable = new 
TruncateTableNode(protoNode.getNodeId());
+
+    PlanProto.TruncateTableNode truncateTableProto = 
protoNode.getTruncateTableNode();
+    truncateTable.setTableNames(truncateTableProto.getTableNamesList());
+
+    return truncateTable;
+  }
+
+  private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context,
+                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
+    AggregationFunctionCallEval [] aggFuncs = new 
AggregationFunctionCallEval[evalTrees.size()];
+    for (int i = 0; i < aggFuncs.length; i++) {
+      aggFuncs[i] = (AggregationFunctionCallEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
+    }
+    return aggFuncs;
+  }
+
+  private static WindowFunctionEval[] convertWindowFunccEvals(OverridableConf 
context,
+                                                                       
List<PlanProto.EvalNodeTree> evalTrees) {
+    WindowFunctionEval[] winFuncEvals = new 
WindowFunctionEval[evalTrees.size()];
+    for (int i = 0; i < winFuncEvals.length; i++) {
+      winFuncEvals[i] = (WindowFunctionEval) 
EvalNodeDeserializer.deserialize(context, evalTrees.get(i));
+    }
+    return winFuncEvals;
+  }
+
+  public static Schema convertSchema(CatalogProtos.SchemaProto proto) {
+    return new Schema(proto);
+  }
+
+  public static Column[] convertColumns(List<CatalogProtos.ColumnProto> 
columnProtos) {
+    Column [] columns = new Column[columnProtos.size()];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = new Column(columnProtos.get(i));
+    }
+    return columns;
+  }
+
+  public static Target[] convertTargets(OverridableConf context, 
List<PlanProto.Target> targetsProto) {
+    Target [] targets = new Target[targetsProto.size()];
+    for (int i = 0; i < targets.length; i++) {
+      PlanProto.Target targetProto = targetsProto.get(i);
+      EvalNode evalNode = EvalNodeDeserializer.deserialize(context, 
targetProto.getExpr());
+      if (targetProto.hasAlias()) {
+        targets[i] = new Target(evalNode, targetProto.getAlias());
+      } else {
+        targets[i] = new Target((FieldEval) evalNode);
+      }
+    }
+    return targets;
+  }
+
+  public static SortSpec[] convertSortSpecs(List<CatalogProtos.SortSpecProto> 
sortSpecProtos) {
+    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
+    int i = 0;
+    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+      sortSpecs[i++] = new SortSpec(proto);
+    }
+    return sortSpecs;
+  }
+
+  public static JoinType convertJoinType(PlanProto.JoinType type) {
+    switch (type) {
+    case CROSS_JOIN:
+      return JoinType.CROSS;
+    case INNER_JOIN:
+      return JoinType.INNER;
+    case LEFT_OUTER_JOIN:
+      return JoinType.LEFT_OUTER;
+    case RIGHT_OUTER_JOIN:
+      return JoinType.RIGHT_OUTER;
+    case FULL_OUTER_JOIN:
+      return JoinType.FULL_OUTER;
+    case LEFT_SEMI_JOIN:
+      return JoinType.LEFT_SEMI;
+    case RIGHT_SEMI_JOIN:
+      return JoinType.RIGHT_SEMI;
+    case LEFT_ANTI_JOIN:
+      return JoinType.LEFT_ANTI;
+    case RIGHT_ANTI_JOIN:
+      return JoinType.RIGHT_ANTI;
+    case UNION_JOIN:
+      return JoinType.UNION;
+    default:
+      throw new RuntimeException("Unknown JoinType: " + type.name());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
new file mode 100644
index 0000000..39a13ba
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -0,0 +1,724 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.serder;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.AddColumn;
+import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameColumn;
+import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable;
+import org.apache.tajo.plan.serder.PlanProto.AlterTablespaceNode.SetLocation;
+import org.apache.tajo.plan.serder.PlanProto.LogicalNodeTree;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * It serializes a logical plan into a protobuf-based serialized bytes.
+ *
+ * In detail, it traverses all logical nodes in a postfix order.
+ * For each visiting node, it serializes the node and adds the serialized 
bytes into a list.
+ * Then, a list will contains a list of serialized nodes in a postfix order.
+ *
+ * @see org.apache.tajo.plan.serder.LogicalNodeDeserializer
+ */
+public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSerializer.SerializeContext,
+    LogicalNode> {
+
+  private static final LogicalNodeSerializer instance;
+
+  static {
+    instance = new LogicalNodeSerializer();
+  }
+
+  /**
+   * Serialize a logical plan into a protobuf-based serialized bytes.
+   *
+   * @param node LogicalNode to be serialized
+   * @return A list of serialized nodes
+   */
+  public static LogicalNodeTree serialize(LogicalNode node) {
+    SerializeContext context = new SerializeContext();
+    try {
+      instance.visit(context, null, null, node, new Stack<LogicalNode>());
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+    return context.treeBuilder.build();
+  }
+
+  private static PlanProto.LogicalNode.Builder 
createNodeBuilder(SerializeContext context, LogicalNode node) {
+    int selfId;
+    if (context.idMap.containsKey(node)) {
+      selfId = context.idMap.get(node);
+    } else {
+      selfId = context.seqId++;
+      context.idMap.put(node, selfId);
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = 
PlanProto.LogicalNode.newBuilder();
+    nodeBuilder.setVisitSeq(selfId);
+    nodeBuilder.setNodeId(node.getPID());
+    nodeBuilder.setType(convertType(node.getType()));
+
+    // some DDL statements like DropTable or DropDatabase do not have in/out 
schemas
+    if (node.getInSchema() != null) {
+      nodeBuilder.setInSchema(node.getInSchema().getProto());
+    }
+    if (node.getOutSchema() != null) {
+      nodeBuilder.setOutSchema(node.getOutSchema().getProto());
+    }
+    return nodeBuilder;
+  }
+
+  public static class SerializeContext {
+    private int seqId = 0;
+    private Map<LogicalNode, Integer> idMap = Maps.newHashMap();
+    private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder();
+  }
+
+  public LogicalNode visitRoot(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                               LogicalRootNode root, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitRoot(context, plan, block, root, stack);
+
+    int [] childIds = registerGetChildIds(context, root);
+
+    PlanProto.RootNode.Builder rootBuilder = PlanProto.RootNode.newBuilder();
+    rootBuilder.setChildSeq(childIds[0]);
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
root);
+    nodeBuilder.setRoot(rootBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return root;
+  }
+
+  @Override
+  public LogicalNode visitSetSession(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                     SetSessionNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    super.visitSetSession(context, plan, block, node, stack);
+
+    PlanProto.SetSessionNode.Builder builder = 
PlanProto.SetSessionNode.newBuilder();
+    builder.setName(node.getName());
+    if (node.hasValue()) {
+      builder.setValue(node.getValue());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setSetSession(builder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                   EvalExprNode exprEval, Stack<LogicalNode> 
stack) throws PlanningException {
+    PlanProto.EvalExprNode.Builder exprEvalBuilder = 
PlanProto.EvalExprNode.newBuilder();
+    exprEvalBuilder.addAllTargets(
+        ProtoUtil.<PlanProto.Target>toProtoObjects(exprEval.getTargets()));
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
exprEval);
+    nodeBuilder.setExprEval(exprEvalBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return exprEval;
+  }
+
+  public LogicalNode visitProjection(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                     ProjectionNode projection, 
Stack<LogicalNode> stack) throws PlanningException {
+    super.visitProjection(context, plan, block, projection, stack);
+
+    int [] childIds = registerGetChildIds(context, projection);
+
+    PlanProto.ProjectionNode.Builder projectionBuilder = 
PlanProto.ProjectionNode.newBuilder();
+    projectionBuilder.setChildSeq(childIds[0]);
+    projectionBuilder.addAllTargets(
+        ProtoUtil.<PlanProto.Target>toProtoObjects(projection.getTargets()));
+    projectionBuilder.setDistinct(projection.isDistinct());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
projection);
+    nodeBuilder.setProjection(projectionBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return projection;
+  }
+
+  @Override
+  public LogicalNode visitLimit(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                LimitNode limit, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitLimit(context, plan, block, limit, stack);
+
+    int [] childIds = registerGetChildIds(context, limit);
+
+    PlanProto.LimitNode.Builder limitBuilder = 
PlanProto.LimitNode.newBuilder();
+    limitBuilder.setChildSeq(childIds[0]);
+    limitBuilder.setFetchFirstNum(limit.getFetchFirstNum());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
limit);
+    nodeBuilder.setLimit(limitBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return limit;
+  }
+
+  public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    WindowAggNode windowAgg, 
Stack<LogicalNode> stack) throws PlanningException {
+    super.visitWindowAgg(context, plan, block, windowAgg, stack);
+
+    int [] childIds = registerGetChildIds(context, windowAgg);
+
+    PlanProto.WindowAggNode.Builder windowAggBuilder = 
PlanProto.WindowAggNode.newBuilder();
+    windowAggBuilder.setChildSeq(childIds[0]);
+
+    if (windowAgg.hasPartitionKeys()) {
+      windowAggBuilder.addAllPartitionKeys(
+          
ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(windowAgg.getPartitionKeys()));
+    }
+
+    if (windowAgg.hasAggFunctions()) {
+      windowAggBuilder.addAllWindowFunctions(
+          
ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(windowAgg.getWindowFunctions()));
+    }
+    windowAggBuilder.setDistinct(windowAgg.isDistinct());
+
+    if (windowAgg.hasSortSpecs()) {
+      windowAggBuilder.addAllSortSpecs(
+          
ProtoUtil.<CatalogProtos.SortSpecProto>toProtoObjects(windowAgg.getSortSpecs()));
+    }
+    if (windowAgg.hasTargets()) {
+      windowAggBuilder.addAllTargets(
+          ProtoUtil.<PlanProto.Target>toProtoObjects(windowAgg.getTargets()));
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
windowAgg);
+    nodeBuilder.setWindowAgg(windowAggBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return windowAgg;
+  }
+
+  @Override
+  public LogicalNode visitSort(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                SortNode sort, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitSort(context, plan, block, sort, stack);
+
+    int [] childIds = registerGetChildIds(context, sort);
+
+    PlanProto.SortNode.Builder sortBuilder = PlanProto.SortNode.newBuilder();
+    sortBuilder.setChildSeq(childIds[0]);
+    for (int i = 0; i < sort.getSortKeys().length; i++) {
+      sortBuilder.addSortSpecs(sort.getSortKeys()[i].getProto());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
sort);
+    nodeBuilder.setSort(sortBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return sort;
+  }
+
+  @Override
+  public LogicalNode visitHaving(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                 HavingNode having, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitHaving(context, plan, block, having, stack);
+
+    int [] childIds = registerGetChildIds(context, having);
+
+    PlanProto.FilterNode.Builder filterBuilder = 
PlanProto.FilterNode.newBuilder();
+    filterBuilder.setChildSeq(childIds[0]);
+    filterBuilder.setQual(EvalNodeSerializer.serialize(having.getQual()));
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
having);
+    nodeBuilder.setFilter(filterBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return having;
+  }
+
+  public LogicalNode visitGroupBy(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                  GroupbyNode node, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitGroupBy(context, plan, block, node, new Stack<LogicalNode>());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = buildGroupby(context, node);
+    context.treeBuilder.addNodes(nodeBuilder);
+    return node;
+  }
+
+  private PlanProto.LogicalNode.Builder buildGroupby(SerializeContext context, 
GroupbyNode node)
+      throws PlanningException {
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.GroupbyNode.Builder groupbyBuilder = 
PlanProto.GroupbyNode.newBuilder();
+    groupbyBuilder.setChildSeq(childIds[0]);
+    groupbyBuilder.setDistinct(node.isDistinct());
+
+    if (node.groupingKeyNum() > 0) {
+      groupbyBuilder.addAllGroupingKeys(
+          
ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(node.getGroupingColumns()));
+    }
+    if (node.hasAggFunctions()) {
+      groupbyBuilder.addAllAggFunctions(
+          
ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(node.getAggFunctions()));
+    }
+    if (node.hasTargets()) {
+      
groupbyBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets()));
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setGroupby(groupbyBuilder);
+
+    return nodeBuilder;
+  }
+
+  public LogicalNode visitDistinctGroupby(SerializeContext context, 
LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                          DistinctGroupbyNode node, 
Stack<LogicalNode> stack) throws PlanningException {
+    super.visitDistinctGroupby(context, plan, block, node, new 
Stack<LogicalNode>());
+
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.DistinctGroupbyNode.Builder distGroupbyBuilder = 
PlanProto.DistinctGroupbyNode.newBuilder();
+    distGroupbyBuilder.setChildSeq(childIds[0]);
+    if (node.getGroupbyPlan() != null) {
+      distGroupbyBuilder.setGroupbyNode(buildGroupby(context, 
node.getGroupbyPlan()));
+    }
+
+    for (GroupbyNode subPlan : node.getSubPlans()) {
+      distGroupbyBuilder.addSubPlans(buildGroupby(context, subPlan));
+    }
+
+    if (node.getGroupingColumns().length > 0) {
+      distGroupbyBuilder.addAllGroupingKeys(
+          
ProtoUtil.<CatalogProtos.ColumnProto>toProtoObjects(node.getGroupingColumns()));
+    }
+    if (node.getAggFunctions().length > 0) {
+      distGroupbyBuilder.addAllAggFunctions(
+          
ProtoUtil.<PlanProto.EvalNodeTree>toProtoObjects(node.getAggFunctions()));
+    }
+    if (node.hasTargets()) {
+      
distGroupbyBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets()));
+    }
+    for (int cid : node.getResultColumnIds()) {
+      distGroupbyBuilder.addResultId(cid);
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setDistinctGroupby(distGroupbyBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitFilter(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                 SelectionNode filter, Stack<LogicalNode> 
stack) throws PlanningException {
+    super.visitFilter(context, plan, block, filter, stack);
+
+    int [] childIds = registerGetChildIds(context, filter);
+
+    PlanProto.FilterNode.Builder filterBuilder = 
PlanProto.FilterNode.newBuilder();
+    filterBuilder.setChildSeq(childIds[0]);
+    filterBuilder.setQual(EvalNodeSerializer.serialize(filter.getQual()));
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
filter);
+    nodeBuilder.setFilter(filterBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return filter;
+  }
+
+  public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, JoinNode join,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    super.visitJoin(context, plan, block, join, stack);
+
+    int [] childIds = registerGetChildIds(context, join);
+
+    // building itself
+    PlanProto.JoinNode.Builder joinBuilder = PlanProto.JoinNode.newBuilder();
+    joinBuilder.setJoinType(convertJoinType(join.getJoinType()));
+    joinBuilder.setLeftChildSeq(childIds[0]);
+    joinBuilder.setRightChilSeq(childIds[1]);
+    if (join.hasJoinQual()) {
+      
joinBuilder.setJoinQual(EvalNodeSerializer.serialize(join.getJoinQual()));
+    }
+
+    if (join.hasTargets()) {
+      joinBuilder.setExistsTargets(true);
+      
joinBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(join.getTargets()));
+    } else {
+      joinBuilder.setExistsTargets(false);
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
join);
+    nodeBuilder.setJoin(joinBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return join;
+  }
+
+  @Override
+  public LogicalNode visitUnion(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, UnionNode node,
+                           Stack<LogicalNode> stack) throws PlanningException {
+    super.visitUnion(context, plan, block, node, stack);
+
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.UnionNode.Builder unionBuilder = 
PlanProto.UnionNode.newBuilder();
+    unionBuilder.setAll(true);
+    unionBuilder.setLeftChildSeq(childIds[0]);
+    unionBuilder.setRightChildSeq(childIds[1]);
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setUnion(unionBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                               ScanNode scan, Stack<LogicalNode> stack) throws 
PlanningException {
+
+    PlanProto.ScanNode.Builder scanBuilder = buildScanNode(scan);
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
scan);
+    nodeBuilder.setScan(scanBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return scan;
+  }
+
+  public PlanProto.ScanNode.Builder buildScanNode(ScanNode scan) {
+    PlanProto.ScanNode.Builder scanBuilder = PlanProto.ScanNode.newBuilder();
+    scanBuilder.setTable(scan.getTableDesc().getProto());
+    if (scan.hasAlias()) {
+      scanBuilder.setAlias(scan.getAlias());
+    }
+
+    if (scan.hasTargets()) {
+      scanBuilder.setExistTargets(true);
+      
scanBuilder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(scan.getTargets()));
+    } else {
+      scanBuilder.setExistTargets(false);
+    }
+
+    if (scan.hasQual()) {
+      scanBuilder.setQual(EvalNodeSerializer.serialize(scan.getQual()));
+    }
+    return scanBuilder;
+  }
+
+  @Override
+  public LogicalNode visitPartitionedTableScan(SerializeContext context, 
LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                          PartitionedTableScanNode node, 
Stack<LogicalNode> stack)
+      throws PlanningException {
+
+    PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node);
+
+    PlanProto.PartitionScanSpec.Builder partitionScan = 
PlanProto.PartitionScanSpec.newBuilder();
+    List<String> pathStrs = TUtil.newList();
+    if (node.getInputPaths() != null) {
+      for (Path p : node.getInputPaths()) {
+        pathStrs.add(p.toString());
+      }
+      partitionScan.addAllPaths(pathStrs);
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setScan(scanBuilder);
+    nodeBuilder.setPartitionScan(partitionScan);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                   TableSubQueryNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    super.visitTableSubQuery(context, plan, block, node, stack);
+
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.TableSubQueryNode.Builder builder = 
PlanProto.TableSubQueryNode.newBuilder();
+    builder.setChildSeq(childIds[0]);
+
+    builder.setTableName(node.getTableName());
+
+    if (node.hasTargets()) {
+      
builder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets()));
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setTableSubQuery(builder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                      CreateTableNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    super.visitCreateTable(context, plan, block, node, stack);
+
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = 
buildPersistentStoreBuilder(node, childIds);
+    PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = 
buildStoreTableNodeSpec(node);
+
+    PlanProto.CreateTableNodeSpec.Builder createTableBuilder = 
PlanProto.CreateTableNodeSpec.newBuilder();
+    createTableBuilder.setSchema(node.getTableSchema().getProto());
+    createTableBuilder.setExternal(node.isExternal());
+    if (node.isExternal() && node.hasPath()) {
+      createTableBuilder.setPath(node.getPath().toString());
+    }
+    createTableBuilder.setIfNotExists(node.isIfNotExists());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setPersistentStore(persistentStoreBuilder);
+    nodeBuilder.setStoreTable(storeTableBuilder);
+    nodeBuilder.setCreateTable(createTableBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropTable(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    DropTableNode node, Stack<LogicalNode> 
stack) {
+    PlanProto.DropTableNode.Builder dropTableBuilder = 
PlanProto.DropTableNode.newBuilder();
+    dropTableBuilder.setTableName(node.getTableName());
+    dropTableBuilder.setIfExists(node.isIfExists());
+    dropTableBuilder.setPurge(node.isPurge());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setDropTable(dropTableBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitAlterTablespace(SerializeContext context, 
LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     AlterTablespaceNode node, 
Stack<LogicalNode> stack) throws PlanningException {
+    PlanProto.AlterTablespaceNode.Builder alterTablespaceBuilder = 
PlanProto.AlterTablespaceNode.newBuilder();
+    alterTablespaceBuilder.setTableSpaceName(node.getTablespaceName());
+
+    switch (node.getSetType()) {
+    case LOCATION:
+      
alterTablespaceBuilder.setSetType(PlanProto.AlterTablespaceNode.Type.LOCATION);
+      
alterTablespaceBuilder.setSetLocation(SetLocation.newBuilder().setLocation(node.getLocation()));
+      break;
+
+    default:
+      throw new UnimplementedException("Unknown SET type in ALTER TABLESPACE: 
" + node.getSetType().name());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setAlterTablespace(alterTablespaceBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitAlterTable(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                     AlterTableNode node, Stack<LogicalNode> 
stack) {
+    PlanProto.AlterTableNode.Builder alterTableBuilder = 
PlanProto.AlterTableNode.newBuilder();
+    alterTableBuilder.setTableName(node.getTableName());
+
+    switch (node.getAlterTableOpType()) {
+    case RENAME_TABLE:
+      alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.RENAME_TABLE);
+      
alterTableBuilder.setRenameTable(RenameTable.newBuilder().setNewName(node.getNewTableName()));
+      break;
+    case ADD_COLUMN:
+      alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.ADD_COLUMN);
+      
alterTableBuilder.setAddColumn(AddColumn.newBuilder().setAddColumn(node.getAddNewColumn().getProto()));
+      break;
+    case RENAME_COLUMN:
+      
alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.RENAME_COLUMN);
+      alterTableBuilder.setRenameColumn(RenameColumn.newBuilder()
+          .setOldName(node.getColumnName())
+          .setNewName(node.getNewColumnName()));
+      break;
+    default:
+      throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + 
node.getAlterTableOpType().name());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setAlterTable(alterTableBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitTruncateTable(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                   TruncateTableNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    PlanProto.TruncateTableNode.Builder truncateTableBuilder = 
PlanProto.TruncateTableNode.newBuilder();
+    truncateTableBuilder.addAllTableNames(node.getTableNames());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setTruncateTableNode(truncateTableBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  public LogicalNode visitInsert(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                 InsertNode node, Stack<LogicalNode> stack) 
throws PlanningException {
+    super.visitInsert(context, plan, block, node, stack);
+
+    int [] childIds = registerGetChildIds(context, node);
+
+    PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = 
buildPersistentStoreBuilder(node, childIds);
+    PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = 
buildStoreTableNodeSpec(node);
+
+    PlanProto.InsertNodeSpec.Builder insertNodeSpec = 
PlanProto.InsertNodeSpec.newBuilder();
+    insertNodeSpec.setOverwrite(node.isOverwrite());
+    insertNodeSpec.setTableSchema(node.getTableSchema().getProto());
+    if (node.hasProjectedSchema()) {
+      insertNodeSpec.setProjectedSchema(node.getProjectedSchema().getProto());
+    }
+    if (node.hasTargetSchema()) {
+      insertNodeSpec.setTargetSchema(node.getTargetSchema().getProto());
+    }
+    if (node.hasPath()) {
+      insertNodeSpec.setPath(node.getPath().toString());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setPersistentStore(persistentStoreBuilder);
+    nodeBuilder.setStoreTable(storeTableBuilder);
+    nodeBuilder.setInsert(insertNodeSpec);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  private static PlanProto.PersistentStoreNode.Builder 
buildPersistentStoreBuilder(PersistentStoreNode node,
+                                                                               
    int [] childIds) {
+    PlanProto.PersistentStoreNode.Builder persistentStoreBuilder = 
PlanProto.PersistentStoreNode.newBuilder();
+    persistentStoreBuilder.setChildSeq(childIds[0]);
+    persistentStoreBuilder.setStorageType(node.getStorageType());
+    if (node.hasOptions()) {
+      persistentStoreBuilder.setTableProperties(node.getOptions().getProto());
+    }
+    return persistentStoreBuilder;
+  }
+
+  private static PlanProto.StoreTableNodeSpec.Builder 
buildStoreTableNodeSpec(StoreTableNode node) {
+    PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = 
PlanProto.StoreTableNodeSpec.newBuilder();
+    if (node.hasPartition()) {
+      
storeTableBuilder.setPartitionMethod(node.getPartitionMethod().getProto());
+    }
+    if (node.hasTableName()) { // It will be false if node is for INSERT INTO 
LOCATION '...'
+      storeTableBuilder.setTableName(node.getTableName());
+    }
+    return storeTableBuilder;
+  }
+
+  @Override
+  public LogicalNode visitCreateDatabase(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    CreateDatabaseNode node, 
Stack<LogicalNode> stack) throws PlanningException {
+    PlanProto.CreateDatabaseNode.Builder createDatabaseBuilder = 
PlanProto.CreateDatabaseNode.newBuilder();
+    createDatabaseBuilder.setDbName(node.getDatabaseName());
+    createDatabaseBuilder.setIfNotExists(node.isIfNotExists());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setCreateDatabase(createDatabaseBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropDatabase(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                       DropDatabaseNode node, 
Stack<LogicalNode> stack) throws PlanningException {
+    PlanProto.DropDatabaseNode.Builder dropDatabaseBuilder = 
PlanProto.DropDatabaseNode.newBuilder();
+    dropDatabaseBuilder.setDbName(node.getDatabaseName());
+    dropDatabaseBuilder.setIfExists(node.isIfExists());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setDropDatabase(dropDatabaseBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  public static PlanProto.NodeType convertType(NodeType type) {
+    return PlanProto.NodeType.valueOf(type.name());
+  }
+
+  public static PlanProto.JoinType convertJoinType(JoinType type) {
+    switch (type) {
+    case CROSS:
+      return PlanProto.JoinType.CROSS_JOIN;
+    case INNER:
+      return PlanProto.JoinType.INNER_JOIN;
+    case LEFT_OUTER:
+      return PlanProto.JoinType.LEFT_OUTER_JOIN;
+    case RIGHT_OUTER:
+      return PlanProto.JoinType.RIGHT_OUTER_JOIN;
+    case FULL_OUTER:
+      return PlanProto.JoinType.FULL_OUTER_JOIN;
+    case LEFT_SEMI:
+      return PlanProto.JoinType.LEFT_SEMI_JOIN;
+    case RIGHT_SEMI:
+      return PlanProto.JoinType.RIGHT_SEMI_JOIN;
+    case LEFT_ANTI:
+      return PlanProto.JoinType.LEFT_ANTI_JOIN;
+    case RIGHT_ANTI:
+      return PlanProto.JoinType.RIGHT_ANTI_JOIN;
+    case UNION:
+      return PlanProto.JoinType.UNION_JOIN;
+    default:
+      throw new RuntimeException("Unknown JoinType: " + type.name());
+    }
+  }
+
+  public static PlanProto.Target convertTarget(Target target) {
+    PlanProto.Target.Builder targetBuilder = PlanProto.Target.newBuilder();
+    targetBuilder.setExpr(EvalNodeSerializer.serialize(target.getEvalTree()));
+    if (target.hasAlias()) {
+      targetBuilder.setAlias(target.getAlias());
+    }
+    return targetBuilder.build();
+  }
+
+  private int [] registerGetChildIds(SerializeContext context, LogicalNode 
node) {
+    int [] childIds = new int[node.childNum()];
+    for (int i = 0; i < node.childNum(); i++) {
+      if (context.idMap.containsKey(node.getChild(i))) {
+        childIds[i] = context.idMap.get(node.getChild(i));
+      } else {
+        childIds[i] = context.seqId++;
+      }
+    }
+    return childIds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java
new file mode 100644
index 0000000..b148fec
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides (de)serialization API for logical plan and it related 
parts.
+ * They employ protocol buffer to (de)serialize logical plans.
+ */
+package org.apache.tajo.plan.serder;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 4672577..d813432 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -26,10 +26,8 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
@@ -48,6 +46,9 @@ import static 
org.apache.tajo.catalog.proto.CatalogProtos.StoreType.TEXTFILE;
 
 public class PlannerUtil {
 
+  public static final Column [] EMPTY_COLUMNS = new Column[] {};
+  public static final AggregationFunctionCallEval [] EMPTY_AGG_FUNCS = new 
AggregationFunctionCallEval[] {};
+
   public static boolean checkIfSetSession(LogicalNode node) {
     LogicalNode baseNode = node;
     if (node instanceof LogicalRootNode) {
@@ -698,7 +699,7 @@ public class PlannerUtil {
         copy.setPID(plan.newPID());
         if (node instanceof DistinctGroupbyNode) {
           DistinctGroupbyNode dNode = (DistinctGroupbyNode)copy;
-          for (GroupbyNode eachNode: dNode.getGroupByNodes()) {
+          for (GroupbyNode eachNode: dNode.getSubPlans()) {
             eachNode.setPID(plan.newPID());
           }
         }
@@ -762,15 +763,6 @@ public class PlannerUtil {
     return names;
   }
 
-  public static SortSpec[] 
convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
-    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
-    int i = 0;
-    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
-      sortSpecs[i++] = new SortSpec(proto);
-    }
-    return sortSpecs;
-  }
-
   /**
    * Generate an explain string of a LogicalNode and its descendant nodes.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
index d09710e..23c834d 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
@@ -63,7 +63,8 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
         current = visitSetSession(context, plan, block, (SetSessionNode) node, 
stack);
         break;
       case EXPRS:
-        return null;
+        current = visitEvalExpr(context, plan, block, (EvalExprNode) node, 
stack);
+        break;
       case PROJECTION:
         current = visitProjection(context, plan, block, (ProjectionNode) node, 
stack);
         break;
@@ -83,7 +84,7 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
         current = visitWindowAgg(context, plan, block, (WindowAggNode) node, 
stack);
         break;
       case DISTINCT_GROUP_BY:
-        current = visitDistinct(context, plan, block, (DistinctGroupbyNode) 
node, stack);
+        current = visitDistinctGroupby(context, plan, block, 
(DistinctGroupbyNode) node, stack);
         break;
       case SELECTION:
         current = visitFilter(context, plan, block, (SelectionNode) node, 
stack);
@@ -159,6 +160,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitEvalExpr(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, EvalExprNode node,
+                              Stack<LogicalNode> stack) throws 
PlanningException {
+    return null;
+  }
+
+  @Override
   public RESULT visitProjection(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, ProjectionNode node,
                                 Stack<LogicalNode> stack)
       throws PlanningException {
@@ -213,8 +220,8 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
     return result;
   }
 
-  public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
-                             Stack<LogicalNode> stack) throws 
PlanningException {
+  public RESULT visitDistinctGroupby(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                     DistinctGroupbyNode node, 
Stack<LogicalNode> stack) throws PlanningException {
     stack.push(node);
     RESULT result = visit(context, plan, block, node.getChild(), stack);
     stack.pop();
@@ -244,10 +251,17 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
   public RESULT visitUnion(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, UnionNode node,
                            Stack<LogicalNode> stack) throws PlanningException {
     stack.push(node);
-    LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild());
-    RESULT result = visit(context, plan, leftBlock, leftBlock.getRoot(), 
stack);
-    LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild());
-    visit(context, plan, rightBlock, rightBlock.getRoot(), stack);
+    RESULT result = null;
+    if (plan != null) {
+      LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild());
+      result = visit(context, plan, leftBlock, leftBlock.getRoot(), stack);
+      LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild());
+      visit(context, plan, rightBlock, rightBlock.getRoot(), stack);
+    } else {
+      result = visit(context, plan, null, node.getLeftChild(), stack);
+      visit(context, plan, null, node.getRightChild(), stack);
+    }
+
     stack.pop();
     return result;
   }
@@ -276,8 +290,13 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
   public RESULT visitTableSubQuery(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                    TableSubQueryNode node, Stack<LogicalNode> 
stack) throws PlanningException {
     stack.push(node);
-    LogicalPlan.QueryBlock childBlock = plan.getBlock(node.getSubQuery());
-    RESULT result = visit(context, plan, childBlock, childBlock.getRoot(), 
stack);
+    RESULT result = null;
+    if (plan != null) {
+      LogicalPlan.QueryBlock childBlock = plan.getBlock(node.getSubQuery());
+      result = visit(context, plan, childBlock, childBlock.getRoot(), stack);
+    } else {
+      result = visit(context, plan, null, node.getSubQuery(), stack);
+    }
     stack.pop();
     return result;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
index 7065295..52db8eb 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
@@ -117,8 +117,9 @@ public class ExplainLogicalPlanVisitor extends 
BasicLogicalPlanVisitor<ExplainLo
     return visitUnaryNode(context, plan, block, node, stack);
   }
 
-  public LogicalNode visitDistinct(Context context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
-                                  Stack<LogicalNode> stack) throws 
PlanningException {
+  public LogicalNode visitDistinctGroupby(Context context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                          DistinctGroupbyNode node,
+                                          Stack<LogicalNode> stack) throws 
PlanningException {
     return visitUnaryNode(context, plan, block, node, stack);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
index 6a0c338..5be2eec 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
@@ -32,6 +32,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitSetSession(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, SetSessionNode node,
                          Stack<LogicalNode> stack) throws PlanningException;
 
+  RESULT visitEvalExpr(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, EvalExprNode node,
+                       Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitProjection(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, ProjectionNode node,
                          Stack<LogicalNode> stack) throws PlanningException;
 
@@ -48,8 +51,8 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
                       Stack<LogicalNode> stack) throws PlanningException;
   RESULT visitWindowAgg(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, WindowAggNode node,
                       Stack<LogicalNode> stack) throws PlanningException;
-  RESULT visitDistinct(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
-                                Stack<LogicalNode> stack) throws 
PlanningException;
+  RESULT visitDistinctGroupby(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                              Stack<LogicalNode> stack) throws 
PlanningException;
 
   RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock 
block, SelectionNode node,
                      Stack<LogicalNode> stack) throws PlanningException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto 
b/tajo-plan/src/main/proto/Plan.proto
index 0f82d87..3e4f07c 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -26,58 +26,280 @@ import "CatalogProtos.proto";
 import "DataTypes.proto";
 
 enum NodeType {
-  BST_INDEX_SCAN = 0;
-  EXCEPT = 1;
+  SET_SESSION = 0;
+
+  ROOT = 1;
   EXPRS = 2;
-  DISTINCT_GROUP_BY = 3;
-  GROUP_BY = 4;
-  HAVING = 5;
-  JOIN = 6;
-  INSERT = 7;
-  INTERSECT = 8;
-  LIMIT = 9;
-  PARTITIONS_SCAN = 10;
-  PROJECTION = 11;
-  ROOT = 12;
-  SCAN = 13;
-  SELECTION = 14;
-  SORT = 15;
-  STORE = 16;
-  TABLE_SUBQUERY = 17;
-  UNION = 18;
-  WINDOW_AGG = 19;
-
-  CREATE_DATABASE = 20;
-  DROP_DATABASE = 21;
-  CREATE_TABLE = 22;
-  DROP_TABLE = 23;
-  ALTER_TABLESPACE = 24;
-  ALTER_TABLE = 25;
-  TRUNCATE_TABLE = 26;
-}
-
-message LogicalPlan {
-  required KeyValueSetProto adjacentList = 1;
+  PROJECTION = 3;
+  LIMIT = 4;
+  WINDOW_AGG = 5;
+  SORT = 6;
+  HAVING = 7;
+  GROUP_BY = 8;
+  DISTINCT_GROUP_BY = 9;
+  SELECTION = 10;
+  JOIN = 11;
+  UNION = 12;
+  INTERSECT = 13;
+  EXCEPT = 14;
+  TABLE_SUBQUERY = 15;
+  SCAN = 16;
+  PARTITIONS_SCAN = 17;
+  BST_INDEX_SCAN = 18;
+  STORE = 19;
+  INSERT = 20;
+
+  CREATE_DATABASE = 21;
+  DROP_DATABASE = 22;
+  CREATE_TABLE = 23;
+  DROP_TABLE = 24;
+  ALTER_TABLESPACE = 25;
+  ALTER_TABLE = 26;
+  TRUNCATE_TABLE = 27;
 }
 
-message LogicalNode {
-  required int32 pid = 1;
-  required NodeType type = 2;
-  required SchemaProto in_schema = 3;
-  required SchemaProto out_schema = 4;
-  required NodeSpec spec = 5;
+message LogicalNodeTree {
+  repeated LogicalNode nodes = 1;
 }
 
-message NodeSpec {
-  optional ScanNode scan = 1;
+message LogicalNode {
+  required int32 visitSeq = 1;
+  required int32 nodeId = 2;
+  required NodeType type = 3;
+  optional SchemaProto in_schema = 4;
+  optional SchemaProto out_schema = 5;
+
+  optional ScanNode scan = 6;
+  optional PartitionScanSpec partitionScan = 7;
+  optional JoinNode join = 8;
+  optional FilterNode filter = 9;
+  optional GroupbyNode groupby = 10;
+  optional DistinctGroupbyNode distinctGroupby = 11;
+  optional SortNode sort = 12;
+  optional LimitNode limit = 13;
+  optional WindowAggNode windowAgg = 14;
+  optional ProjectionNode projection = 15;
+  optional EvalExprNode exprEval = 16;
+  optional UnionNode union = 17;
+  optional TableSubQueryNode tableSubQuery = 18;
+  optional PersistentStoreNode persistentStore = 19;
+  optional StoreTableNodeSpec storeTable = 20;
+  optional InsertNodeSpec insert = 21;
+  optional CreateTableNodeSpec createTable = 22;
+  optional RootNode root = 23;
+  optional SetSessionNode setSession = 24;
+
+  optional CreateDatabaseNode createDatabase = 25;
+  optional DropDatabaseNode dropDatabase = 26;
+  optional DropTableNode dropTable = 27;
+
+  optional AlterTablespaceNode alterTablespace = 28;
+  optional AlterTableNode alterTable = 29;
+  optional TruncateTableNode truncateTableNode = 30;
 }
 
 message ScanNode {
   required TableDescProto table = 1;
   optional string alias = 2;
-  required SchemaProto schema = 3;
+  required bool existTargets = 3;
+  repeated Target targets = 4;
+  optional EvalNodeTree qual = 5;
+}
+
+message PartitionScanSpec {
+  repeated string paths = 1;
+}
+
+message FilterNode {
+  required int32 childSeq = 1;
+  required EvalNodeTree qual = 2;
+}
+
+message JoinNode {
+  required int32 leftChildSeq = 1;
+  required int32 rightChilSeq = 2;
+  required JoinType joinType = 3;
+  optional EvalNodeTree joinQual = 4;
+  required bool existsTargets = 5;
+  repeated Target targets = 6;
+}
+
+message GroupbyNode {
+  required int32 childSeq = 1;
+  required bool distinct = 2;
+  repeated ColumnProto groupingKeys = 3;
+  repeated EvalNodeTree aggFunctions = 4;
+  repeated Target targets = 5;
+}
+
+message DistinctGroupbyNode {
+  required int32 childSeq = 1;
+  optional LogicalNode groupbyNode = 2;
+  repeated LogicalNode subPlans = 3;
+  repeated Target targets = 4;
+  repeated ColumnProto groupingKeys = 5;
+  repeated int32 resultId = 6;
+  repeated EvalNodeTree aggFunctions = 7;
+}
+
+message SortNode {
+  required int32 childSeq = 1;
+  repeated SortSpecProto sortSpecs = 2;
+}
+
+message LimitNode {
+  required int32 childSeq = 1;
+  required int64 fetchFirstNum = 2;
+}
+
+message WindowAggNode {
+  required int32 childSeq = 1;
+  repeated ColumnProto partitionKeys = 2;
+  repeated SortSpecProto sortSpecs = 3;
+  repeated EvalNodeTree windowFunctions = 4;
+  required bool  distinct = 5; // if distinct aggregation function is included 
in window function
+  repeated Target targets = 6;
+}
+
+message UnionNode {
+  required int32 leftChildSeq = 1;
+  required int32 rightChildSeq = 2;
+  required bool all = 3;
+}
+
+message TableSubQueryNode {
+  required int32 childSeq = 1;
+  required string tableName = 2;
+  repeated Target targets = 3;
+}
+
+message ProjectionNode {
+  required int32 childSeq = 1;
+  required bool distinct = 2;
+  repeated Target targets = 3;
+}
+
+message EvalExprNode {
+  repeated Target targets = 1;
+}
+
+message RootNode {
+  required int32 childSeq = 1;
+}
+
+message SetSessionNode {
+  required string name = 1;
+  optional string value = 2;
+}
+
+message Target {
+  required EvalNodeTree expr = 1;
+  optional string alias = 2;
 }
 
+enum JoinType {
+  CROSS_JOIN = 0;
+  INNER_JOIN = 1;
+  LEFT_OUTER_JOIN = 2;
+  RIGHT_OUTER_JOIN = 3;
+  FULL_OUTER_JOIN = 4;
+  UNION_JOIN = 5;
+  LEFT_ANTI_JOIN = 6;
+  RIGHT_ANTI_JOIN = 7;
+  LEFT_SEMI_JOIN = 8;
+  RIGHT_SEMI_JOIN = 9;
+}
+
+message PartitionTableScanSpec {
+  repeated string paths = 1;
+}
+
+message PersistentStoreNode {
+  required int32 childSeq = 1;
+  required StoreType storageType = 2;
+  required KeyValueSetProto tableProperties = 3;
+}
+
+message StoreTableNodeSpec { // required PersistentStoreSpec
+  optional string tableName = 1; // 'INSERT INTO LOCATION' does not require 
'table name'.
+  optional PartitionMethodProto partitionMethod = 2;
+}
+
+message InsertNodeSpec { // required PersistentStoreSpec and StoreTableSpec
+  required bool overwrite = 1;
+  required SchemaProto tableSchema = 2;
+  optional SchemaProto targetSchema = 4;
+  optional SchemaProto projectedSchema = 3;
+  optional string path = 5;
+}
+
+message CreateTableNodeSpec { // required PersistentStoreSpec and 
StoreTableNodeSpec
+  required SchemaProto schema = 1;
+  required bool external = 2;
+  required bool ifNotExists = 3;
+  optional string path = 4;
+}
+
+message DropTableNode {
+  required string tableName = 1;
+  required bool ifExists = 2;
+  required bool purge = 3;
+}
+
+message TruncateTableNode {
+  repeated string tableNames = 1;
+}
+
+message CreateDatabaseNode {
+  required string dbName = 1;
+  required bool ifNotExists = 2;
+}
+
+message DropDatabaseNode {
+  required string dbName = 1;
+  required bool ifExists = 2;
+}
+
+message AlterTablespaceNode {
+  enum Type {
+    LOCATION = 0;
+  }
+
+  message SetLocation {
+    required string location = 1;
+  }
+
+  required string tableSpaceName = 1;
+  required Type setType = 2;
+  optional SetLocation setLocation = 3;
+}
+
+message AlterTableNode {
+  enum Type {
+    RENAME_TABLE = 0;
+    RENAME_COLUMN = 1;
+    ADD_COLUMN = 2;
+  }
+
+  message RenameTable {
+    required string newName = 1;
+  }
+
+  message RenameColumn {
+    required string oldName = 1;
+    required string newName = 2;
+  }
+
+  message AddColumn {
+    required ColumnProto addColumn = 1;
+  }
+
+  required string tableName = 1;
+  required Type setType = 2;
+  optional RenameTable renameTable = 3;
+  optional RenameColumn renameColumn = 4;
+  optional AddColumn addColumn = 5;
+}
 
 enum EvalType {
   NOT = 0;
@@ -126,7 +348,7 @@ enum EvalType {
   CONST = 33;
 }
 
-message EvalTree {
+message EvalNodeTree {
   repeated EvalNode nodes = 1;
 }
 
@@ -140,10 +362,13 @@ message EvalNode {
   optional ConstEval const = 6;
   optional ColumnProto field = 7; // field eval
   optional FunctionEval function = 8;
-  optional RowConstEval rowConst = 9;
-  optional BetweenEval between = 10;
-  optional CaseWhenEval casewhen = 11;
-  optional IfCondEval ifCond = 12;
+  optional AggFunctionEvalSpec aggFunction = 9;
+  optional WinFunctionEvalSpec winFunction = 10;
+  optional RowConstEval rowConst = 11;
+  optional BetweenEval between = 12;
+  optional CaseWhenEval casewhen = 13;
+  optional IfCondEval ifCond = 14;
+  optional PatternMatchEvalSpec patternMatch = 15;
 }
 
 message UnaryEval {
@@ -159,6 +384,10 @@ message BinaryEval {
   optional bool negative = 3 [default = false];
 }
 
+message PatternMatchEvalSpec { // requires BinaryEval
+  optional bool caseSensitive = 1;
+}
+
 message BetweenEval {
   required int32 predicand = 1;
   required int32 begin = 2;
@@ -190,6 +419,50 @@ message FunctionEval {
   repeated int32 paramIds = 2;
 }
 
+message AggFunctionEvalSpec { // requires FunctionEval
+  required bool intermediatePhase = 1;
+  required bool finalPhase = 2;
+  optional string alias = 3;
+}
+
+message WinFunctionEvalSpec {
+  message WindowFrame {
+    required WindowStartBound startBound = 1;
+    required WindowEndBound endBound = 2;
+    optional WindowFrameUnit unit = 3;
+  }
+
+  enum WindowFrameStartBoundType {
+    S_UNBOUNDED_PRECEDING = 0;
+    S_CURRENT_ROW = 1;
+    S_PRECEDING = 2;
+  }
+
+  enum WindowFrameEndBoundType {
+    E_UNBOUNDED_FOLLOWING = 0;
+    E_CURRENT_ROW = 1;
+    E_FOLLOWING = 2;
+  }
+
+  enum WindowFrameUnit {
+    ROW = 0;
+    RANGE = 1;
+  }
+
+  message WindowStartBound {
+    required WindowFrameStartBoundType boundType = 1;
+    optional EvalNodeTree number = 2;
+  }
+
+  message WindowEndBound {
+    required WindowFrameEndBoundType boundType = 1;
+    optional EvalNodeTree number = 2;
+  }
+
+  repeated SortSpecProto sortSpec = 1;
+  required WindowFrame windowFrame = 2;
+}
+
 message Datum {
   required Type type = 1;
   optional bool boolean = 2;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 609ca20..c9f493d 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -38,7 +38,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.TUtil;
@@ -601,7 +601,7 @@ public abstract class StorageManager {
    * @return The list of storage specified rewrite rules
    * @throws java.io.IOException
    */
-  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, 
TableDesc tableDesc) throws IOException {
+  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf 
queryContext, TableDesc tableDesc) throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
index 79161cc..e95aeec 100644
--- 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.hbase;
 
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -30,12 +31,13 @@ import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.logical.UnaryNode;
-import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 
-public class AddSortForInsertRewriter implements RewriteRule {
+public class AddSortForInsertRewriter implements LogicalPlanRewriteRule {
   private int[] sortColumnIndexes;
   private Column[] sortColumns;
+
   public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
     this.sortColumns = sortColumns;
     this.sortColumnIndexes = new int[sortColumns.length];
@@ -52,13 +54,13 @@ public class AddSortForInsertRewriter implements 
RewriteRule {
   }
 
   @Override
-  public boolean isEligible(LogicalPlan plan) {
+  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
     StoreType storeType = PlannerUtil.getStoreType(plan);
     return storeType != null;
   }
 
   @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
     UnaryNode insertNode = rootNode.getChild();
     LogicalNode childNode = insertNode.getChild();

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index de4b4cb..c606e88 100644
--- 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -49,7 +49,7 @@ import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
@@ -1050,9 +1050,9 @@ public class HBaseStorageManager extends StorageManager {
     }
   }
 
-  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, 
TableDesc tableDesc) throws IOException {
+  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf 
queryContext, TableDesc tableDesc) throws IOException {
     if 
("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE,
 "false"))) {
-      List<RewriteRule> rules = new ArrayList<RewriteRule>();
+      List<LogicalPlanRewriteRule> rules = new 
ArrayList<LogicalPlanRewriteRule>();
       rules.add(new AddSortForInsertRewriter(tableDesc, 
getIndexColumns(tableDesc)));
       return rules;
     } else {

Reply via email to