Repository: tajo
Updated Branches:
  refs/heads/index_support 1fad72ebe -> 071c5d05d


http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
index d7cd82e..1ce7019 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -23,14 +23,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import 
org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
+import org.apache.tajo.plan.util.IndexUtil;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
@@ -47,6 +48,8 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
   private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
   private static final String NAME = "FilterPushDown";
 
+  private CatalogService catalog;
+
   static class FilterPushDownContext {
     Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();
 
@@ -80,8 +83,8 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
       if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
         return true;
       }
@@ -90,7 +93,7 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) 
throws PlanningException {
     /*
     FilterPushDown rule: processing when visits each node
       - If a target which is corresponding on a filter EvalNode's column is 
not FieldEval, do not PushDown.
@@ -102,6 +105,8 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
         . It not, create new HavingNode and set parent's child.
      */
     FilterPushDownContext context = new FilterPushDownContext();
+    LogicalPlan plan = rewriteRuleContext.getPlan();
+    catalog = rewriteRuleContext.getCatalog();
     for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
       context.clear();
       this.visit(context, plan, block, block.getRoot(), new 
Stack<LogicalNode>());
@@ -834,7 +839,7 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
 
   @Override
   public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
-                               LogicalPlan.QueryBlock block, ScanNode scanNode,
+                               LogicalPlan.QueryBlock block, final ScanNode 
scanNode,
                                Stack<LogicalNode> stack) throws 
PlanningException {
     List<EvalNode> matched = Lists.newArrayList();
 
@@ -904,8 +909,42 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
       qual = matched.iterator().next();
     }
 
+    block.addAccessPath(scanNode, new SeqScanInfo(table));
     if (qual != null) { // if a matched qual exists
       scanNode.setQual(qual);
+
+      // Index path can be identified only after filters are pushed into each 
scan.
+      String databaseName, tableName;
+      databaseName = CatalogUtil.extractQualifier(table.getName());
+      tableName = CatalogUtil.extractSimpleName(table.getName());
+      Set<Predicate> predicates = TUtil.newHashSet();
+      for (EvalNode eval : IndexUtil.getAllEqualEvals(qual)) {
+        BinaryEval binaryEval = (BinaryEval) eval;
+        // TODO: consider more complex predicates
+        if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
+            binaryEval.getRightExpr().getType() == EvalType.CONST) {
+          predicates.add(new Predicate(binaryEval.getType(),
+              ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(),
+              ((ConstEval)binaryEval.getRightExpr()).getValue()));
+        } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST &&
+            binaryEval.getRightExpr().getType() == EvalType.FIELD) {
+          predicates.add(new Predicate(binaryEval.getType(),
+              ((FieldEval) binaryEval.getRightExpr()).getColumnRef(),
+              ((ConstEval)binaryEval.getLeftExpr()).getValue()));
+        }
+      }
+
+      // for every subset of the set of columns, find all matched index paths
+      for (Set<Predicate> subset : Sets.powerSet(predicates)) {
+        if (subset.size() == 0)
+          continue;
+        Column[] columns = extractColumns(subset);
+        if (catalog.existIndexByColumns(databaseName, tableName, columns)) {
+          IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, 
tableName, columns);
+          block.addAccessPath(scanNode, new IndexScanInfo(
+              table.getStats(), indexDesc, getSimplePredicates(indexDesc, 
subset)));
+        }
+      }
     }
 
     for (EvalNode matchedEval: matched) {
@@ -918,6 +957,49 @@ public class FilterPushDownRule extends 
BasicLogicalPlanVisitor<FilterPushDownCo
     return scanNode;
   }
 
