http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 2d35496..2edb4c4 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -26,14 +26,17 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
@@ -61,7 +64,6 @@ public class MemStore implements CatalogStore {
 
   public MemStore(Configuration conf) {
   }
-
   
   public void close() throws IOException {
     databases.clear();
@@ -147,6 +149,8 @@ public class MemStore implements CatalogStore {
     }
 
     databases.put(databaseName, new HashMap<String, 
CatalogProtos.TableDescProto>());
+    indexes.put(databaseName, new HashMap<String, IndexDescProto>());
+    indexesByColumn.put(databaseName, new HashMap<String, IndexDescProto>());
   }
 
   @Override
@@ -160,6 +164,8 @@ public class MemStore implements CatalogStore {
       throw new NoSuchDatabaseException(databaseName);
     }
     databases.remove(databaseName);
+    indexes.remove(databaseName);
+    indexesByColumn.remove(databaseName);
   }
 
   @Override
@@ -539,17 +545,23 @@ public class MemStore implements CatalogStore {
   @Override
   public void createIndex(IndexDescProto proto) throws CatalogException {
     final String databaseName = proto.getTableIdentifier().getDatabaseName();
+    final String tableName = 
CatalogUtil.extractSimpleName(proto.getTableIdentifier().getTableName());
 
     Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, 
databaseName);
     Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    TableDescProto tableDescProto = getTable(databaseName, tableName);
 
-    if (index.containsKey(proto.getName())) {
-      throw new AlreadyExistsIndexException(proto.getName());
+    if (index.containsKey(proto.getIndexName())) {
+      throw new AlreadyExistsIndexException(proto.getIndexName());
     }
 
-    index.put(proto.getName(), proto);
-    indexByColumn.put(proto.getTableIdentifier().getTableName() + "."
-        + CatalogUtil.extractSimpleName(proto.getColumn().getName()), proto);
+    index.put(proto.getIndexName(), proto);
+    String originalTableName = proto.getTableIdentifier().getTableName();
+    String simpleTableName = CatalogUtil.extractSimpleName(originalTableName);
+    
indexByColumn.put(CatalogUtil.buildFQName(proto.getTableIdentifier().getDatabaseName(),
+            simpleTableName,
+            getUnifiedNameForIndexByColumn(proto)),
+        proto);
   }
 
   /* (non-Javadoc)
@@ -558,10 +570,19 @@ public class MemStore implements CatalogStore {
   @Override
   public void dropIndex(String databaseName, String indexName) throws 
CatalogException {
     Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, 
databaseName);
+    Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
     if (!index.containsKey(indexName)) {
       throw new NoSuchIndexException(indexName);
     }
+    IndexDescProto proto = index.get(indexName);
+    final String tableName = 
CatalogUtil.extractSimpleName(proto.getTableIdentifier().getTableName());
+    TableDescProto tableDescProto = getTable(databaseName, tableName);
     index.remove(indexName);
+    String originalTableName = proto.getTableIdentifier().getTableName();
+    String simpleTableName = CatalogUtil.extractSimpleName(originalTableName);
+    
indexByColumn.remove(CatalogUtil.buildFQName(proto.getTableIdentifier().getDatabaseName(),
+        simpleTableName,
+        getUnifiedNameForIndexByColumn(proto)));
   }
 
   /* (non-Javadoc)
@@ -577,19 +598,18 @@ public class MemStore implements CatalogStore {
     return index.get(indexName);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#getIndexByName(java.lang.String, java.lang.String)
-   */
   @Override
-  public IndexDescProto getIndexByColumn(String databaseName, String 
tableName, String columnName)
-      throws CatalogException {
-
+  public IndexDescProto getIndexByColumns(String databaseName, String 
tableName, String[] columnNames) throws CatalogException {
     Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
-    if (!indexByColumn.containsKey(columnName)) {
-      throw new NoSuchIndexException(columnName);
+    String simpleTableName = CatalogUtil.extractSimpleName(tableName);
+    TableDescProto tableDescProto = getTable(databaseName, simpleTableName);
+    String qualifiedColumnName = CatalogUtil.buildFQName(databaseName, 
simpleTableName,
+        CatalogUtil.getUnifiedSimpleColumnName(new 
Schema(tableDescProto.getSchema()), columnNames));
+    if (!indexByColumn.containsKey(qualifiedColumnName)) {
+      throw new NoSuchIndexException(qualifiedColumnName);
     }
 
-    return indexByColumn.get(columnName);
+    return indexByColumn.get(qualifiedColumnName);
   }
 
   @Override
@@ -599,50 +619,47 @@ public class MemStore implements CatalogStore {
   }
 
   @Override
-  public boolean existIndexByColumn(String databaseName, String tableName, 
String columnName)
-      throws CatalogException {
+  public boolean existIndexByColumns(String databaseName, String tableName, 
String[] columnNames) throws CatalogException {
     Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
-    return indexByColumn.containsKey(columnName);
+    TableDescProto tableDescProto = getTable(databaseName, tableName);
+    return indexByColumn.containsKey(
+        CatalogUtil.buildFQName(databaseName, 
CatalogUtil.extractSimpleName(tableName),
+            CatalogUtil.getUnifiedSimpleColumnName(new 
Schema(tableDescProto.getSchema()), columnNames)));
   }
 
   @Override
-  public IndexDescProto[] getIndexes(String databaseName, String tableName) 
throws CatalogException {
-    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+  public List<String> getAllIndexNamesByTable(String databaseName, String 
tableName) throws CatalogException {
+    List<String> indexNames = new ArrayList<String>();
     Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    String simpleTableName = CatalogUtil.extractSimpleName(tableName);
     for (IndexDescProto proto : indexByColumn.values()) {
-      if (proto.getTableIdentifier().getTableName().equals(tableName)) {
-        protos.add(proto);
+      if (proto.getTableIdentifier().getTableName().equals(simpleTableName)) {
+        indexNames.add(proto.getIndexName());
       }
     }
 
-    return protos.toArray(new IndexDescProto[protos.size()]);
+    return indexNames;
   }
-  
+
   @Override
-  public List<IndexProto> getAllIndexes() throws CatalogException {
-    List<IndexProto> indexList = new ArrayList<CatalogProtos.IndexProto>();
-    Set<String> databases = indexes.keySet();
-    
-    for (String databaseName: databases) {
-      Map<String, IndexDescProto> indexMap = indexes.get(databaseName);
-      
-      for (String indexName: indexMap.keySet()) {
-        IndexDescProto indexDesc = indexMap.get(indexName);
-        IndexProto.Builder builder = IndexProto.newBuilder();
-        
-        builder.setColumnName(indexDesc.getColumn().getName());
-        
builder.setDataType(indexDesc.getColumn().getDataType().getType().toString());
-        builder.setIndexName(indexName);
-        builder.setIndexType(indexDesc.getIndexMethod().toString());
-        builder.setIsAscending(indexDesc.hasIsAscending() && 
indexDesc.getIsAscending());
-        builder.setIsClustered(indexDesc.hasIsClustered() && 
indexDesc.getIsClustered());
-        builder.setIsUnique(indexDesc.hasIsUnique() && 
indexDesc.getIsUnique());
-        
-        indexList.add(builder.build());
+  public boolean existIndexesByTable(String databaseName, String tableName) 
throws CatalogException {
+    Map<String, IndexDescProto> indexByColumn = 
checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    String simpleTableName = CatalogUtil.extractSimpleName(tableName);
+    for (IndexDescProto proto : indexByColumn.values()) {
+      if (proto.getTableIdentifier().getTableName().equals(simpleTableName)) {
+        return true;
       }
     }
-    
-    return indexList;
+    return false;
+  }
+
+  @Override
+  public List<IndexDescProto> getAllIndexes() throws CatalogException {
+    List<IndexDescProto> indexDescProtos = TUtil.newList();
+    for (Map<String,IndexDescProto> indexMap : indexes.values()) {
+      indexDescProtos.addAll(indexMap.values());
+    }
+    return indexDescProtos;
   }
 
   @Override
@@ -666,4 +683,13 @@ public class MemStore implements CatalogStore {
     return null;
   }
 
+  public static String getUnifiedNameForIndexByColumn(IndexDescProto proto) {
+    StringBuilder sb = new StringBuilder();
+    for (SortSpecProto columnSpec : proto.getKeySortSpecsList()) {
+      String[] identifiers = 
columnSpec.getColumn().getName().split(CatalogConstants.IDENTIFIER_DELIMITER_REGEXP);
+      sb.append(identifiers[identifiers.length-1]).append("_");
+    }
+    sb.deleteCharAt(sb.length()-1);
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
index b0ba3b9..5e19ac1 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
@@ -97,17 +97,20 @@
                        <tns:Object order="11" type="table" name="INDEXES">
                                <tns:sql><![CDATA[
                                CREATE TABLE INDEXES (
-                               DB_ID INT NOT NULL REFERENCES DATABASES_ 
(DB_ID) ON DELETE CASCADE,
-                               TID INT NOT NULL REFERENCES TABLES (TID) ON 
DELETE CASCADE,
-                               INDEX_NAME VARCHAR(128) NOT NULL,
-                               COLUMN_NAME VARCHAR(128) NOT NULL,
-                               DATA_TYPE VARCHAR(128) NOT NULL,
-                               INDEX_TYPE CHAR(32) NOT NULL,
-                               PATH VARCHAR(4096),
-                               IS_UNIQUE BOOLEAN NOT NULL,
-                               IS_CLUSTERED BOOLEAN NOT NULL,
-                               IS_ASCENDING BOOLEAN NOT NULL,
-                               CONSTRAINT C_INDEXES_PK PRIMARY KEY (DB_ID, 
INDEX_NAME)
+                                       INDEX_ID INT NOT NULL GENERATED ALWAYS 
AS IDENTITY (START WITH 1, INCREMENT BY 1),
+                                       DB_ID INT NOT NULL REFERENCES 
DATABASES_ (DB_ID) ON DELETE CASCADE,
+                                       TID INT NOT NULL REFERENCES TABLES 
(TID) ON DELETE CASCADE,
+                                       INDEX_NAME VARCHAR(128) NOT NULL,
+                                       INDEX_TYPE CHAR(32) NOT NULL,
+                                       PATH VARCHAR(4096),
+                                       COLUMN_NAMES VARCHAR(128) NOT NULL, -- 
array of column names
+                                       DATA_TYPES VARCHAR(128) NOT NULL, -- 
array of column types
+                                       ORDERS VARCHAR(128) NOT NULL, -- array 
of column orders
+                                       NULL_ORDERS VARCHAR(128) NOT NULL, -- 
array of null orderings
+                                       IS_UNIQUE BOOLEAN NOT NULL,
+                                       IS_CLUSTERED BOOLEAN NOT NULL,
+                                       CONSTRAINT INDEXES_PK PRIMARY KEY 
(INDEX_ID),
+                                       CONSTRAINT C_INDEXES_UNIQ UNIQUE 
(DB_ID, INDEX_NAME)
                                )]]>
                                </tns:sql>
                        </tns:Object>
@@ -115,7 +118,7 @@
                                <tns:sql><![CDATA[CREATE UNIQUE INDEX 
idx_indexes_pk ON INDEXES (DB_ID,index_name)]]></tns:sql>
                        </tns:Object>
                        <tns:Object order="13" type="index" 
name="IDX_INDEXES_COLUMNS" dependsOn="INDEXES">
-                               <tns:sql><![CDATA[CREATE INDEX 
idx_indexes_columns ON INDEXES (TID,column_name)]]></tns:sql>
+                               <tns:sql><![CDATA[CREATE INDEX idx_col_names ON 
INDEXES (DB_ID,TID,column_names)]]></tns:sql>
                        </tns:Object>
                        <tns:Object order="14" type="table" name="STATS">
                                <tns:sql><![CDATA[

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql
 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql
index 33bf0f5..9c7f8ba 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/indexes.sql
@@ -1,17 +1,19 @@
 CREATE TABLE INDEXES (
+  INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
   DB_ID INT NOT NULL,
   TID INT NOT NULL,
   INDEX_NAME VARCHAR(128) NOT NULL,
-  COLUMN_NAME VARCHAR(128) NOT NULL,
-  DATA_TYPE VARCHAR(128) NOT NULL,
   INDEX_TYPE CHAR(32) NOT NULL,
-  PATH VARCHAR(4096),
+  PATH VARCHAR(4096) NOT NULL,
+  COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+  DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+  ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+  NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
   IS_UNIQUE BOOLEAN NOT NULL,
   IS_CLUSTERED BOOLEAN NOT NULL,
-  IS_ASCENDING BOOLEAN NOT NULL,
-  PRIMARY KEY (DB_ID, INDEX_NAME),
+  PRIMARY KEY (INDEX_ID),
   FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
   FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
   UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
-  INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME)
+  INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES)
 )
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
index 33bf0f5..9c7f8ba 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
@@ -1,17 +1,19 @@
 CREATE TABLE INDEXES (
+  INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
   DB_ID INT NOT NULL,
   TID INT NOT NULL,
   INDEX_NAME VARCHAR(128) NOT NULL,
-  COLUMN_NAME VARCHAR(128) NOT NULL,
-  DATA_TYPE VARCHAR(128) NOT NULL,
   INDEX_TYPE CHAR(32) NOT NULL,
-  PATH VARCHAR(4096),
+  PATH VARCHAR(4096) NOT NULL,
+  COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+  DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+  ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+  NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
   IS_UNIQUE BOOLEAN NOT NULL,
   IS_CLUSTERED BOOLEAN NOT NULL,
-  IS_ASCENDING BOOLEAN NOT NULL,
-  PRIMARY KEY (DB_ID, INDEX_NAME),
+  PRIMARY KEY (INDEX_ID),
   FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
   FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
   UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
-  INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME)
+  INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES)
 )
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql
 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql
index ae31d6f..d416006 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/indexes.sql
@@ -1,14 +1,18 @@
 CREATE TABLE INDEXES (
+  INDEX_ID NUMBER(10) PRIMARY KEY,
   DB_ID INT NOT NULL,
   TID INT NOT NULL,
   INDEX_NAME VARCHAR2(128) NOT NULL,
-  COLUMN_NAME VARCHAR2(128) NOT NULL,
-  DATA_TYPE VARCHAR2(128) NOT NULL,
   INDEX_TYPE CHAR(32) NOT NULL,
-  IS_UNIQUE CHAR NOT NULL,
-  IS_CLUSTERED CHAR NOT NULL,
-  IS_ASCENDING CHAR NOT NULL,
-  CONSTRAINT INDEXES_PKEY PRIMARY KEY (DB_ID, INDEX_NAME),
+  PATH VARCHAR(4096) NOT NULL,
+  COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+  DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+  ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+  NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
+  IS_UNIQUE BOOLEAN NOT NULL,
+  IS_CLUSTERED BOOLEAN NOT NULL,
   FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
-  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
+  UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
+  INDEX IDX_COL_NAMES (DB_ID, TID, COLUMN_NAMES)
 )
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
index 8e5cbcc..f6b2b0d 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
@@ -97,18 +97,21 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore 
../DBMSSchemaDefinition.
                        <tns:Object name="INDEXES" type="table" order="10">
                                <tns:sql><![CDATA[
                                CREATE TABLE INDEXES (
-                               DB_ID INT NOT NULL,
-                               TID INT NOT NULL,
-                               INDEX_NAME VARCHAR(128) NOT NULL,
-                               COLUMN_NAME VARCHAR(128) NOT NULL,
-                               DATA_TYPE VARCHAR(128) NOT NULL,
-                               INDEX_TYPE CHAR(32) NOT NULL,
-                               IS_UNIQUE BOOLEAN NOT NULL,
-                               IS_CLUSTERED BOOLEAN NOT NULL,
-                               IS_ASCENDING BOOLEAN NOT NULL,
-                               CONSTRAINT INDEXES_PKEY PRIMARY KEY (DB_ID, 
INDEX_NAME),
-                               FOREIGN KEY (DB_ID) REFERENCES DATABASES_ 
(DB_ID) ON DELETE CASCADE,
-                               FOREIGN KEY (TID) REFERENCES TABLES (TID) ON 
DELETE CASCADE
+                                       INDEX_ID SERIAL NOT NULL PRIMARY KEY,
+                                       DB_ID INT NOT NULL,
+                                       TID INT NOT NULL,
+                                       INDEX_NAME VARCHAR(128) NOT NULL,
+                                       INDEX_TYPE CHAR(32) NOT NULL,
+                                       PATH VARCHAR(4096) NOT NULL,
+                                       COLUMN_NAMES VARCHAR(256) NOT NULL, -- 
array of column names
+                                       DATA_TYPES VARCHAR(128) NOT NULL, -- 
array of column types
+                                       ORDERS VARCHAR(128) NOT NULL, -- array 
of column orders
+                                       NULL_ORDERS VARCHAR(128) NOT NULL, -- 
array of null orderings
+                                       IS_UNIQUE BOOLEAN NOT NULL,
+                                       IS_CLUSTERED BOOLEAN NOT NULL,
+                                       CONSTRAINT INDEXES_PKEY PRIMARY KEY 
(INDEX_ID),
+                                       FOREIGN KEY (DB_ID) REFERENCES 
DATABASES_ (DB_ID) ON DELETE CASCADE,
+                                       FOREIGN KEY (TID) REFERENCES TABLES 
(TID) ON DELETE CASCADE
                                )]]>
                                </tns:sql>
                        </tns:Object>
@@ -116,7 +119,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore 
../DBMSSchemaDefinition.
                                <tns:sql><![CDATA[CREATE UNIQUE INDEX 
INDEXES_IDX_DB_ID_NAME on INDEXES (DB_ID, INDEX_NAME)]]></tns:sql>
                        </tns:Object>
                        <tns:Object name="INDEXES_IDX_TID_COLUMN_NAME" 
type="index" order="12" dependsOn="INDEXES">
-                               <tns:sql><![CDATA[CREATE INDEX 
INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, COLUMN_NAME)]]></tns:sql>
+                               <tns:sql><![CDATA[CREATE INDEX 
INDEXES_IDX_TID_COLUMN_NAME on INDEXES (DB_ID, TID, COLUMN_NAMES)]]></tns:sql>
                        </tns:Object>
                        <tns:Object name="STATS" type="table" order="13">
                                <tns:sql><![CDATA[

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
 
b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index e0b049c..044ef61 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -25,20 +25,16 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
 import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
-import org.apache.tajo.catalog.store.PostgreSQLStore;
-import org.apache.tajo.function.Function;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.store.DerbyStore;
-import org.apache.tajo.catalog.store.MySQLStore;
-import org.apache.tajo.catalog.store.MariaDBStore;
-import org.apache.tajo.catalog.store.OracleStore;
+import org.apache.tajo.catalog.store.*;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.function.Function;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -47,6 +43,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
@@ -383,36 +381,43 @@ public class TestCatalog {
   static IndexDesc desc1;
   static IndexDesc desc2;
   static IndexDesc desc3;
-
-  static {
-    desc1 = new IndexDesc(
-        "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("id", Type.INT4),
-        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
-
-    desc2 = new IndexDesc(
-        "idx_test2", new Path("idx_test2"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("score", Type.FLOAT8),
-        IndexMethod.TWO_LEVEL_BIN_TREE, false, false, false);
-
-    desc3 = new IndexDesc(
-        "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("id", Type.INT4),
-        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
-  }
+  static Schema relationSchema;
 
   public static TableDesc prepareTable() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("indexed.id", Type.INT4)
-        .addColumn("indexed.name", Type.TEXT)
-        .addColumn("indexed.age", Type.INT4)
-        .addColumn("indexed.score", Type.FLOAT8);
+    relationSchema = new Schema();
+    relationSchema.addColumn(DEFAULT_DATABASE_NAME + ".indexed.id", Type.INT4)
+        .addColumn(DEFAULT_DATABASE_NAME + ".indexed.name", Type.TEXT)
+        .addColumn(DEFAULT_DATABASE_NAME + ".indexed.age", Type.INT4)
+        .addColumn(DEFAULT_DATABASE_NAME + ".indexed.score", Type.FLOAT8);
 
     String tableName = "indexed";
 
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
     return new TableDesc(
-        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, 
tableName), schema, meta,
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, 
tableName), relationSchema, meta,
         new Path(CommonTestingUtil.getTestDir(), "indexed").toUri());
   }
 
+  public static void prepareIndexDescs() throws IOException, 
URISyntaxException {
+    SortSpec[] colSpecs1 = new SortSpec[1];
+    colSpecs1[0] = new SortSpec(new Column("default.indexed.id", Type.INT4), 
true, true);
+    desc1 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test", new URI("idx_test"), colSpecs1,
+        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema);
+
+    SortSpec[] colSpecs2 = new SortSpec[1];
+    colSpecs2[0] = new SortSpec(new Column("default.indexed.score", 
Type.FLOAT8), false, false);
+    desc2 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test2", new URI("idx_test2"), colSpecs2,
+        IndexMethod.TWO_LEVEL_BIN_TREE, false, false, relationSchema);
+
+    SortSpec[] colSpecs3 = new SortSpec[1];
+    colSpecs3[0] = new SortSpec(new Column("default.indexed.id", Type.INT4), 
true, false);
+    desc3 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test", new URI("idx_test"), colSpecs3,
+        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema);
+  }
+
   @Test
   public void testCreateSameTables() throws IOException {
     assertTrue(catalog.createDatabase("tmpdb3", 
TajoConstants.DEFAULT_TABLESPACE_NAME));
@@ -448,20 +453,29 @@ public class TestCatalog {
        @Test
        public void testAddAndDelIndex() throws Exception {
          TableDesc desc = prepareTable();
+    prepareIndexDescs();
          assertTrue(catalog.createTable(desc));
          
-         assertFalse(catalog.existIndexByName("db1", desc1.getName()));
-         assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, 
"indexed", "id"));
+         assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, 
desc1.getName()));
+         assertFalse(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, 
"indexed", new String[]{"id"}));
          catalog.createIndex(desc1);
          assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, 
desc1.getName()));
-         assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, 
"indexed", "id"));
+         assertTrue(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, 
"indexed", new String[]{"id"}));
 
 
          assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, 