+  private static class Predicate {
+    Column column;
+    Datum value;
+    EvalType evalType;
+
+    public Predicate(EvalType evalType, Column column, Datum value) {
+      this.evalType = evalType;
+      this.column = column;
+      this.value = value;
+    }
+  }
+
+  private static SimplePredicate[] getSimplePredicates(IndexDesc desc, 
Set<Predicate> predicates) {
+    SimplePredicate[] simplePredicates = new 
SimplePredicate[predicates.size()];
+    Map<Column, Datum> colToValue = TUtil.newHashMap();
+    for (Predicate predicate : predicates) {
+      colToValue.put(predicate.column, predicate.value);
+    }
+    SortSpec [] keySortSpecs = desc.getKeySortSpecs();
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      simplePredicates[i] = new SimplePredicate(keySortSpecs[i],
+          colToValue.get(keySortSpecs[i].getSortKey()));
+    }
+    return simplePredicates;
+  }
+
+  private static Datum[] extractPredicateValues(List<Predicate> predicates) {
+    Datum[] values = new Datum[predicates.size()];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = predicates.get(i).value;
+    }
+    return values;
+  }
+
+  private static Column[] extractColumns(Set<Predicate> predicates) {
+    Column[] columns = new Column[predicates.size()];
+    int i = 0;
+    for (Predicate p : predicates) {
+      columns[i++] = p.column;
+    }
+    return columns;
+  }
+
   private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
                                    FilterPushDownContext context) throws 
PlanningException {
     String notMatchedNodeStr = "";

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
new file mode 100644
index 0000000..9ac8ccf
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.IndexDesc;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.serder.EvalNodeDeserializer;
+import org.apache.tajo.plan.serder.EvalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto.SimplePredicateProto;
+
+import java.net.URI;
+
+public class IndexScanInfo extends AccessPathInfo {
+
+  /**
+   * Simple predicate represents an equal eval expression which consists of
+   * a column and a value.
+   */
+  // TODO: extend to represent more complex expressions
+  public static class SimplePredicate implements 
ProtoObject<SimplePredicateProto> {
+    @Expose private SortSpec keySortSpec;
+    @Expose private Datum value;
+
+    public SimplePredicate(SortSpec keySortSpec, Datum value) {
+      this.keySortSpec = keySortSpec;
+      this.value = value;
+    }
+
+    public SimplePredicate(SimplePredicateProto proto) {
+      keySortSpec = new SortSpec(proto.getKeySortSpec());
+      value = EvalNodeDeserializer.deserialize(proto.getValue());
+    }
+
+    public SortSpec getKeySortSpec() {
+      return keySortSpec;
+    }
+
+    public Datum getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof SimplePredicate) {
+        SimplePredicate other = (SimplePredicate) o;
+        return this.keySortSpec.equals(other.keySortSpec) && 
this.value.equals(other.value);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      SimplePredicate clone = new SimplePredicate(this.keySortSpec, 
this.value);
+      return clone;
+    }
+
+    @Override
+    public SimplePredicateProto getProto() {
+      SimplePredicateProto.Builder builder = SimplePredicateProto.newBuilder();
+      builder.setKeySortSpec(keySortSpec.getProto());
+      builder.setValue(EvalNodeSerializer.serialize(value));
+      return builder.build();
+    }
+  }
+
+  private final URI indexPath;
+  private final Schema keySchema;
+  private final SimplePredicate[] predicates;
+
+  public IndexScanInfo(TableStats tableStats, IndexDesc indexDesc, 
SimplePredicate[] predicates) {
+    super(ScanTypeControl.INDEX_SCAN, tableStats);
+    this.indexPath = indexDesc.getIndexPath();
+    keySchema = new Schema();
+    this.predicates = predicates;
+    for (SimplePredicate predicate : predicates) {
+      keySchema.addColumn(predicate.getKeySortSpec().getSortKey());
+    }
+  }
+
+  public URI getIndexPath() {
+    return indexPath;
+  }
+
+  public Schema getKeySchema() {
+    return keySchema;
+  }
+
+  public SimplePredicate[] getPredicates() {
+    return predicates;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
index 8a24add..c8a81ec 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
@@ -23,6 +23,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.plan.serder.LogicalNodeSerializer;
 import org.apache.tajo.plan.serder.PlanProto;
@@ -40,15 +41,16 @@ public class LogicalPlanEqualityTester implements 
LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
     return true;
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws 
PlanningException {
+    LogicalPlan plan = context.getPlan();
     LogicalNode root = plan.getRootBlock().getRoot();
     PlanProto.LogicalNodeTree serialized = 
LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot());
-    LogicalNode deserialized = 
LogicalNodeDeserializer.deserialize(queryContext, serialized);
+    LogicalNode deserialized = 
LogicalNodeDeserializer.deserialize(context.getQueryContext(), serialized);
     assert root.deepEquals(deserialized);
     return plan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index 7604c53..962ec1f 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -32,6 +32,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
@@ -58,8 +59,8 @@ public class PartitionedTableRewriter implements 
LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
       for (RelationNode relation : block.getRelations()) {
         if (relation.getType() == NodeType.SCAN) {
           TableDesc table = ((ScanNode)relation).getTableDesc();
@@ -73,9 +74,10 @@ public class PartitionedTableRewriter implements 
LogicalPlanRewriteRule {
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws 
PlanningException {
+    LogicalPlan plan = context.getPlan();
     LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
-    rewriter.visit(queryContext, plan, rootBlock, rootBlock.getRoot(), new 
Stack<LogicalNode>());
+    rewriter.visit(context.getQueryContext(), plan, rootBlock, 
rootBlock.getRoot(), new Stack<LogicalNode>());
     return plan;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index 64aecf7..ed46f62 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -31,6 +31,7 @@ import org.apache.tajo.plan.LogicalPlan.QueryBlock;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
@@ -56,13 +57,13 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    LogicalNode toBeOptimized = context.getPlan().getRootBlock().getRoot();
 
     if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) {
       return false;
     }
-    for (QueryBlock eachBlock: plan.getQueryBlocks()) {
+    for (QueryBlock eachBlock: context.getPlan().getQueryBlocks()) {
       if (eachBlock.hasTableExpression()) {
         return true;
       }
@@ -71,7 +72,8 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) 
throws PlanningException {
+    LogicalPlan plan = rewriteRuleContext.getPlan();
     LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
 
     LogicalPlan.QueryBlock topmostBlock = rootBlock;
@@ -1108,6 +1110,12 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
+  public LogicalNode visitIndexScan(Context context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                    IndexScanNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    return visitScan(context, plan, block,node, stack);
+  }
+
+  @Override
   public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
                                    TableSubQueryNode node, Stack<LogicalNode> 
stack) throws PlanningException {
     Context childContext = new Context(plan, upperContext.requiredSet);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
new file mode 100644
index 0000000..0c054bd
--- /dev/null
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+public class SeqScanInfo extends AccessPathInfo {
+  private TableDesc tableDesc;
+
+  public SeqScanInfo(TableStats tableStats) {
+    super(ScanTypeControl.SEQ_SCAN, tableStats);
+  }
+
+  public SeqScanInfo(TableDesc tableDesc) {
+    this(tableDesc.getStats());
+    this.setTableDesc(tableDesc);
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void setTableDesc(TableDesc tableDesc) {
+    this.tableDesc = tableDesc;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 5cbed7e..9cdf18b 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -36,9 +36,12 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.expr.FieldEval;
 import org.apache.tajo.plan.expr.WindowFunctionEval;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 
 /**
@@ -129,6 +132,9 @@ public class LogicalNodeDeserializer {
       case SCAN:
         current = convertScan(context, protoNode);
         break;
+      case INDEX_SCAN:
+        current = convertIndexScan(context, protoNode);
+        break;
 
       case CREATE_TABLE:
         current = convertCreateTable(nodeMap, protoNode);
@@ -157,6 +163,13 @@ public class LogicalNodeDeserializer {
         current = convertTruncateTable(protoNode);
         break;
 
+      case CREATE_INDEX:
+        current = convertCreateIndex(nodeMap, protoNode);
+        break;
+      case DROP_INDEX:
+        current = convertDropIndex(protoNode);
+        break;
+
       default:
         throw new RuntimeException("Unknown NodeType: " + 
protoNode.getType().name());
       }
@@ -420,7 +433,24 @@ public class LogicalNodeDeserializer {
     scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
   }
 
-  private static PartitionedTableScanNode convertPartitionScan(OverridableConf 
context, PlanProto.LogicalNode protoNode) {
+  private static IndexScanNode convertIndexScan(OverridableConf context, 
PlanProto.LogicalNode protoNode) {
+    IndexScanNode indexScan = new IndexScanNode(protoNode.getNodeId());
+    fillScanNode(context, protoNode, indexScan);
+
+    PlanProto.IndexScanSpec indexScanSpec = protoNode.getIndexScan();
+    SimplePredicate[] predicates = new 
SimplePredicate[indexScanSpec.getPredicatesCount()];
+    for (int i = 0; i < predicates.length; i++) {
+      predicates[i] = new SimplePredicate(indexScanSpec.getPredicates(i));
+    }
+
+    indexScan.set(new Schema(indexScanSpec.getKeySchema()), predicates,
+        TUtil.stringToURI(indexScanSpec.getIndexPath()));
+
+    return indexScan;
+  }
+
+  private static PartitionedTableScanNode convertPartitionScan(OverridableConf 
context,
+                                                               
PlanProto.LogicalNode protoNode) {
     PartitionedTableScanNode partitionedScan = new 
PartitionedTableScanNode(protoNode.getNodeId());
     fillScanNode(context, protoNode, partitionedScan);
 
@@ -596,6 +626,45 @@ public class LogicalNodeDeserializer {
     return truncateTable;
   }
 
+  private static CreateIndexNode convertCreateIndex(Map<Integer, LogicalNode> 
nodeMap,
+                                                    PlanProto.LogicalNode 
protoNode) {
+    CreateIndexNode createIndex = new CreateIndexNode(protoNode.getNodeId());
+
+    PlanProto.CreateIndexNode createIndexProto = protoNode.getCreateIndex();
+    createIndex.setIndexName(createIndexProto.getIndexName());
+    createIndex.setIndexMethod(createIndexProto.getIndexMethod());
+    try {
+      createIndex.setIndexPath(new URI(createIndexProto.getIndexPath()));
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+    SortSpec[] keySortSpecs = new 
SortSpec[createIndexProto.getKeySortSpecsCount()];
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      keySortSpecs[i] = new SortSpec(createIndexProto.getKeySortSpecs(i));
+    }
+    createIndex.setKeySortSpecs(new 
Schema(createIndexProto.getTargetRelationSchema()),
+        keySortSpecs);
+    createIndex.setUnique(createIndexProto.getIsUnique());
+    createIndex.setClustered(createIndexProto.getIsClustered());
+    if (createIndexProto.hasIndexProperties()) {
+      createIndex.setOptions(new 
KeyValueSet(createIndexProto.getIndexProperties()));
+    }
+    createIndex.setChild(nodeMap.get(createIndexProto.getChildSeq()));
+    createIndex.setInSchema(convertSchema(protoNode.getInSchema()));
+    createIndex.setOutSchema(convertSchema(protoNode.getOutSchema()));
+
+    return createIndex;
+  }
+
+  private static DropIndexNode convertDropIndex(PlanProto.LogicalNode 
protoNode) {
+    DropIndexNode dropIndex = new DropIndexNode(protoNode.getNodeId());
+
+    PlanProto.DropIndexNode dropIndexProto = protoNode.getDropIndex();
+    dropIndex.setIndexName(dropIndexProto.getIndexName());
+
+    return dropIndex;
+  }
+
   private static AggregationFunctionCallEval [] 
convertAggFuncCallEvals(OverridableConf context,
                                                                        
List<PlanProto.EvalNodeTree> evalTrees) {
     AggregationFunctionCallEval [] aggFuncs = new 
AggregationFunctionCallEval[evalTrees.size()];

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index 39a13ba..5ad79c0 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -21,12 +21,15 @@ package org.apache.tajo.plan.serder;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.AddColumn;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameColumn;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable;
@@ -104,6 +107,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder();
   }
 
+  @Override
   public LogicalNode visitRoot(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                LogicalRootNode root, Stack<LogicalNode> stack) 
throws PlanningException {
     super.visitRoot(context, plan, block, root, stack);
@@ -138,6 +142,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                    EvalExprNode exprEval, Stack<LogicalNode> 
stack) throws PlanningException {
     PlanProto.EvalExprNode.Builder exprEvalBuilder = 
PlanProto.EvalExprNode.newBuilder();
@@ -151,6 +156,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return exprEval;
   }
 
+  @Override
   public LogicalNode visitProjection(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
                                      ProjectionNode projection, 
Stack<LogicalNode> stack) throws PlanningException {
     super.visitProjection(context, plan, block, projection, stack);
@@ -188,6 +194,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return limit;
   }
 
+  @Override
   public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
                                     WindowAggNode windowAgg, 
Stack<LogicalNode> stack) throws PlanningException {
     super.visitWindowAgg(context, plan, block, windowAgg, stack);
@@ -262,6 +269,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return having;
   }
 
+  @Override
   public LogicalNode visitGroupBy(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                   GroupbyNode node, Stack<LogicalNode> stack) 
throws PlanningException {
     super.visitGroupBy(context, plan, block, node, new Stack<LogicalNode>());
@@ -297,6 +305,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return nodeBuilder;
   }
 
+  @Override
   public LogicalNode visitDistinctGroupby(SerializeContext context, 
LogicalPlan plan, LogicalPlan.QueryBlock block,
                                           DistinctGroupbyNode node, 
Stack<LogicalNode> stack) throws PlanningException {
     super.visitDistinctGroupby(context, plan, block, node, new 
Stack<LogicalNode>());
@@ -353,6 +362,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return filter;
   }
 
+  @Override
   public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, JoinNode join,
                           Stack<LogicalNode> stack) throws PlanningException {
     super.visitJoin(context, plan, block, join, stack);
@@ -435,6 +445,26 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
   }
 
   @Override
+  public LogicalNode visitIndexScan(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    IndexScanNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+
+    PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node);
+
+    PlanProto.IndexScanSpec.Builder indexScanSpecBuilder = 
PlanProto.IndexScanSpec.newBuilder();
+    indexScanSpecBuilder.setKeySchema(node.getKeySchema().getProto());
+    indexScanSpecBuilder.setIndexPath(node.getIndexPath().toString());
+    for (SimplePredicate predicate : node.getPredicates()) {
+      indexScanSpecBuilder.addPredicates(predicate.getProto());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setScan(scanBuilder);
+    nodeBuilder.setIndexScan(indexScanSpecBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+    return node;
+  }
+
+  @Override
   public LogicalNode visitPartitionedTableScan(SerializeContext context, 
LogicalPlan plan, LogicalPlan.QueryBlock block,
                                           PartitionedTableScanNode node, 
Stack<LogicalNode> stack)
       throws PlanningException {
@@ -458,6 +488,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
                                    TableSubQueryNode node, Stack<LogicalNode> 
stack) throws PlanningException {
     super.visitTableSubQuery(context, plan, block, node, stack);
@@ -480,6 +511,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
                                       CreateTableNode node, Stack<LogicalNode> 
stack) throws PlanningException {
     super.visitCreateTable(context, plan, block, node, stack);
@@ -589,6 +621,7 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitInsert(SerializeContext context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                  InsertNode node, Stack<LogicalNode> stack) 
throws PlanningException {
     super.visitInsert(context, plan, block, node, stack);
@@ -670,6 +703,47 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
+  public LogicalNode visitCreateIndex(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                      CreateIndexNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    super.visitCreateIndex(context, plan, block, node, new 
Stack<LogicalNode>());
+
+    PlanProto.CreateIndexNode.Builder createIndexBuilder = 
PlanProto.CreateIndexNode.newBuilder();
+    int [] childIds = registerGetChildIds(context, node);
+    createIndexBuilder.setChildSeq(childIds[0]);
+    createIndexBuilder.setIndexName(node.getIndexName());
+    createIndexBuilder.setIndexMethod(node.getIndexMethod());
+    createIndexBuilder.setIndexPath(node.getIndexPath().toString());
+    for (SortSpec sortSpec : node.getKeySortSpecs()) {
+      createIndexBuilder.addKeySortSpecs(sortSpec.getProto());
+    }
+    
createIndexBuilder.setTargetRelationSchema(node.getTargetRelationSchema().getProto());
+    createIndexBuilder.setIsUnique(node.isUnique());
+    createIndexBuilder.setIsClustered(node.isClustered());
+    if (node.hasOptions()) {
+      createIndexBuilder.setIndexProperties(node.getOptions().getProto());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setCreateIndex(createIndexBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropIndex(SerializeContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    DropIndexNode node, Stack<LogicalNode> 
stack) {
+    PlanProto.DropIndexNode.Builder dropIndexBuilder = 
PlanProto.DropIndexNode.newBuilder();
+    dropIndexBuilder.setIndexName(node.getIndexName());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, 
node);
+    nodeBuilder.setDropIndex(dropIndexBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
   public static PlanProto.NodeType convertType(NodeType type) {
     return PlanProto.NodeType.valueOf(type.name());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
new file mode 100644
index 0000000..73b33d5
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.util;
+
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.plan.expr.*;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+public class IndexUtil {
+  
+  public static String getIndexName(String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getSimpleName() + "_");
+    }
+    return builder.toString();
+  }
+
+  public static List<EvalNode> getAllEqualEvals(EvalNode qual) {
+    EvalTreeUtil.EvalFinder finder = new 
EvalTreeUtil.EvalFinder(EvalType.EQUAL);
+    finder.visitChild(null, qual, new Stack<EvalNode>());
+    return finder.getEvalNodes();
+  }
+  
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
+    
+    public LinkedList<BinaryEval> getNodeList () {
+      return this.nodeList;
+    }
+    
+    @Override
+    public void visit(EvalNode node) {
+      BinaryEval binaryEval = (BinaryEval) node;
+      switch(node.getType()) {
+      case AND:
+        break;
+      case EQUAL:
+        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
+          && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
+          nodeList.add(binaryEval);
+        }
+        break;
+      case IS_NULL:
+        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
+          && binaryEval.getRightExpr().getType() == EvalType.CONST) {
+          nodeList.add(binaryEval);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 0fbd359..ebd47de 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -71,10 +71,31 @@ public class PlannerUtil {
         type == NodeType.CREATE_DATABASE ||
             type == NodeType.DROP_DATABASE ||
             (type == NodeType.CREATE_TABLE && !((CreateTableNode) 
baseNode).hasSubQuery()) ||
-            baseNode.getType() == NodeType.DROP_TABLE ||
-            baseNode.getType() == NodeType.ALTER_TABLESPACE ||
-            baseNode.getType() == NodeType.ALTER_TABLE ||
-            baseNode.getType() == NodeType.TRUNCATE_TABLE;
+            type == NodeType.DROP_TABLE ||
+            type == NodeType.ALTER_TABLESPACE ||
+            type == NodeType.ALTER_TABLE ||
+            type == NodeType.TRUNCATE_TABLE ||
+            type == NodeType.CREATE_INDEX ||
+            type == NodeType.DROP_INDEX;
+  }
+
+  /**
+   * Most update queries require only the updates to the catalog information,
+   * but some queries such as "CREATE INDEX" or CTAS requires distributed 
execution on multiple cluster nodes.
+   * This function checks whether the given DDL plan requires distributed 
execution or not.
+   * @param node the root node of a query plan
+   * @return Return true if the input query plan requires distributed 
execution. Otherwise, return false.
+   */
+  public static boolean isDistExecDDL(LogicalNode node) {
+    LogicalNode baseNode = node;
+    if (node instanceof LogicalRootNode) {
+      baseNode = ((LogicalRootNode) node).getChild();
+    }
+
+    NodeType type = baseNode.getType();
+
+    return type == NodeType.CREATE_INDEX ||
+        type == NodeType.CREATE_TABLE && 
((CreateTableNode)baseNode).hasSubQuery();
   }
 
   /**
@@ -361,6 +382,12 @@ public class PlannerUtil {
         throws PlanningException {
       return node;
     }
+
+    @Override
+    public LogicalNode visitIndexScan(ReplacerContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                      IndexScanNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+      return node;
+    }
   }
 
   public static void replaceNode(LogicalNode plan, LogicalNode newNode, 
NodeType type) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
index 1ff70a2..9bd3b24 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
@@ -110,6 +110,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
       case PARTITIONS_SCAN:
         current = visitPartitionedTableScan(context, plan, block, 
(PartitionedTableScanNode) node, stack);
         break;
+      case INDEX_SCAN:
+        current = visitIndexScan(context, plan, block, (IndexScanNode) node, 
stack);
+        break;
       case STORE:
         current = visitStoreTable(context, plan, block, (StoreTableNode) node, 
stack);
         break;
@@ -140,6 +143,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
       case TRUNCATE_TABLE:
         current = visitTruncateTable(context, plan, block, (TruncateTableNode) 
node, stack);
         break;
+      case DROP_INDEX:
+        current = visitDropIndex(context, plan, block, (DropIndexNode) node, 
stack);
+        break;
       default:
         throw new PlanningException("Unknown logical node type: " + 
node.getType());
     }
@@ -311,6 +317,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, IndexScanNode node,
+                               Stack<LogicalNode> stack) throws 
PlanningException {
+    return null;
+  }
+
+  @Override
   public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                           PartitionedTableScanNode node, 
Stack<LogicalNode> stack)
       throws PlanningException {
@@ -387,6 +399,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> 
implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DropIndexNode node,
+                               Stack<LogicalNode> stack) {
+    return null;
+  }
+
+  @Override
   public RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                    TruncateTableNode node, Stack<LogicalNode> 
stack) throws PlanningException {
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
index 73bc7f1..2b33937 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
@@ -240,6 +240,13 @@ public class ExplainLogicalPlanVisitor extends 
BasicLogicalPlanVisitor<ExplainLo
     return visitUnaryNode(context, plan, block, node, stack);
   }
 
+  @Override
+  public LogicalNode visitDropIndex(Context context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
+                                    DropIndexNode node, Stack<LogicalNode> 
stack) {
+    context.add(context.depth, node.getPlanString());
+    return node;
+  }
+
   public static String printDepthString(int maxDepth, DepthString planStr) {
     StringBuilder output = new StringBuilder();
     String pad = new String(new char[planStr.getDepth() * 3]).replace('\0', ' 
');

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
index d0643c9..a9fb20b 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
@@ -75,6 +75,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock 
block, ScanNode node,
                    Stack<LogicalNode> stack) throws PlanningException;
 
+  RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, IndexScanNode node,
+                        Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block,
                                    PartitionedTableScanNode node, 
Stack<LogicalNode> stack) throws PlanningException;
 
@@ -105,6 +108,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitCreateIndex(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, CreateIndexNode node,
                                Stack<LogicalNode> stack) throws 
PlanningException;
 
+  RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, DropIndexNode node,
+                        Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, 
LogicalPlan.QueryBlock block, TruncateTableNode node,
                          Stack<LogicalNode> stack) throws PlanningException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto 
b/tajo-plan/src/main/proto/Plan.proto
index 3e4f07c..dfee969 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -45,7 +45,7 @@ enum NodeType {
   TABLE_SUBQUERY = 15;
   SCAN = 16;
   PARTITIONS_SCAN = 17;
-  BST_INDEX_SCAN = 18;
+  INDEX_SCAN = 18;
   STORE = 19;
   INSERT = 20;
 
@@ -56,6 +56,8 @@ enum NodeType {
   ALTER_TABLESPACE = 25;
   ALTER_TABLE = 26;
   TRUNCATE_TABLE = 27;
+  CREATE_INDEX = 28;
+  DROP_INDEX = 29;
 }
 
 message LogicalNodeTree {
@@ -71,31 +73,35 @@ message LogicalNode {
 
   optional ScanNode scan = 6;
   optional PartitionScanSpec partitionScan = 7;
-  optional JoinNode join = 8;
-  optional FilterNode filter = 9;
-  optional GroupbyNode groupby = 10;
-  optional DistinctGroupbyNode distinctGroupby = 11;
-  optional SortNode sort = 12;
-  optional LimitNode limit = 13;
-  optional WindowAggNode windowAgg = 14;
-  optional ProjectionNode projection = 15;
-  optional EvalExprNode exprEval = 16;
-  optional UnionNode union = 17;
-  optional TableSubQueryNode tableSubQuery = 18;
-  optional PersistentStoreNode persistentStore = 19;
-  optional StoreTableNodeSpec storeTable = 20;
-  optional InsertNodeSpec insert = 21;
-  optional CreateTableNodeSpec createTable = 22;
-  optional RootNode root = 23;
-  optional SetSessionNode setSession = 24;
-
-  optional CreateDatabaseNode createDatabase = 25;
-  optional DropDatabaseNode dropDatabase = 26;
-  optional DropTableNode dropTable = 27;
-
-  optional AlterTablespaceNode alterTablespace = 28;
-  optional AlterTableNode alterTable = 29;
-  optional TruncateTableNode truncateTableNode = 30;
+  optional IndexScanSpec indexScan = 8;
+  optional JoinNode join = 9;
+  optional FilterNode filter = 10;
+  optional GroupbyNode groupby = 11;
+  optional DistinctGroupbyNode distinctGroupby = 12;
+  optional SortNode sort = 13;
+  optional LimitNode limit = 14;
+  optional WindowAggNode windowAgg = 15;
+  optional ProjectionNode projection = 16;
+  optional EvalExprNode exprEval = 17;
+  optional UnionNode union = 18;
+  optional TableSubQueryNode tableSubQuery = 19;
+  optional PersistentStoreNode persistentStore = 20;
+  optional StoreTableNodeSpec storeTable = 21;
+  optional InsertNodeSpec insert = 22;
+  optional CreateTableNodeSpec createTable = 23;
+  optional RootNode root = 24;
+  optional SetSessionNode setSession = 25;
+
+  optional CreateDatabaseNode createDatabase = 26;
+  optional DropDatabaseNode dropDatabase = 27;
+  optional DropTableNode dropTable = 28;
+
+  optional AlterTablespaceNode alterTablespace = 29;
+  optional AlterTableNode alterTable = 30;
+  optional TruncateTableNode truncateTableNode = 31;
+
+  optional CreateIndexNode createIndex = 32;
+  optional DropIndexNode dropIndex = 33;
 }
 
 message ScanNode {
@@ -110,6 +116,17 @@ message PartitionScanSpec {
   repeated string paths = 1;
 }
 
+message IndexScanSpec {
+  required SchemaProto keySchema = 1;
+  required string indexPath = 2;
+  repeated SimplePredicateProto predicates = 3;
+}
+
+message SimplePredicateProto {
+  required SortSpecProto keySortSpec = 1;
+  required Datum value = 2;
+}
+
 message FilterNode {
   required int32 childSeq = 1;
   required EvalNodeTree qual = 2;
@@ -301,6 +318,22 @@ message AlterTableNode {
   optional AddColumn addColumn = 5;
 }
 
+message CreateIndexNode {
+  required int32 childSeq = 1;
+  required string indexName = 2;
+  required IndexMethod indexMethod = 3;
+  required string indexPath = 4;
+  repeated SortSpecProto keySortSpecs = 5;
+  required SchemaProto targetRelationSchema = 6;
+  optional bool isUnique = 7 [default = false];
+  optional bool isClustered = 8 [default = false];
+  optional KeyValueSetProto indexProperties = 9;
+}
+
+message DropIndexNode {
+  required string indexName = 1;
+}
+
 enum EvalType {
   NOT = 0;
   AND = 1;

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
index e95aeec..f9a57fa 100644
--- 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ 
b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -32,6 +32,7 @@ import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.logical.UnaryNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.util.PlannerUtil;
 
 public class AddSortForInsertRewriter implements LogicalPlanRewriteRule {
@@ -54,13 +55,14 @@ public class AddSortForInsertRewriter implements 
LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    StoreType storeType = PlannerUtil.getStoreType(plan);
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    StoreType storeType = PlannerUtil.getStoreType(context.getPlan());
     return storeType != null;
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws 
PlanningException {
+    LogicalPlan plan = context.getPlan();
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
     UnaryNode insertNode = rootNode.getChild();
     LogicalNode childNode = insertNode.getChild();

Reply via email to