desc2.getName()));
-         assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, 
"indexed", "score"));
+         assertFalse(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, 
"indexed", new String[]{"score"}));
          catalog.createIndex(desc2);
          assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, 
desc2.getName()));
-         assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, 
"indexed", "score"));
+         assertTrue(catalog.existIndexByColumnNames(DEFAULT_DATABASE_NAME, 
"indexed", new String[]{"score"}));
+
+    Set<IndexDesc> indexDescs = TUtil.newHashSet();
+    indexDescs.add(desc1);
+    indexDescs.add(desc2);
+    indexDescs.add(desc3);
+    for (IndexDesc index : catalog.getAllIndexesByTable(DEFAULT_DATABASE_NAME, 
"indexed")) {
+      assertTrue(indexDescs.contains(index));
+    }
          
          catalog.dropIndex(DEFAULT_DATABASE_NAME, desc1.getName());
          assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, 
desc1.getName()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java 
b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index db7f981..5ecb161 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -497,7 +497,7 @@ public class TajoCli {
     if (response == null) {
       displayFormatter.printErrorMessage(sout, "response is null");
       wasError = true;
-    } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+    } else if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.OK) {
       if (response.getIsForwarded()) {
         QueryId queryId = new QueryId(response.getQueryId());
         waitForQueryCompleted(queryId);
@@ -510,8 +510,8 @@ public class TajoCli {
         }
       }
     } else {
-      if (response.hasErrorMessage()) {
-        displayFormatter.printErrorMessage(sout, response.getErrorMessage());
+      if (response.getResult().hasErrorMessage()) {
+        displayFormatter.printErrorMessage(sout, 
response.getResult().getErrorMessage());
         wasError = true;
       }
     }
@@ -534,7 +534,7 @@ public class TajoCli {
     if (response == null) {
       displayFormatter.printErrorMessage(sout, "response is null");
       wasError = true;
-    } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+    } else if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.OK) {
       if (response.getIsForwarded()) {
         QueryId queryId = new QueryId(response.getQueryId());
         waitForQueryCompleted(queryId);
@@ -546,8 +546,8 @@ public class TajoCli {
         }
       }
     } else {
-      if (response.hasErrorMessage()) {
-        displayFormatter.printErrorMessage(sout, response.getErrorMessage());
+      if (response.getResult().hasErrorMessage()) {
+        displayFormatter.printErrorMessage(sout, 
response.getResult().getErrorMessage());
         wasError = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
 
b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
index 5eebc2b..2915612 100644
--- 
a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
+++ 
b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
@@ -21,9 +21,12 @@ package org.apache.tajo.cli.tsql.commands;
 import org.apache.commons.lang.CharUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.cli.tsql.TajoCli;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
@@ -51,6 +54,22 @@ public class DescTableCommand extends TajoShellCommand {
         context.getOutput().println("Did not find any relation named \"" + 
tableName + "\"");
       } else {
         context.getOutput().println(toFormattedString(desc));
+        // If there exists any indexes for the table, print index information
+        if (client.hasIndexes(tableName)) {
+          StringBuilder sb = new StringBuilder();
+          sb.append("Indexes:\n");
+          for (IndexDescProto index : client.getIndexes(tableName)) {
+            sb.append("\"").append(index.getIndexName()).append("\" ");
+            sb.append(index.getIndexMethod()).append(" (");
+            for (SortSpecProto key : index.getKeySortSpecsList()) {
+              
sb.append(CatalogUtil.extractSimpleName(key.getColumn().getName()));
+              sb.append(key.getAscending() ? " ASC" : " DESC");
+              sb.append(key.getNullFirst() ? " NULLS FIRST, " : " NULLS LAST, 
");
+            }
+            sb.delete(sb.length()-2, sb.length()-1).append(")\n");
+          }
+          context.getOutput().println(sb.toString());
+        }
       }
     } else if (cmd.length == 1) {
       List<String> tableList = client.getTableList(null);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java 
b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
index a36fc0e..652008c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
@@ -21,11 +21,13 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.IndexMeta;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 
 import java.io.Closeable;
 import java.sql.SQLException;
@@ -134,4 +136,18 @@ public interface CatalogAdminClient extends Closeable {
   public TableDesc getTableDesc(final String tableName) throws 
ServiceException;
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String 
functionName) throws ServiceException;
+
+  public IndexDescProto getIndex(final String indexName) throws 
ServiceException;
+
+  public boolean existIndex(final String indexName) throws ServiceException;
+
+  public List<IndexDescProto> getIndexes(final String tableName) throws 
ServiceException;
+
+  public boolean hasIndexes(final String tableName) throws ServiceException;
+
+  public IndexDescProto getIndex(final String tableName, final String[] 
columnNames) throws ServiceException;
+
+  public boolean existIndex(final String tableName, final String[] columnName) 
throws ServiceException;
+
+  public boolean dropIndex(final String indexName) throws ServiceException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java 
b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 496161d..17fdb25 100644
--- 
a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ 
b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -21,13 +21,12 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.SQLStates;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -37,7 +36,6 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
-import static 
org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import static 
org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
 
 public class CatalogAdminClientImpl implements CatalogAdminClient {
@@ -152,10 +150,10 @@ public class CatalogAdminClientImpl implements 
CatalogAdminClient {
           builder.setPartition(partitionMethodDesc.getProto());
         }
         ClientProtos.TableResponse res = 
tajoMasterService.createExternalTable(null, builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
           return CatalogUtil.newTableDesc(res.getTableDesc());
         } else {
-          throw new SQLException(res.getErrorMessage(), 
SQLStates.ER_NO_SUCH_TABLE.getState());
+          throw new SQLException(res.getResult().getErrorMessage(), 
SQLStates.ER_NO_SUCH_TABLE.getState());
         }
       }
 
@@ -226,10 +224,10 @@ public class CatalogAdminClientImpl implements 
CatalogAdminClient {
         builder.setSessionId(connection.sessionId);
         builder.setTableName(tableName);
         ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, 
builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
           return CatalogUtil.newTableDesc(res.getTableDesc());
         } else {
-          throw new SQLException(res.getErrorMessage(), 
SQLStates.ER_NO_SUCH_TABLE.getState());
+          throw new SQLException(res.getResult().getErrorMessage(), 
SQLStates.ER_NO_SUCH_TABLE.getState());
         }
       }
 
@@ -250,10 +248,10 @@ public class CatalogAdminClientImpl implements 
CatalogAdminClient {
         String paramFunctionName = functionName == null ? "" : functionName;
         ClientProtos.FunctionResponse res = 
tajoMasterService.getFunctionList(null,
             connection.convertSessionedString(paramFunctionName));
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
           return res.getFunctionsList();
         } else {
-          throw new SQLException(res.getErrorMessage());
+          throw new SQLException(res.getResult().getErrorMessage());
         }
       }
 
@@ -261,6 +259,124 @@ public class CatalogAdminClientImpl implements 
CatalogAdminClient {
   }
 
   @Override
+  public IndexDescProto getIndex(final String indexName) throws 
ServiceException {
+    return new ServerCallable<IndexDescProto>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public IndexDescProto call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.getIndexWithName(null,
+            connection.convertSessionedString(indexName));
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public boolean existIndex(final String indexName) throws ServiceException {
+    return new ServerCallable<Boolean>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public Boolean call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.existIndexWithName(null,
+            connection.convertSessionedString(indexName)).getValue();
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public List<IndexDescProto> getIndexes(final String tableName) throws 
ServiceException {
+    return new ServerCallable<List<IndexDescProto>>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public List<IndexDescProto> call(NettyClientBase client) throws 
Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        GetIndexesResponse response = 
tajoMasterService.getIndexesForTable(null,
+            connection.convertSessionedString(tableName));
+        if (response.getResult().getResultCode() == ResultCode.OK) {
+          return response.getIndexesList();
+        } else {
+          throw new SQLException(response.getResult().getErrorMessage());
+        }
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public boolean hasIndexes(final String tableName) throws ServiceException {
+    return new ServerCallable<Boolean>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public Boolean call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.existIndexesForTable(null,
+            connection.convertSessionedString(tableName)).getValue();
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public IndexDescProto getIndex(final String tableName, final String[] 
columnNames) throws ServiceException {
+    return new ServerCallable<IndexDescProto>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public IndexDescProto call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        GetIndexWithColumnsRequest.Builder builder = 
GetIndexWithColumnsRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setTableName(tableName);
+        for (String eachColumnName : columnNames) {
+          builder.addColumnNames(eachColumnName);
+        }
+        GetIndexWithColumnsResponse response = 
tajoMasterService.getIndexWithColumns(null, builder.build());
+        if (response.getResult().getResultCode() == ResultCode.OK) {
+          return response.getIndexDesc();
+        } else {
+          throw new SQLException(response.getResult().getErrorMessage());
+        }
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public boolean existIndex(final String tableName, final String[] columnName) 
throws ServiceException {
+    return new ServerCallable<Boolean>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public Boolean call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        GetIndexWithColumnsRequest.Builder builder = 
GetIndexWithColumnsRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setTableName(tableName);
+        for (String eachColumnName : columnName) {
+          builder.addColumnNames(eachColumnName);
+        }
+        return tajoMasterService.existIndexWithColumns(null, 
builder.build()).getValue();
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public boolean dropIndex(final String indexName) throws ServiceException {
+    return new ServerCallable<Boolean>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) {
+
+      @Override
+      public Boolean call(NettyClientBase client) throws Exception {
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.dropIndex(null,
+            connection.convertSessionedString(indexName)).getValue();
+      }
+    }.withRetries();
+  }
+
+  @Override
   public void close() throws IOException {
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java 
b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index bab3518..c643679 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -170,7 +170,7 @@ public class QueryClientImpl implements QueryClient {
 
 
         SubmitQueryResponse response = tajoMasterService.submitQuery(null, 
builder.build());
-        if (response.getResultCode() == ResultCode.OK) {
+        if (response.getResult().getResultCode() == ResultCode.OK) {
           
connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
         }
         return response;
@@ -205,11 +205,11 @@ public class QueryClientImpl implements QueryClient {
 
     ClientProtos.SubmitQueryResponse response = executeQuery(sql);
 
-    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-      if (response.hasErrorMessage()) {
-        throw new ServiceException(response.getErrorMessage());
-      } else if (response.hasErrorTrace()) {
-        throw new ServiceException(response.getErrorTrace());
+    if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) 
{
+      if (response.getResult().hasErrorMessage()) {
+        throw new ServiceException(response.getResult().getErrorMessage());
+      } else if (response.getResult().hasErrorTrace()) {
+        throw new ServiceException(response.getResult().getErrorTrace());
       }
     }
 
@@ -241,8 +241,8 @@ public class QueryClientImpl implements QueryClient {
 
     ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json);
 
-    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-      throw new ServiceException(response.getErrorTrace());
+    if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) 
{
+      throw new ServiceException(response.getResult().getErrorTrace());
     }
 
     QueryId queryId = new QueryId(response.getQueryId());
@@ -391,8 +391,8 @@ public class QueryClientImpl implements QueryClient {
               builder.setFetchRowNum(fetchRowNum);
               try {
                 GetQueryResultDataResponse response = 
tajoMasterService.getQueryResultData(null, builder.build());
-                if (response.getResultCode() == ClientProtos.ResultCode.ERROR) 
{
-                  throw new ServiceException(response.getErrorTrace());
+                if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.ERROR) {
+                  throw new 
ServiceException(response.getResult().getErrorTrace());
                 }
 
                 return response.getResultSet();
@@ -434,12 +434,12 @@ public class QueryClientImpl implements QueryClient {
         builder.setIsJson(false);
         ClientProtos.UpdateQueryResponse response = 
tajoMasterService.updateQuery(null, builder.build());
 
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.OK) {
           
connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
           return true;
         } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
+          if (response.getResult().hasErrorMessage()) {
+            System.err.println("ERROR: " + 
response.getResult().getErrorMessage());
           }
           return false;
         }
@@ -463,11 +463,11 @@ public class QueryClientImpl implements QueryClient {
         builder.setQuery(json);
         builder.setIsJson(true);
         ClientProtos.UpdateQueryResponse response = 
tajoMasterService.updateQuery(null, builder.build());
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.OK) {
           return true;
         } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
+          if (response.getResult().hasErrorMessage()) {
+            System.err.println("ERROR: " + 
response.getResult().getErrorMessage());
           }
           return false;
         }
@@ -588,11 +588,11 @@ public class QueryClientImpl implements QueryClient {
 
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = 
client.getStub();
         GetQueryInfoResponse res = 
tajoMasterService.getQueryInfo(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
+        if (res.getResult().getResultCode() == ResultCode.OK) {
           return res.getQueryInfo();
         } else {
           abort();
-          throw new ServiceException(res.getErrorMessage());
+          throw new ServiceException(res.getResult().getErrorMessage());
         }
       }
     }.withRetries();
@@ -618,11 +618,11 @@ public class QueryClientImpl implements QueryClient {
 
         QueryMasterClientProtocolService.BlockingInterface queryMasterService 
= client.getStub();
         GetQueryHistoryResponse res = 
queryMasterService.getQueryHistory(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
+        if (res.getResult().getResultCode() == ResultCode.OK) {
           return res.getQueryHistory();
         } else {
           abort();
-          throw new ServiceException(res.getErrorMessage());
+          throw new ServiceException(res.getResult().getErrorMessage());
         }
       }
     }.withRetries();

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java 
b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
index 4a38934..486f95c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
@@ -41,11 +41,11 @@ public class QueryStatus {
     submitTime = proto.getSubmitTime();
     finishTime = proto.getFinishTime();
     hasResult = proto.getHasResult();
-    if (proto.hasErrorMessage()) {
-      errorText = proto.getErrorMessage();
+    if (proto.getResult().hasErrorMessage()) {
+      errorText = proto.getResult().getErrorMessage();
     }
-    if (proto.hasErrorTrace()) {
-      errorTrace = proto.getErrorTrace();
+    if (proto.getResult().hasErrorTrace()) {
+      errorTrace = proto.getResult().getErrorTrace();
     }
 
     queryMasterHost = proto.getQueryMasterHost();

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java 
b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 1bc8050..0f82ae8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -184,11 +184,11 @@ public class SessionConnection implements Closeable {
 
         SessionUpdateResponse response = 
tajoMasterService.updateSessionVariables(null, request);
 
-        if (response.getResultCode() == ResultCode.OK) {
+        if (response.getResult().getResultCode() == ResultCode.OK) {
           
updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
           return Collections.unmodifiableMap(sessionVarsCache);
         } else {
-          throw new ServiceException(response.getMessage());
+          throw new ServiceException(response.getResult().getErrorMessage());
         }
       }
     }.withRetries();
@@ -207,11 +207,11 @@ public class SessionConnection implements Closeable {
 
         SessionUpdateResponse response = 
tajoMasterService.updateSessionVariables(null, request);
 
-        if (response.getResultCode() == ResultCode.OK) {
+        if (response.getResult().getResultCode() == ResultCode.OK) {
           
updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
           return Collections.unmodifiableMap(sessionVarsCache);
         } else {
-          throw new ServiceException(response.getMessage());
+          throw new ServiceException(response.getResult().getErrorMessage());
         }
       }
     }.withRetries();
@@ -334,7 +334,7 @@ public class SessionConnection implements Closeable {
 
       CreateSessionResponse response = tajoMasterService.createSession(null, 
builder.build());
 
-      if (response.getResultCode() == ResultCode.OK) {
+      if (response.getResult().getResultCode() == ResultCode.OK) {
 
         sessionId = response.getSessionId();
         
updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
@@ -343,7 +343,7 @@ public class SessionConnection implements Closeable {
         }
 
       } else {
-        throw new InvalidClientSessionException(response.getMessage());
+        throw new 
InvalidClientSessionException(response.getResult().getErrorMessage());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index 8eafc91..9522746 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.IndexMeta;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ha.HAServiceUtil;
@@ -237,4 +239,39 @@ public class TajoClientImpl extends SessionConnection 
implements TajoClient, Que
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String 
functionName) throws ServiceException {
     return catalogClient.getFunctions(functionName);
   }
+
+  @Override
+  public IndexDescProto getIndex(String indexName) throws ServiceException {
+    return catalogClient.getIndex(indexName);
+  }
+
+  @Override
+  public boolean existIndex(String indexName) throws ServiceException {
+    return catalogClient.existIndex(indexName);
+  }
+
+  @Override
+  public List<IndexDescProto> getIndexes(String tableName) throws 
ServiceException {
+    return catalogClient.getIndexes(tableName);
+  }
+
+  @Override
+  public boolean hasIndexes(String tableName) throws ServiceException {
+    return catalogClient.hasIndexes(tableName);
+  }
+
+  @Override
+  public IndexDescProto getIndex(String tableName, String[] columnNames) 
throws ServiceException {
+    return catalogClient.getIndex(tableName, columnNames);
+  }
+
+  @Override
+  public boolean existIndex(String tableName, String[] columnName) throws 
ServiceException {
+    return catalogClient.existIndex(tableName, columnName);
+  }
+
+  @Override
+  public boolean dropIndex(String indexName) throws ServiceException {
+    return catalogClient.dropIndex(indexName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto 
b/tajo-client/src/main/proto/ClientProtos.proto
index a9f5498..0454901 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -31,16 +31,21 @@ enum ResultCode {
   ERROR = 1;
 }
 
+message RequestResult {
+  required ResultCode resultCode = 1;
+  optional string errorMessage = 2;
+  optional string errorTrace = 3;
+}
+
 message CreateSessionRequest {
   required string username = 1;
   optional string baseDatabaseName = 2;
 }
 
 message CreateSessionResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional SessionIdProto sessionId = 2;
   optional KeyValueSetProto sessionVars = 3;
-  optional string message = 4;
 }
 
 message UpdateSessionVariableRequest {
@@ -50,9 +55,8 @@ message UpdateSessionVariableRequest {
 }
 
 message SessionUpdateResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional KeyValueSetProto sessionVars = 2;
-  optional string message = 3;
 }
 
 message SessionedStringProto {
@@ -61,9 +65,8 @@ message SessionedStringProto {
 }
 
 message ExplainQueryResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional string explain = 2;
-  optional string errorMessage = 3;
 }
 
 message QueryRequest {
@@ -74,9 +77,8 @@ message QueryRequest {
 }
 
 message UpdateQueryResponse {
-  required ResultCode resultCode = 1;
-  optional string errorMessage = 2;
-  optional KeyValueSetProto sessionVars = 3;
+  required RequestResult result = 1;
+  optional KeyValueSetProto sessionVars = 2;
 }
 
 message GetQueryResultRequest {
@@ -126,7 +128,7 @@ message SerializedResultSet {
 }
 
 message SubmitQueryResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   required QueryIdProto queryId = 2;
   required string userName = 3;
   optional bool isForwarded = 4 [default = false];
@@ -138,24 +140,19 @@ message SubmitQueryResponse {
   optional TableDescProto tableDesc = 8;
   optional int32 maxRowNum = 9;
 
-  optional string errorMessage = 10;
-  optional string errorTrace = 11;
-
-  optional KeyValueSetProto sessionVars = 12;
+  optional KeyValueSetProto sessionVars = 10;
 }
 
 message GetQueryStatusResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   required QueryIdProto queryId = 2;
   optional QueryState state = 3;
   optional float progress = 4;
   optional int64 submitTime = 5;
   optional int64 finishTime = 7;
   optional bool hasResult = 8;
-  optional string errorMessage = 9;
-  optional string errorTrace = 10;
-  optional string queryMasterHost = 11;
-  optional int32 queryMasterPort = 12;
+  optional string queryMasterHost = 9;
+  optional int32 queryMasterPort = 10;
 }
 
 message GetQueryResultDataRequest {
@@ -165,10 +162,8 @@ message GetQueryResultDataRequest {
 }
 
 message GetQueryResultDataResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   required SerializedResultSet resultSet = 2;
-  optional string errorMessage = 3;
-  optional string errorTrace = 4;
 }
 
 message GetClusterInfoRequest {
@@ -228,15 +223,13 @@ message DropTableRequest {
 }
 
 message TableResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional TableDescProto tableDesc = 2;
-  optional string errorMessage = 3;
 }
 
 message FunctionResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   repeated FunctionDescProto functions = 2;
-  optional string errorMessage = 3;
 }
 
 message QueryInfoProto {
@@ -289,14 +282,32 @@ message QueryHistoryProto {
 }
 
 message GetQueryHistoryResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional QueryHistoryProto queryHistory = 2;
-  optional string errorMessage = 3;
 }
 
 message GetQueryInfoResponse {
-  required ResultCode resultCode = 1;
+  required RequestResult result = 1;
   optional QueryInfoProto queryInfo = 2;
-  optional string errorMessage = 3;
 }
 
+message CreateIndexResponse {
+  required RequestResult result = 1;
+  optional IndexDescProto indexDesc = 2;
+}
+
+message GetIndexesResponse {
+  required RequestResult result = 1;
+  repeated IndexDescProto indexes = 2;
+}
+
+message GetIndexWithColumnsRequest {
+  required SessionIdProto sessionId = 1;
+  required string tableName = 2;
+  repeated string columnNames = 3;
+}
+
+message GetIndexWithColumnsResponse {
+  required RequestResult result = 1;
+  optional IndexDescProto indexDesc = 2;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto 
b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
index 10ca268..586f9ab 100644
--- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
@@ -68,4 +68,13 @@ service TajoMasterClientProtocolService {
   rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
   rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
   rpc getFunctionList(SessionedStringProto) returns (FunctionResponse);
+
+  // Index Management APIs
+  rpc getIndexWithName(SessionedStringProto) returns (IndexDescProto);
+  rpc existIndexWithName(SessionedStringProto) returns (BoolProto);
+  rpc getIndexesForTable(SessionedStringProto) returns (GetIndexesResponse);
+  rpc existIndexesForTable(SessionedStringProto) returns (BoolProto);
+  rpc getIndexWithColumns(GetIndexWithColumnsRequest) returns 
(GetIndexWithColumnsResponse);
+  rpc existIndexWithColumns(GetIndexWithColumnsRequest) returns (BoolProto);
+  rpc dropIndex(SessionedStringProto) returns (BoolProto);
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java 
b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
index b7a5da7..d65f4c3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
@@ -171,7 +171,7 @@ public class OverridableConf extends KeyValueSet {
   }
 
   public float getFloat(ConfigKey key) {
-    return getLong(key, null);
+    return getFloat(key, null);
   }
 
   public void put(ConfigKey key, String val) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java 
b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index d87bbef..be0639e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -123,6 +123,10 @@ public enum SessionVars implements ConfigKey {
   NULL_CHAR(ConfVars.$TEXT_NULL, "null char of text file output", DEFAULT),
   CODEGEN(ConfVars.$CODEGEN, "Runtime code generation enabled (experiment)", 
DEFAULT),
 
+  // for index
+  INDEX_ENABLED(ConfVars.$INDEX_ENABLED, "index scan enabled", DEFAULT),
+  INDEX_SELECTIVITY_THRESHOLD(ConfVars.$INDEX_SELECTIVITY_THRESHOLD, "the 
selectivity threshold for index scan", DEFAULT),
+
   // Behavior Control ---------------------------------------------------------
   ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
       "If true, a running query will be terminated when an overflow or 
divide-by-zero occurs.", DEFAULT),

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ab11ddd..9a3ec6b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -319,6 +319,10 @@ public class TajoConf extends Configuration {
     $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means 
infinite
     $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code 
generation
 
+    // for index
+    $INDEX_ENABLED("tajo.query.index.enabled", false),
+    $INDEX_SELECTIVITY_THRESHOLD("tajo.query.index.selectivity.threshold", 
0.05f),
+
     // Client -----------------------------------------------------------------
     $CLIENT_SESSION_EXPIRY_TIME("tajo.client.session.expiry-time-sec", 3600), 
// default time is one hour.
 
@@ -498,7 +502,7 @@ public class TajoConf extends Configuration {
   }
 
   public static long getLongVar(Configuration conf, ConfVars var) {
-    assert (var.valClass == Long.class || var.valClass == Integer.class);
+    assert (var.valClass == Long.class || var.valClass == Integer.class || 
var.valClass == Float.class);
     if (var.valClass == Integer.class) {
       return conf.getInt(var.varname, var.defaultIntVal);
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java 
b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index a1de860..65d795d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -21,6 +21,8 @@ package org.apache.tajo.util;
 import com.google.common.base.Objects;
 
 import java.lang.reflect.Array;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -299,4 +301,12 @@ public class TUtil {
     StackTraceElement element = ste[2 + depth];
     return element.getClassName() + ":" + element.getMethodName() + "(" + 
element.getLineNumber() +")";
   }
+
+  public static URI stringToURI(String str) {
+    try {
+      return new URI(str);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Cannot convert " + str + " to the URI type", 
e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 
b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index e73680d..5754040 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -77,10 +77,19 @@ schema_statement
   ;
 
 index_statement
+  : create_index_statement
+  | drop_index_statement
+  ;
+
+create_index_statement
   : CREATE (u=UNIQUE)? INDEX identifier ON table_name (method_specifier)?
     LEFT_PAREN sort_specifier_list RIGHT_PAREN param_clause? (where_clause)?
   ;
 
+drop_index_statement
+  : DROP INDEX index_name = identifier
+  ;
+
 database_definition
   : CREATE DATABASE (if_not_exists)? dbname = identifier
   ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
index 79513dc..4199b00 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
@@ -214,4 +214,11 @@ public class ExecutorPreCompiler extends 
BasicLogicalPlanVisitor<ExecutorPreComp
 
     return node;
   }
+
+  @Override
+  public LogicalNode visitIndexScan(CompilationContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                    IndexScanNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+    visitScan(context, plan, block, node, stack);
+    return node;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index d0db2b0..aea5a59 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -1187,7 +1187,7 @@ public class SQLAnalyzer extends 
SQLParserBaseVisitor<Expr> {
   }
 
   @Override
-  public Expr visitIndex_statement(SQLParser.Index_statementContext ctx) {
+  public Expr 
visitCreate_index_statement(SQLParser.Create_index_statementContext ctx) {
     String indexName = ctx.identifier().getText();
     String tableName = ctx.table_name().getText();
     Relation relation = new Relation(tableName);
@@ -1223,6 +1223,12 @@ public class SQLAnalyzer extends 
SQLParserBaseVisitor<Expr> {
   }
 
   @Override
+  public Expr visitDrop_index_statement(SQLParser.Drop_index_statementContext 
ctx) {
+    String indexName = ctx.identifier().getText();
+    return new DropIndex(indexName);
+  }
+
+  @Override
   public Expr visitDatabase_definition(@NotNull 
SQLParser.Database_definitionContext ctx) {
     return new CreateDatabase(ctx.identifier().getText(), null, 
checkIfExist(ctx.if_not_exists()));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 3218e15..17a9bb1 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
@@ -47,20 +49,18 @@ import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
+import java.util.*;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
@@ -233,7 +233,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner 
{
         return new LimitExec(ctx, limitNode.getInSchema(),
             limitNode.getOutSchema(), leftExec, limitNode);
 
-      case BST_INDEX_SCAN:
+      case INDEX_SCAN:
         IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
         leftExec = createIndexScanExec(ctx, indexScanNode);
         return leftExec;
@@ -1187,15 +1187,8 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
     List<FileFragment> fragments =
         FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
 
-    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), 
annotation.getSortKeys());
-    FileStorageManager sm = 
(FileStorageManager)StorageManager.getFileStorageManager(ctx.getConf());
-    Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), 
"index");
-
-    TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
-        annotation.getSortKeys());
-    return new BSTIndexScanExec(ctx, annotation, fragments.get(0), new 
Path(indexPath, indexName),
-        annotation.getKeySchema(), comp, annotation.getDatum());
-
+    return new BSTIndexScanExec(ctx, annotation, fragments.get(0), 
annotation.getIndexPath(),
+        annotation.getKeySchema(), annotation.getPredicates());
   }
 
   public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, 
LogicalNode node) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 9b37a80..67f5691 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1525,6 +1525,15 @@ public class GlobalPlanner {
     }
 
     @Override
+    public LogicalNode visitIndexScan(GlobalPlanContext context, LogicalPlan 
plan, LogicalPlan.QueryBlock block,
+                                      IndexScanNode node, Stack<LogicalNode> 
stack) throws PlanningException {
+      ExecutionBlock newBlock = context.plan.newExecutionBlock();
+      newBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), newBlock);
+      return node;
+    }
+
+    @Override
     public LogicalNode visitPartitionedTableScan(GlobalPlanContext context, 
LogicalPlan plan,
                                                  LogicalPlan.QueryBlock block, 
PartitionedTableScanNode node,
                                                  Stack<LogicalNode> 
stack)throws PlanningException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 6adc523..3e4c62d 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -18,23 +18,34 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
 
 public class BSTIndexScanExec extends PhysicalExec {
-  private ScanNode scanNode;
+  private final static Log LOG = LogFactory.getLog(BSTIndexScanExec.class);
+  private IndexScanNode scanNode;
   private SeekableScanner fileScanner;
   
   private EvalNode qual;
@@ -42,31 +53,62 @@ public class BSTIndexScanExec extends PhysicalExec {
   
   private Projector projector;
   
-  private Datum[] datum = null;
-  
+  private Datum[] values = null;
+
   private boolean initialize = true;
 
   private float progress;
 
-  public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode ,
-       FileFragment fragment, Path fileName , Schema keySchema,
-       TupleComparator comparator , Datum[] datum) throws IOException {
+  private TableStats inputStats;
+
+  public BSTIndexScanExec(TaskAttemptContext context,
+                          IndexScanNode scanNode ,
+       FileFragment fragment, URI indexPrefix , Schema keySchema,
+       SimplePredicate [] predicates) throws IOException {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;
     this.qual = scanNode.getQual();
-    this.datum = datum;
+
+    SortSpec[] keySortSpecs = new SortSpec[predicates.length];
+    values = new Datum[predicates.length];
+    for (int i = 0; i < predicates.length; i++) {
+      keySortSpecs[i] = predicates[i].getKeySortSpec();
+      values[i] = predicates[i].getValue();
+    }
+
+    TupleComparator comparator = new BaseTupleComparator(keySchema,
+        keySortSpecs);
+
+    Schema fileScanOutSchema = mergeSubSchemas(inSchema, keySchema, 
scanNode.getTargets(), qual);
 
     this.fileScanner = StorageManager.getSeekableScanner(context.getConf(),
-        scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, 
outSchema);
+        scanNode.getTableDesc().getMeta(), inSchema, fragment, 
fileScanOutSchema);
     this.fileScanner.init();
     this.projector = new Projector(context, inSchema, outSchema, 
scanNode.getTargets());
 
-    FileSystem fs = fileName.getFileSystem(context.getConf());
-    this.reader = new BSTIndex(fs.getConf()).
-        getIndexReader(fileName, keySchema, comparator);
+    Path indexPath = new Path(indexPrefix.toString(), 
context.getUniqueKeyFromFragments());
+    this.reader = new BSTIndex(context.getConf()).
+        getIndexReader(indexPath, keySchema, comparator);
     this.reader.open();
   }
 
+  private static Schema mergeSubSchemas(Schema originalSchema, Schema 
subSchema, Target[] targets, EvalNode qual) {
+    Schema mergedSchema = new Schema();
+    Set<Column> qualAndTargets = TUtil.newHashSet();
+    qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
+    for (Target target : targets) {
+      
qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
+    }
+    for (Column column : originalSchema.getColumns()) {
+      if (subSchema.contains(column)
+          || qualAndTargets.contains(column)
+          || qualAndTargets.contains(column)) {
+        mergedSchema.addColumn(column);
+      }
+    }
+    return mergedSchema;
+  }
+
   @Override
   public void init() throws IOException {
     progress = 0.0f;
@@ -76,8 +118,8 @@ public class BSTIndexScanExec extends PhysicalExec {
   public Tuple next() throws IOException {
     if(initialize) {
       //TODO : more complicated condition
-      Tuple key = new VTuple(datum.length);
-      key.put(datum);
+      Tuple key = new VTuple(values.length);
+      key.put(values);
       long offset = reader.find(key);
       if (offset == -1) {
         reader.close();
@@ -116,8 +158,11 @@ public class BSTIndexScanExec extends PhysicalExec {
            return outTuple;
          } else {
            long offset = reader.next();
-           if (offset == -1) return null;
+           if (offset == -1) {
+             return null;
+           }
            else fileScanner.seek(offset);
+           return null;
          }
        }
      }
@@ -132,6 +177,16 @@ public class BSTIndexScanExec extends PhysicalExec {
   @Override
   public void close() throws IOException {
     IOUtils.cleanup(null, reader, fileScanner);
+    if (fileScanner != null) {
+      try {
+        TableStats stats = fileScanner.getInputStats();
+        if (stats != null) {
+          inputStats = (TableStats) stats.clone();
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+    }
     reader = null;
     fileScanner = null;
     scanNode = null;
@@ -143,4 +198,13 @@ public class BSTIndexScanExec extends PhysicalExec {
   public float getProgress() {
     return progress;
   }
+
+  @Override
+  public TableStats getInputStats() {
+    if (fileScanner != null) {
+      return fileScanner.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 72a667d..9594b58 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -58,6 +58,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
     }
 
     projector.eval(tuple, outTuple);
+    outTuple.setOffset(tuple.getOffset());
     return outTuple;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index 0592217..f9db842 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -34,6 +35,7 @@ import 
org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 public class StoreIndexExec extends UnaryPhysicalExec {
   private static final Log LOG = LogFactory.getLog(StoreIndexExec.class);
@@ -53,7 +55,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
 
-    SortSpec[] sortSpecs = logicalPlan.getSortSpecs();
+    SortSpec[] sortSpecs = logicalPlan.getKeySortSpecs();
     indexKeys = new int[sortSpecs.length];
     keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
 
@@ -64,8 +66,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
     }
 
     TajoConf conf = context.getConf();
-    Path indexPath = new Path(logicalPlan.getIndexPath(), 
""+context.getUniqueKeyFromFragments());
-    System.out.println("exec: " + indexPath);
+    Path indexPath = new Path(logicalPlan.getIndexPath().toString(), 
context.getUniqueKeyFromFragments());
     // TODO: Create factory using reflection
     BSTIndex bst = new BSTIndex(conf);
     this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
@@ -78,17 +79,13 @@ public class StoreIndexExec extends UnaryPhysicalExec {
   public Tuple next() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
-    Tuple prevKeyTuple = null;
     long offset;
 
     while((tuple = child.next()) != null) {
       offset = tuple.getOffset();
       keyTuple = new VTuple(keySchema.size());
       RowStoreUtil.project(tuple, keyTuple, indexKeys);
-      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
-        indexWriter.write(keyTuple, offset);
-        prevKeyTuple = keyTuple;
-      }
+      indexWriter.write(keyTuple, offset);
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 29dc845..709aa81 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -22,6 +22,7 @@ import org.apache.tajo.OverridableConf;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 
 @SuppressWarnings("unused")
 public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
@@ -31,12 +32,12 @@ public class ErrorInjectionRewriter implements 
LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
     return true;
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) 
throws PlanningException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws 
PlanningException {
     throw new NullPointerException();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java 
b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 51964f0..6ba413c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -34,7 +34,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.DDLExecutor;
 import org.apache.tajo.master.exec.QueryExecutor;
@@ -51,6 +51,7 @@ import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.IPCUtil;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -90,7 +91,8 @@ public class GlobalEngine extends AbstractService {
       analyzer = new SQLAnalyzer();
       preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
       planner = new LogicalPlanner(context.getCatalog());
-      optimizer = new LogicalOptimizer(context.getConf());
+      // Access path rewriter is enabled only in QueryMasterTask
+      optimizer = new LogicalOptimizer(context.getConf(), 
context.getCatalog());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), 
context.getCatalog());
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
@@ -169,13 +171,12 @@ public class GlobalEngine extends AbstractService {
       responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
       responseBuilder.setIsForwarded(true);
-      responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
       String errorMessage = t.getMessage();
       if (t.getMessage() == null) {
         errorMessage = t.getClass().getName();
       }
-      responseBuilder.setErrorMessage(errorMessage);
-      responseBuilder.setErrorTrace(StringUtils.stringifyException(t));
+      responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+          errorMessage, StringUtils.stringifyException(t)));
       return responseBuilder.build();
     }
   }

Reply via email to