TAJO-838: Improve query planner to utilize index. (jihoon)

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/071c5d05
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/071c5d05
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/071c5d05

Branch: refs/heads/index_support
Commit: 071c5d05dff157e47d1bb02e79de88a9577c3346
Parents: 1fad72e
Author: Jihoon Son <[email protected]>
Authored: Fri Jan 9 01:13:52 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Fri Jan 9 01:13:52 2015 +0900

----------------------------------------------------------------------
 .../java/org/apache/tajo/algebra/DropIndex.java |  54 ++++
 .../java/org/apache/tajo/algebra/OpType.java    |   1 +
 .../tajo/catalog/AbstractCatalogClient.java     | 120 +++++++--
 .../src/main/proto/CatalogProtocol.proto        |   6 +-
 .../apache/tajo/catalog/CatalogConstants.java   |   1 +
 .../org/apache/tajo/catalog/CatalogService.java |  24 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |  46 +++-
 .../java/org/apache/tajo/catalog/IndexDesc.java | 132 +++++-----
 .../java/org/apache/tajo/catalog/IndexMeta.java | 180 +++++++++++++
 .../java/org/apache/tajo/catalog/TableDesc.java |   4 -
 .../catalog/exception/NoSuchIndexException.java |   2 +-
 .../src/main/proto/CatalogProtos.proto          |  36 +--
 .../org/apache/tajo/catalog/TestIndexDesc.java  |  58 +++--
 .../tajo/catalog/store/HCatalogStore.java       |  62 +++--
 .../org/apache/tajo/catalog/CatalogServer.java  |  93 +++++--
 .../dictionary/IndexesTableDescriptor.java      |   8 +-
 .../tajo/catalog/store/AbstractDBStore.java     | 186 +++++++------
 .../apache/tajo/catalog/store/CatalogStore.java |  29 +--
 .../org/apache/tajo/catalog/store/MemStore.java | 120 +++++----
 .../src/main/resources/schemas/derby/derby.xml  |  27 +-
 .../main/resources/schemas/mariadb/indexes.sql  |  14 +-
 .../main/resources/schemas/mysql/indexes.sql    |  14 +-
 .../main/resources/schemas/oracle/indexes.sql   |  18 +-
 .../resources/schemas/postgresql/postgresql.xml |  29 ++-
 .../org/apache/tajo/catalog/TestCatalog.java    |  76 +++---
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  |  12 +-
 .../cli/tsql/commands/DescTableCommand.java     |  19 ++
 .../apache/tajo/client/CatalogAdminClient.java  |  16 ++
 .../tajo/client/CatalogAdminClientImpl.java     | 138 +++++++++-
 .../org/apache/tajo/client/QueryClientImpl.java |  40 +--
 .../org/apache/tajo/client/QueryStatus.java     |   8 +-
 .../apache/tajo/client/SessionConnection.java   |  12 +-
 .../org/apache/tajo/client/TajoClientImpl.java  |  37 +++
 tajo-client/src/main/proto/ClientProtos.proto   |  71 ++---
 .../main/proto/TajoMasterClientProtocol.proto   |   9 +
 .../java/org/apache/tajo/OverridableConf.java   |   2 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   4 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   6 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |  10 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   9 +
 .../engine/codegen/ExecutorPreCompiler.java     |   7 +
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   8 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  21 +-
 .../engine/planner/global/GlobalPlanner.java    |   9 +
 .../planner/physical/BSTIndexScanExec.java      |  96 +++++--
 .../engine/planner/physical/ProjectionExec.java |   1 +
 .../engine/planner/physical/StoreIndexExec.java |  13 +-
 .../utils/test/ErrorInjectionRewriter.java      |   5 +-
 .../org/apache/tajo/master/GlobalEngine.java    |  11 +-
 .../NonForwardQueryResultSystemScanner.java     |  46 ++--
 .../tajo/master/TajoMasterClientService.java    | 258 +++++++++++++++----
 .../apache/tajo/master/exec/DDLExecutor.java    | 109 +++++---
 .../apache/tajo/master/exec/QueryExecutor.java  |  59 ++++-
 .../apache/tajo/master/querymaster/Query.java   |  45 ++++
 .../master/querymaster/QueryMasterTask.java     |  23 +-
 .../main/java/org/apache/tajo/util/IPCUtil.java |  44 ++++
 .../java/org/apache/tajo/util/IndexUtil.java    | 149 -----------
 .../tajo/webapp/QueryExecutorServlet.java       |   8 +-
 .../tajo/worker/TajoWorkerClientService.java    |  27 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  29 +--
 .../apache/tajo/worker/TaskAttemptContext.java  |  13 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   4 +-
 .../engine/planner/TestLogicalOptimizer.java    |   2 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |   8 +-
 .../planner/global/TestBroadcastJoinPlan.java   |  28 +-
 .../planner/physical/TestBSTIndexExec.java      | 206 ---------------
 .../planner/physical/TestHashAntiJoinExec.java  |   6 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   6 +-
 .../planner/physical/TestPhysicalPlanner.java   |   5 +-
 .../engine/planner/physical/TestSortExec.java   |   6 +-
 .../tajo/engine/query/TestCreateIndex.java      |  44 +++-
 .../apache/tajo/engine/query/TestIndexScan.java | 119 +++++++++
 .../tajo/engine/query/TestTablePartitions.java  |   8 +-
 .../tajo/master/TestExecutionBlockCursor.java   |   2 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |   2 +-
 .../tajo/master/querymaster/TestKillQuery.java  |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   2 +-
 .../testCreateIndexOnMultiExprs.sql             |   1 +
 .../TestIndexScan/testOnMultipleExprs.result    |   3 +
 .../TestIndexScan/testOnMultipleKeys.result     |   3 +
 .../TestIndexScan/testOnMultipleKeys2.result    |   3 +
 .../testOnSortedNonUniqueKeys.result            |   4 +
 .../TestIndexScan/testOnUnsortedTextKeys.result |   3 +
 .../TestIndexScan/testWithGroupBy.result        |   3 +
 .../results/TestIndexScan/testWithJoin.result   |   4 +
 .../results/TestIndexScan/testWithSort.result   |   4 +
 .../TestTajoCli/testHelpSessionVars.result      |   2 +
 tajo-docs/src/main/sphinx/index.rst             |   1 +
 tajo-docs/src/main/sphinx/index/future_work.rst |   8 +
 tajo-docs/src/main/sphinx/index/how_to_use.rst  |  69 +++++
 tajo-docs/src/main/sphinx/index/types.rst       |   7 +
 tajo-docs/src/main/sphinx/index_overview.rst    |  20 ++
 tajo-docs/src/main/sphinx/sql_language/ddl.rst  |  33 ++-
 .../org/apache/tajo/plan/LogicalOptimizer.java  |  14 +-
 .../java/org/apache/tajo/plan/LogicalPlan.java  |  28 ++
 .../tajo/plan/LogicalPlanPreprocessor.java      |   8 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |  27 +-
 .../org/apache/tajo/plan/NamedExprsManager.java |   4 +-
 .../tajo/plan/algebra/AlgebraVisitor.java       |   1 +
 .../tajo/plan/algebra/BaseAlgebraVisitor.java   |   8 +
 .../apache/tajo/plan/expr/AlgebraicUtil.java    |  69 +++++
 .../org/apache/tajo/plan/expr/EvalTreeUtil.java |   6 +-
 .../tajo/plan/logical/CreateIndexNode.java      |  89 ++++---
 .../apache/tajo/plan/logical/DropIndexNode.java |  92 +++++++
 .../apache/tajo/plan/logical/IndexScanNode.java |  88 +++----
 .../org/apache/tajo/plan/logical/NodeType.java  |   3 +-
 .../rewrite/BaseLogicalPlanRewriteEngine.java   |  10 +-
 .../BaseLogicalPlanRewriteRuleProvider.java     |   4 +-
 .../plan/rewrite/LogicalPlanRewriteEngine.java  |   3 +-
 .../plan/rewrite/LogicalPlanRewriteRule.java    |   5 +-
 .../rewrite/LogicalPlanRewriteRuleContext.java  |  65 +++++
 .../tajo/plan/rewrite/rules/AccessPathInfo.java |  52 ++++
 .../plan/rewrite/rules/AccessPathRewriter.java  | 129 ++++++++++
 .../plan/rewrite/rules/FilterPushDownRule.java  |  98 ++++++-
 .../tajo/plan/rewrite/rules/IndexScanInfo.java  | 113 ++++++++
 .../rules/LogicalPlanEqualityTester.java        |   8 +-
 .../rewrite/rules/PartitionedTableRewriter.java |  10 +-
 .../rewrite/rules/ProjectionPushDownRule.java   |  16 +-
 .../tajo/plan/rewrite/rules/SeqScanInfo.java    |  43 ++++
 .../plan/serder/LogicalNodeDeserializer.java    |  71 ++++-
 .../tajo/plan/serder/LogicalNodeSerializer.java |  74 ++++++
 .../org/apache/tajo/plan/util/IndexUtil.java    |  72 ++++++
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  35 ++-
 .../plan/visitor/BasicLogicalPlanVisitor.java   |  18 ++
 .../plan/visitor/ExplainLogicalPlanVisitor.java |   7 +
 .../tajo/plan/visitor/LogicalPlanVisitor.java   |   6 +
 tajo-plan/src/main/proto/Plan.proto             |  85 ++++--
 .../storage/hbase/AddSortForInsertRewriter.java |   8 +-
 128 files changed, 3414 insertions(+), 1287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-algebra/src/main/java/org/apache/tajo/algebra/DropIndex.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/DropIndex.java 
b/tajo-algebra/src/main/java/org/apache/tajo/algebra/DropIndex.java
new file mode 100644
index 0000000..5a75e78
--- /dev/null
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/DropIndex.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.algebra;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class DropIndex extends Expr {
+  @Expose @SerializedName("IndexName")
+  private String indexName;
+
+  public DropIndex(final String indexName) {
+    super(OpType.DropIndex);
+    this.indexName = indexName;
+  }
+
+  @Override
+  public int hashCode() {
+    return indexName.hashCode();
+  }
+
+  @Override
+  boolean equalsTo(Expr expr) {
+    DropIndex other = (DropIndex) expr;
+    return this.indexName.equals(other.indexName);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    DropIndex clone = (DropIndex) super.clone();
+    clone.indexName = indexName;
+    return clone;
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java 
b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
index 32c51db..47fea64 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/OpType.java
@@ -54,6 +54,7 @@ public enum OpType {
   AlterTablespace(AlterTablespace.class),
   AlterTable(AlterTable.class),
   CreateIndex(CreateIndex.class),
+  DropIndex(DropIndex.class),
   TruncateTable(TruncateTable.class),
 
   // Insert or Update

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
 
b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 8ef1c9a..dc12614 100644
--- 
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ 
b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -43,6 +43,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
 import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -379,6 +380,24 @@ public abstract class AbstractCatalogClient implements 
CatalogService {
   }
 
   @Override
+  public List<IndexDescProto> getAllIndexes() {
+    try {
+      return new ServerCallable<List<IndexDescProto>>(pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
+
+        @Override
+        public List<IndexDescProto> call(NettyClientBase client) throws 
Exception {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          GetIndexesProto response = stub.getAllIndexes(null, 
ProtoUtil.NULL_PROTO);
+          return response.getIndexList();
+        }
+      }.withRetries();
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  @Override
   public final PartitionMethodDesc getPartitionMethod(final String 
databaseName, final String tableName) {
     try {
       return new ServerCallable<PartitionMethodDesc>(this.pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
@@ -582,17 +601,40 @@ public abstract class AbstractCatalogClient implements 
CatalogService {
   }
 
   @Override
-  public boolean existIndexByColumn(final String databaseName, final String 
tableName, final String columnName) {
+  public boolean existIndexByColumns(final String databaseName, final String 
tableName, final Column [] columns) {
+    return existIndexByColumnNames(databaseName, tableName, 
extractColumnNames(columns));
+  }
+
+  @Override
+  public boolean existIndexByColumnNames(final String databaseName, final 
String tableName, final String [] columnNames) {
     try {
       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
-          GetIndexByColumnRequest.Builder builder = 
GetIndexByColumnRequest.newBuilder();
+          GetIndexByColumnNamesRequest.Builder builder = 
GetIndexByColumnNamesRequest.newBuilder();
           
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
-          builder.setColumnName(columnName);
+          for (String colunName : columnNames) {
+            builder.addColumnNames(colunName);
+          }
 
           CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existIndexByColumn(null, builder.build()).getValue();
+          return stub.existIndexByColumnNames(null, 
builder.build()).getValue();
+        }
+      }.withRetries();
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean existIndexesByTable(final String databaseName, final String 
tableName) {
+    try {
+      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.existIndexesByTable(null, 
CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
         }
       }.withRetries();
     } catch (ServiceException e) {
@@ -621,20 +663,60 @@ public abstract class AbstractCatalogClient implements 
CatalogService {
     }
   }
 
+  private static String[] extractColumnNames(Column[] columns) {
+    String[] columnNames = new String [columns.length];
+    for (int i = 0; i < columnNames.length; i++) {
+      columnNames[i] = columns[i].getSimpleName();
+    }
+    return columnNames;
+  }
+
+  @Override
+  public final IndexDesc getIndexByColumns(final String databaseName,
+                                               final String tableName,
+                                               final Column [] columns) {
+    return getIndexByColumnNames(databaseName, tableName, 
extractColumnNames(columns));
+  }
+
   @Override
-  public final IndexDesc getIndexByColumn(final String databaseName,
-                                          final String tableName,
-                                          final String columnName) {
+  public final IndexDesc getIndexByColumnNames(final String databaseName,
+                                           final String tableName,
+                                           final String [] columnNames) {
     try {
       return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
 
-          GetIndexByColumnRequest.Builder builder = 
GetIndexByColumnRequest.newBuilder();
+          GetIndexByColumnNamesRequest.Builder builder = 
GetIndexByColumnNamesRequest.newBuilder();
           
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
-          builder.setColumnName(columnName);
+          for (String columnName : columnNames) {
+            builder.addColumnNames(columnName);
+          }
 
           CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
+          return new IndexDesc(stub.getIndexByColumnNames(null, 
builder.build()));
+        }
+      }.withRetries();
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  @Override
+  public final Collection<IndexDesc> getAllIndexesByTable(final String 
databaseName,
+                                                          final String 
tableName) {
+    try {
+      return new ServerCallable<Collection<IndexDesc>>(this.pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
+        @Override
+        public Collection<IndexDesc> call(NettyClientBase client) throws 
Exception {
+          TableIdentifierProto proto = 
CatalogUtil.buildTableIdentifier(databaseName, tableName);
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          GetAllIndexesResponse response = stub.getAllIndexesByTable(null, 
proto);
+          List<IndexDesc> indexDescs = TUtil.newList();
+          for (IndexDescProto descProto : response.getIndexDescList()) {
+            indexDescs.add(new IndexDesc(descProto));
+          }
+          return indexDescs;
         }
       }.withRetries();
     } catch (ServiceException e) {
@@ -665,24 +747,6 @@ public abstract class AbstractCatalogClient implements 
CatalogService {
   }
   
   @Override
-  public List<IndexProto> getAllIndexes() {
-    try {
-      return new ServerCallable<List<IndexProto>>(pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<IndexProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetIndexesProto response = stub.getAllIndexes(null, 
ProtoUtil.NULL_PROTO);
-          return response.getIndexList();
-        }
-      }.withRetries();
-    } catch (ServiceException e) {
-      LOG.error(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto 
b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index cae5d88..cbe689f 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -66,9 +66,11 @@ service CatalogProtocolService {
   rpc createIndex(IndexDescProto) returns (BoolProto);
   rpc dropIndex(IndexNameProto) returns (BoolProto);
   rpc existIndexByName(IndexNameProto) returns (BoolProto);
-  rpc existIndexByColumn(GetIndexByColumnRequest) returns (BoolProto);
+  rpc existIndexByColumnNames(GetIndexByColumnNamesRequest) returns 
(BoolProto);
+  rpc existIndexesByTable(TableIdentifierProto) returns (BoolProto);
   rpc getIndexByName(IndexNameProto) returns (IndexDescProto);
-  rpc getIndexByColumn(GetIndexByColumnRequest) returns (IndexDescProto);
+  rpc getIndexByColumnNames(GetIndexByColumnNamesRequest) returns 
(IndexDescProto);
+  rpc getAllIndexesByTable(TableIdentifierProto) returns 
(GetAllIndexesResponse);
   rpc getAllIndexes(NullProto) returns (GetIndexesProto);
 
   rpc createFunction(FunctionDescProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 6ec52b9..c7df801 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -52,5 +52,6 @@ public class CatalogConstants {
   public static final String COL_TABLESPACE_PK = "SPACE_ID";
   public static final String COL_DATABASES_PK = "DB_ID";
   public static final String COL_TABLES_PK = "TID";
+  public static final String COL_INDEXES_PK = "INDEX_ID";
   public static final String COL_TABLES_NAME = "TABLE_NAME";
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
index 2a5d890..dd26a27 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -19,13 +19,7 @@
 package org.apache.tajo.catalog;
 
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 
 import java.util.Collection;
@@ -154,6 +148,8 @@ public interface CatalogService {
    */
   List<ColumnProto> getAllColumns();
 
+  List<IndexDescProto> getAllIndexes();
+
   /**
    *
    * @return All FunctionDescs
@@ -190,15 +186,21 @@ public interface CatalogService {
 
   boolean existIndexByName(String databaseName, String indexName);
 
-  boolean existIndexByColumn(String databaseName, String tableName, String 
columnName);
+  boolean existIndexByColumns(String databaseName, String tableName, Column[] 
columns);
+
+  boolean existIndexByColumnNames(String databaseName, String tableName, 
String [] columnNames);
+
+  boolean existIndexesByTable(String databaseName, String tableName);
 
   IndexDesc getIndexByName(String databaseName, String indexName);
 
-  IndexDesc getIndexByColumn(String databaseName, String tableName, String 
columnName);
+  IndexDesc getIndexByColumns(String databaseName, String tableName, Column [] 
columns);
+
+  IndexDesc getIndexByColumnNames(String databaseName, String tableName, 
String [] columnNames);
+
+  Collection<IndexDesc> getAllIndexesByTable(String databaseName, String 
tableName);
 
   boolean dropIndex(String databaseName, String indexName);
-  
-  List<IndexProto> getAllIndexes();
 
   boolean createFunction(FunctionDesc funcDesc);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index bb15ec1..a593cd9 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -39,12 +40,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public class CatalogUtil {
@@ -868,4 +865,43 @@ public class CatalogUtil {
 
     return options;
   }
+
+  /**
+   * Make a unique name by concatenating column names.
+   * The concatenation is performed in sequence of columns' occurrence in the 
relation schema.
+   *
+   * @param originalSchema original relation schema
+   * @param columnNames column names which will be unified
+   * @return unified name
+   */
+  public static String getUnifiedSimpleColumnName(Schema originalSchema, 
String[] columnNames) {
+    String[] simpleNames = new String[columnNames.length];
+    for (int i = 0; i < simpleNames.length; i++) {
+      String[] identifiers = 
columnNames[i].split(CatalogConstants.IDENTIFIER_DELIMITER_REGEXP);
+      simpleNames[i] = identifiers[identifiers.length-1];
+    }
+    Arrays.sort(simpleNames, new ColumnPosComparator(originalSchema));
+    StringBuilder sb = new StringBuilder();
+    for (String colName : simpleNames) {
+      sb.append(colName).append("_");
+    }
+    sb.deleteCharAt(sb.length()-1);
+    return sb.toString();
+  }
+
+  /**
+   * Given column names, compare the position of columns in the relation 
schema.
+   */
+  public static class ColumnPosComparator implements Comparator<String> {
+
+    private Schema originlSchema;
+    public ColumnPosComparator(Schema originalSchema) {
+      this.originlSchema = originalSchema;
+    }
+
+    @Override
+    public int compare(String o1, String o2) {
+      return originlSchema.getColumnId(o1) - originlSchema.getColumnId(o2);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
index 7dca4eb..9f64913 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexDesc.java
@@ -21,80 +21,91 @@ package org.apache.tajo.catalog;
 import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.common.ProtoObject;
 
-public class IndexDesc implements ProtoObject<IndexDescProto>, Cloneable {
-  private IndexDescProto.Builder builder;
+import java.net.URI;
+import java.net.URISyntaxException;
 
-  private String name;
-  private Path indexPath;            // required
+public class IndexDesc implements ProtoObject<IndexDescProto>, Cloneable {
   private String databaseName;         // required
   private String tableName;            // required
-  private Column column;               // required
-  private IndexMethod indexMethod;     // required
-  private boolean isUnique = false;    // optional [default = false]
-  private boolean isClustered = false; // optional [default = false]
-  private boolean isAscending = false; // optional [default = false]
-  
+  private IndexMeta indexMeta;         // required
+
   public IndexDesc() {
   }
   
-  public IndexDesc(String name, Path indexPath, String databaseName, String 
tableName, Column column,
-                   IndexMethod type,  boolean isUnique, boolean isClustered, 
boolean isAscending) {
+  public IndexDesc(String databaseName, String tableName, String indexName, 
URI indexPath, SortSpec[] keySortSpecs,
+                   IndexMethod type,  boolean isUnique, boolean isClustered, 
Schema targetRelationSchema) {
     this();
-    this.name = name;
-    this.indexPath = indexPath;
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-    this.column = column;
-    this.indexMethod = type;
-    this.isUnique = isUnique;
-    this.isClustered = isClustered;
-    this.isAscending = isAscending;
+    this.set(databaseName, tableName, indexName, indexPath, keySortSpecs, 
type, isUnique, isClustered,
+        targetRelationSchema);
   }
   
   public IndexDesc(IndexDescProto proto) {
-    this(proto.getName(), new Path(proto.getIndexPath()),
-        proto.getTableIdentifier().getDatabaseName(),
-        proto.getTableIdentifier().getTableName(),
-        new Column(proto.getColumn()),
-        proto.getIndexMethod(), proto.getIsUnique(), proto.getIsClustered(), 
proto.getIsAscending());
+    this();
+
+    SortSpec[] keySortSpecs = new SortSpec[proto.getKeySortSpecsCount()];
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      keySortSpecs[i] = new SortSpec(proto.getKeySortSpecs(i));
+    }
+
+    try {
+      this.set(proto.getTableIdentifier().getDatabaseName(),
+          proto.getTableIdentifier().getTableName(),
+          proto.getIndexName(), new URI(proto.getIndexPath()),
+          keySortSpecs,
+          proto.getIndexMethod(), proto.getIsUnique(), proto.getIsClustered(),
+          new Schema(proto.getTargetRelationSchema()));
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
   }
 
-  public String getName() {
-    return name;
+  public void set(String databaseName, String tableName, String indexName, URI 
indexPath, SortSpec[] keySortSpecs,
+                  IndexMethod type,  boolean isUnique, boolean isClustered, 
Schema targetRelationSchema) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.indexMeta = new IndexMeta(indexName, indexPath, keySortSpecs, type, 
isUnique, isClustered,
+        targetRelationSchema);
   }
-  
-  public Path getIndexPath() {
-    return indexPath;
+
+  public String getDatabaseName() {
+    return databaseName;
   }
-  
+
   public String getTableName() {
     return tableName;
   }
+
+  public String getName() {
+    return indexMeta.getIndexName();
+  }
   
-  public Column getColumn() {
-    return column;
+  public URI getIndexPath() {
+    return indexMeta.getIndexPath();
+  }
+
+  public SortSpec[] getKeySortSpecs() {
+    return indexMeta.getKeySortSpecs();
   }
   
   public IndexMethod getIndexMethod() {
-    return this.indexMethod;
+    return indexMeta.getIndexMethod();
   }
   
   public boolean isClustered() {
-    return this.isClustered;
+    return indexMeta.isClustered();
   }
   
   public boolean isUnique() {
-    return this.isUnique;
+    return indexMeta.isUnique();
   }
-  
-  public boolean isAscending() {
-    return this.isAscending;
+
+  public Schema getTargetRelationSchema() {
+    return indexMeta.getTargetRelationSchema();
   }
 
   @Override
@@ -110,13 +121,15 @@ public class IndexDesc implements 
ProtoObject<IndexDescProto>, Cloneable {
     }
 
     builder.setTableIdentifier(tableIdentifierBuilder.build());
-    builder.setName(this.name);
-    builder.setIndexPath(this.indexPath.toString());
-    builder.setColumn(this.column.getProto());
-    builder.setIndexMethod(indexMethod);
-    builder.setIsUnique(this.isUnique);
-    builder.setIsClustered(this.isClustered);
-    builder.setIsAscending(this.isAscending);
+    builder.setIndexName(indexMeta.getIndexName());
+    builder.setIndexPath(indexMeta.getIndexPath().toString());
+    for (SortSpec colSpec : indexMeta.getKeySortSpecs()) {
+      builder.addKeySortSpecs(colSpec.getProto());
+    }
+    builder.setIndexMethod(indexMeta.getIndexMethod());
+    builder.setIsUnique(indexMeta.isUnique());
+    builder.setIsClustered(indexMeta.isClustered());
+    
builder.setTargetRelationSchema(indexMeta.getTargetRelationSchema().getProto());
 
     return builder.build();
   }
@@ -124,34 +137,23 @@ public class IndexDesc implements 
ProtoObject<IndexDescProto>, Cloneable {
   public boolean equals(Object obj) {
     if (obj instanceof IndexDesc) {
       IndexDesc other = (IndexDesc) obj;
-      return getIndexPath().equals(other.getIndexPath())
-          && getName().equals(other.getName())
+      return getDatabaseName().equals(other.getDatabaseName())
           && getTableName().equals(other.getTableName())
-          && getColumn().equals(other.getColumn())
-          && getIndexMethod().equals(other.getIndexMethod())
-          && isUnique() == other.isUnique()
-          && isClustered() == other.isClustered()
-          && isAscending() == other.isAscending();
+          && this.indexMeta.equals(other.indexMeta);
     } else {
       return false;
     }
   }
   
   public int hashCode() {
-    return Objects.hashCode(getName(), getIndexPath(), getTableName(), 
getColumn(),
-        getIndexMethod(), isUnique(), isClustered(), isAscending());
+    return Objects.hashCode(databaseName, tableName, indexMeta);
   }
 
   public Object clone() throws CloneNotSupportedException {
     IndexDesc desc = (IndexDesc) super.clone();
-    desc.name = name;
-    desc.indexPath = indexPath;
-    desc.tableName = tableName;
-    desc.column = column;
-    desc.indexMethod = indexMethod;
-    desc.isUnique = isUnique;
-    desc.isClustered = isClustered;
-    desc.isAscending = isAscending;
+    desc.databaseName = this.databaseName;
+    desc.tableName = this.tableName;
+    desc.indexMeta = (IndexMeta) this.indexMeta.clone();
     return desc;
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexMeta.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexMeta.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexMeta.java
new file mode 100644
index 0000000..a911055
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/IndexMeta.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import com.google.common.base.Objects;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * IndexMeta contains meta information of an index.
+ * Meta information is the name, an index method, a path to the stored 
location, index keys, and so on.
+ */
+public class IndexMeta implements Cloneable {
+  @Expose private String indexName;              // index name
+  @Expose private IndexMethod indexMethod;       // index method
+  @Expose private URI indexPath;                 // path to the location
+  @Expose private SortSpec[] keySortSpecs;       // index keys. This array 
should always be sorted
+                                                 // according to the position 
in the targetRelationSchema
+  @Expose private boolean isUnique = false;      // unique key or not
+  @Expose private boolean isClustered = false;   // clustered index or not
+  @Expose private Schema targetRelationSchema;   // schema of the indexed 
relation
+  @Expose private KeyValueSet options;           // index options. TODO: will 
be added
+
+  public IndexMeta() {}
+
+  public IndexMeta(String indexName, URI indexPath, SortSpec[] keySortSpecs,
+                   IndexMethod type,  boolean isUnique, boolean isClustered,
+                   Schema targetRelationSchema) {
+    this.indexName = indexName;
+    this.indexPath = indexPath;
+    this.indexMethod = type;
+    this.isUnique = isUnique;
+    this.isClustered = isClustered;
+    this.targetRelationSchema = targetRelationSchema;
+    initKeySortSpecs(targetRelationSchema, keySortSpecs);
+  }
+
+  private void initKeySortSpecs(final Schema targetRelationSchema, final 
SortSpec[] keySortSpecs) {
+    this.targetRelationSchema = targetRelationSchema;
+    this.keySortSpecs = new SortSpec[keySortSpecs.length];
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      this.keySortSpecs[i] = new SortSpec(keySortSpecs[i].getSortKey(), 
keySortSpecs[i].isAscending(),
+          keySortSpecs[i].isNullFirst());
+    }
+    Arrays.sort(this.keySortSpecs, new Comparator<SortSpec>() {
+      @Override
+      public int compare(SortSpec o1, SortSpec o2) {
+        return 
targetRelationSchema.getColumnId(o1.getSortKey().getSimpleName())
+            - 
targetRelationSchema.getColumnId(o2.getSortKey().getSimpleName());
+      }
+    });
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public void setIndexName(final String indexName) {
+    this.indexName = indexName;
+  }
+
+  public IndexMethod getIndexMethod() {
+    return indexMethod;
+  }
+
+  public void setIndexMethod(final IndexMethod type) {
+    this.indexMethod = type;
+  }
+
+  public URI getIndexPath() {
+    return indexPath;
+  }
+
+  public void setIndexPath(final URI indexPath) {
+    this.indexPath = indexPath;
+  }
+
+  public SortSpec[] getKeySortSpecs() {
+    return keySortSpecs;
+  }
+
+  public void setKeySortSpecs(final Schema targetRelationSchema, final 
SortSpec[] keySortSpecs) {
+    initKeySortSpecs(targetRelationSchema, keySortSpecs);
+  }
+
+  public boolean isUnique() {
+    return isUnique;
+  }
+
+  public void setUnique(boolean unique) {
+    this.isUnique = unique;
+  }
+
+  public boolean isClustered() {
+    return isClustered;
+  }
+
+  public void setClustered(boolean clustered) {
+    this.isClustered = clustered;
+  }
+
+  public Schema getTargetRelationSchema() {
+    return targetRelationSchema;
+  }
+
+  public KeyValueSet getOptions() {
+    return options;
+  }
+
+  public void setOptions(KeyValueSet options) {
+    this.options = options;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof IndexMeta) {
+      IndexMeta other = (IndexMeta) o;
+      return this.indexName.equals(other.indexName)
+          && this.indexPath.equals(other.indexPath)
+          && this.indexMethod.equals(other.indexMethod)
+          && TUtil.checkEquals(this.keySortSpecs, other.keySortSpecs)
+          && this.isUnique == other.isUnique
+          && this.isClustered == other.isClustered
+          && this.targetRelationSchema.equals(other.targetRelationSchema);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(indexName, indexPath, indexMethod, 
Objects.hashCode(keySortSpecs),
+        isUnique, isClustered, targetRelationSchema);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    IndexMeta clone = (IndexMeta) super.clone();
+    clone.indexName = indexName;
+    clone.indexPath = indexPath;
+    clone.indexMethod = indexMethod;
+    clone.keySortSpecs = new SortSpec[keySortSpecs.length];
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      clone.keySortSpecs[i] = new SortSpec(this.keySortSpecs[i].getProto());
+    }
+    clone.isUnique = this.isUnique;
+    clone.isClustered = this.isClustered;
+    clone.targetRelationSchema = this.targetRelationSchema;
+    return clone;
+  }
+
+  @Override
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index ec679f9..f3f71ba 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -22,8 +22,6 @@ import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -37,8 +35,6 @@ import org.apache.tajo.util.TUtil;
 import java.net.URI;
 
 public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, 
Cloneable {
-  private final Log LOG = LogFactory.getLog(TableDesc.class);
-
        @Expose protected String tableName;                        // required
   @Expose protected Schema schema;
   @Expose protected TableMeta meta;                          // required

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
index 0bb7e32..71e6f15 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
@@ -24,7 +24,7 @@ public class NoSuchIndexException extends CatalogException {
   public NoSuchIndexException() {
   }
 
-  public NoSuchIndexException(String databaseName, String columnName) {
+  public NoSuchIndexException(String databaseName, String [] columnName) {
     super(String.format("ERROR: index \" %s \" in %s does not exist", 
columnName, databaseName));
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto 
b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 6baead9..f10aa42 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -115,13 +115,13 @@ message NamespaceProto {
 
 message IndexDescProto {
   required TableIdentifierProto tableIdentifier = 1;
-  required string name = 2;
-  required string indexPath = 3;
-  required ColumnProto column = 4;
-  required IndexMethod indexMethod = 5;
-  optional bool isUnique = 6 [default = false];
-  optional bool isClustered = 7 [default = false];
-  optional bool isAscending = 8 [default = false];
+  required string indexName = 2;
+  required IndexMethod indexMethod = 3;
+  required string indexPath = 4;
+  repeated SortSpecProto key_sort_specs = 5;
+  required SchemaProto targetRelationSchema = 6;
+  optional bool isUnique = 7 [default = false];
+  optional bool isClustered = 8 [default = false];
 }
 
 enum IndexMethod {
@@ -152,7 +152,7 @@ message GetColumnsProto {
 }
 
 message GetIndexesProto {
-  repeated IndexProto index = 1;
+  repeated IndexDescProto index = 1;
 }
 
 message GetTableOptionsProto {
@@ -167,18 +167,6 @@ message GetTablePartitionsProto {
   repeated TablePartitionProto part = 1;
 }
 
-message IndexProto {
-  required int32 dbId = 1;
-  required int32 tId = 2;
-  required string indexName = 3;
-  required string columnName = 4;
-  required string dataType = 5;
-  required string indexType = 6;
-  optional bool isUnique = 7 [default = false];
-  optional bool isClustered = 8 [default = false];
-  optional bool isAscending = 9 [default = false];
-}
-
 message TableOptionProto {
   required int32 tid = 1;
   required KeyValueProto keyval = 2;
@@ -192,9 +180,9 @@ message TablePartitionProto {
   optional string path = 5;
 }
 
-message GetIndexByColumnRequest {
+message GetIndexByColumnNamesRequest {
   required TableIdentifierProto tableIdentifier = 1;
-  required string columnName = 2;
+  repeated string columnNames = 2;
 }
 
 message IndexNameProto {
@@ -203,6 +191,10 @@ message IndexNameProto {
   required string indexName = 3;
 }
 
+message GetAllIndexesResponse {
+  repeated IndexDescProto indexDesc = 1;
+}
+
 message GetFunctionsResponse {
   repeated FunctionDescProto functionDesc = 1;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
 
b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
index afd88f1..247cd41 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.catalog;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -26,6 +25,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -34,23 +36,29 @@ public class TestIndexDesc {
   static IndexDesc desc1;
   static IndexDesc desc2;
   static IndexDesc desc3;
-  
-  static {
-    desc1 = new IndexDesc(
-        "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("id", Type.INT4),
-        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
-    
-    desc2 = new IndexDesc(
-        "idx_test2", new Path("idx_test2"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("score", Type.FLOAT8),
-        IndexMethod.TWO_LEVEL_BIN_TREE, false, false, false);
-    
-    desc3 = new IndexDesc(
-        "idx_test", new Path("idx_test"), DEFAULT_DATABASE_NAME, "indexed", 
new Column("id", Type.INT4),
-        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
-  }
+  static Schema relationSchema;
 
   @BeforeClass
   public static void setUp() throws Exception {
+    relationSchema = new Schema(new Column[]{new Column("id", Type.INT4),
+        new Column("score", Type.FLOAT8), new Column("name", Type.TEXT)});
+    SortSpec[] colSpecs1 = new SortSpec[1];
+    colSpecs1[0] = new SortSpec(new Column("id", Type.INT4), true, true);
+    desc1 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test", new URI("idx_test"), colSpecs1,
+        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema);
+
+    SortSpec[] colSpecs2 = new SortSpec[1];
+    colSpecs2[0] = new SortSpec(new Column("score", Type.FLOAT8), false, 
false);
+    desc2 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test2", new URI("idx_test2"), colSpecs2,
+        IndexMethod.TWO_LEVEL_BIN_TREE, false, false, relationSchema);
+
+    SortSpec[] colSpecs3 = new SortSpec[1];
+    colSpecs3[0] = new SortSpec(new Column("id", Type.INT4), true, true);
+    desc3 = new IndexDesc(DEFAULT_DATABASE_NAME, "indexed",
+        "idx_test", new URI("idx_test"), colSpecs3,
+        IndexMethod.TWO_LEVEL_BIN_TREE, true, true, relationSchema);
   }
 
   @AfterClass
@@ -65,24 +73,28 @@ public class TestIndexDesc {
   }
 
   @Test
-  public void testGetFields() {
+  public void testGetFields() throws URISyntaxException {
     assertEquals("idx_test", desc1.getName());
     assertEquals("indexed", desc1.getTableName());
-    assertEquals(new Column("id", Type.INT4), desc1.getColumn());
+    assertEquals(1, desc1.getKeySortSpecs().length);
+    assertEquals(new Column("id", Type.INT4), 
desc1.getKeySortSpecs()[0].getSortKey());
+    assertEquals(true, desc1.getKeySortSpecs()[0].isAscending());
+    assertEquals(true, desc1.getKeySortSpecs()[0].isNullFirst());
     assertEquals(IndexMethod.TWO_LEVEL_BIN_TREE, desc1.getIndexMethod());
-    assertEquals(new Path("idx_test"), desc1.getIndexPath());
+    assertEquals(new URI("idx_test"), desc1.getIndexPath());
     assertEquals(true, desc1.isUnique());
     assertEquals(true, desc1.isClustered());
-    assertEquals(true, desc1.isAscending());
-    
+
     assertEquals("idx_test2", desc2.getName());
     assertEquals("indexed", desc2.getTableName());
-    assertEquals(new Column("score", Type.FLOAT8), desc2.getColumn());
+    assertEquals(1, desc2.getKeySortSpecs().length);
+    assertEquals(new Column("score", Type.FLOAT8), 
desc2.getKeySortSpecs()[0].getSortKey());
+    assertEquals(false, desc2.getKeySortSpecs()[0].isAscending());
+    assertEquals(false, desc2.getKeySortSpecs()[0].isNullFirst());
     assertEquals(IndexMethod.TWO_LEVEL_BIN_TREE, desc2.getIndexMethod());
-    assertEquals(new Path("idx_test2"), desc2.getIndexPath());
+    assertEquals(new URI("idx_test2"), desc2.getIndexPath());
     assertEquals(false, desc2.isUnique());
     assertEquals(false, desc2.isClustered());
-    assertEquals(false, desc2.isAscending());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
 
b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index 2c3fc6a..78fee1d 100644
--- 
a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ 
b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -43,7 +43,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
@@ -704,115 +704,135 @@ public class HCatalogStore extends CatalogConstants 
implements CatalogStore {
   @Override
   public void addPartitionMethod(CatalogProtos.PartitionMethodProto 
partitionMethodProto) throws CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public CatalogProtos.PartitionMethodProto getPartitionMethod(String 
databaseName, String tableName)
       throws CatalogException {
-    return null;  // TODO - not implemented yet
+    // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public boolean existPartitionMethod(String databaseName, String tableName) 
throws CatalogException {
-    return false;  // TODO - not implemented yet
+    // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void dropPartitionMethod(String databaseName, String tableName) 
throws CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) 
throws CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void addPartition(String databaseName, String tableName, 
CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException {
-
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public CatalogProtos.PartitionsProto getPartitions(String tableName) throws 
CatalogException {
-    return null; // TODO - not implemented yet
+    // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public CatalogProtos.PartitionDescProto getPartition(String partitionName) 
throws CatalogException {
-    return null; // TODO - not implemented yet
+    // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void delPartition(String partitionName) throws CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void dropPartitions(String tableName) throws CatalogException {
-
+    throw new UnsupportedOperationException();
   }
 
 
   @Override
   public final void addFunction(final FunctionDesc func) throws 
CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public final void deleteFunction(final FunctionDesc func) throws 
CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public final void existFunction(final FunctionDesc func) throws 
CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public final List<String> getAllFunctionNames() throws CatalogException {
     // TODO - not implemented yet
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void dropIndex(String databaseName, String indexName) throws 
CatalogException {
+  public void createIndex(CatalogProtos.IndexDescProto proto) throws 
CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public boolean existIndexByName(String databaseName, String indexName) 
throws CatalogException {
+  public void dropIndex(String databaseName, String indexName) throws 
CatalogException {
     // TODO - not implemented yet
-    return false;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, String 
tableName) throws CatalogException {
+  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, 
String indexName) throws CatalogException {
     // TODO - not implemented yet
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void createIndex(CatalogProtos.IndexDescProto proto) throws 
CatalogException {
+  public CatalogProtos.IndexDescProto getIndexByColumns(String databaseName, 
String tableName, String[] columnNames)
+      throws CatalogException {
     // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, 
String indexName) throws CatalogException {
+  public boolean existIndexByName(String databaseName, String indexName) 
throws CatalogException {
     // TODO - not implemented yet
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, 
String tableName, String columnName)
+  public boolean existIndexByColumns(String databaseName, String tableName, 
String[] columnNames)
       throws CatalogException {
     // TODO - not implemented yet
-    return null;
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<String> getAllIndexNamesByTable(String databaseName, String 
tableName) throws CatalogException {
+    // TODO - not implemented yet
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public boolean existIndexByColumn(String databaseName, String tableName, 
String columnName) throws CatalogException {
+  public boolean existIndexesByTable(String databaseName, String tableName) 
throws CatalogException {
     // TODO - not implemented yet
-    return false;
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -860,7 +880,7 @@ public class HCatalogStore extends CatalogConstants 
implements CatalogStore {
   }
 
   @Override
-  public List<IndexProto> getAllIndexes() throws CatalogException {
+  public List<IndexDescProto> getAllIndexes() throws CatalogException {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 3f4d38d..a2b8eaf 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -870,12 +870,12 @@ public class CatalogServer extends AbstractService {
       try {
         if (store.existIndexByName(
             databaseName,
-            indexDesc.getName())) {
-          throw new AlreadyExistsIndexException(indexDesc.getName());
+            indexDesc.getIndexName())) {
+          throw new AlreadyExistsIndexException(indexDesc.getIndexName());
         }
         store.createIndex(indexDesc);
       } catch (Exception e) {
-        LOG.error("ERROR : cannot add index " + indexDesc.getName(), e);
+        LOG.error("ERROR : cannot add index " + indexDesc.getIndexName(), e);
         LOG.error(indexDesc);
         throw new ServiceException(e);
       } finally {
@@ -903,17 +903,35 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto existIndexByColumn(RpcController controller, 
GetIndexByColumnRequest request)
+    public BoolProto existIndexByColumnNames(RpcController controller, 
GetIndexByColumnNamesRequest request)
         throws ServiceException {
 
       TableIdentifierProto identifier = request.getTableIdentifier();
       String databaseName = identifier.getDatabaseName();
       String tableName = identifier.getTableName();
-      String columnName = request.getColumnName();
+      List<String> columnNames = request.getColumnNamesList();
 
       rlock.lock();
       try {
-        return store.existIndexByColumn(databaseName, tableName, columnName) ?
+        return store.existIndexByColumns(databaseName, tableName,
+            columnNames.toArray(new String[columnNames.size()])) ?
+            ProtoUtil.TRUE : ProtoUtil.FALSE;
+      } catch (Exception e) {
+        LOG.error(e);
+        return BoolProto.newBuilder().setValue(false).build();
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existIndexesByTable(RpcController controller, 
TableIdentifierProto request) throws ServiceException {
+      String databaseName = request.getDatabaseName();
+      String tableName = request.getTableName();
+
+      rlock.lock();
+      try {
+        return store.existIndexesByTable(databaseName, tableName) ?
             ProtoUtil.TRUE : ProtoUtil.FALSE;
       } catch (Exception e) {
         LOG.error(e);
@@ -933,7 +951,7 @@ public class CatalogServer extends AbstractService {
       rlock.lock();
       try {
         if (!store.existIndexByName(databaseName, indexName)) {
-          throw new NoSuchIndexException(databaseName, indexName);
+          throw new NoSuchIndexException(indexName);
         }
         return store.getIndexByName(databaseName, indexName);
       } catch (Exception e) {
@@ -945,22 +963,59 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public IndexDescProto getIndexByColumn(RpcController controller, 
GetIndexByColumnRequest request)
+    public IndexDescProto getIndexByColumnNames(RpcController controller, 
GetIndexByColumnNamesRequest request)
         throws ServiceException {
 
       TableIdentifierProto identifier = request.getTableIdentifier();
       String databaseName = identifier.getDatabaseName();
       String tableName = identifier.getTableName();
-      String columnName = request.getColumnName();
+      List<String> columnNamesList = request.getColumnNamesList();
+      String[] columnNames = new String[columnNamesList.size()];
+      columnNames = columnNamesList.toArray(columnNames);
 
       rlock.lock();
       try {
-        if (!store.existIndexByColumn(databaseName, tableName, columnName)) {
-          throw new NoSuchIndexException(databaseName, columnName);
+        if (!store.existIndexByColumns(databaseName, tableName, columnNames)) {
+          throw new NoSuchIndexException(databaseName, columnNames);
+        }
+        return store.getIndexByColumns(databaseName, tableName, columnNames);
+      } catch (Exception e) {
+        LOG.error("ERROR: cannot get index for " + tableName + "." + 
columnNames, e);
+        return null;
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public GetAllIndexesResponse getAllIndexesByTable(RpcController 
controller, TableIdentifierProto request)
+        throws ServiceException {
+      rlock.lock();
+      String databaseName = request.getDatabaseName();
+      String tableName = request.getTableName();
+      try {
+        GetAllIndexesResponse.Builder builder = 
GetAllIndexesResponse.newBuilder();
+        for (String eachIndexName : 
store.getAllIndexNamesByTable(databaseName, tableName)) {
+          builder.addIndexDesc(store.getIndexByName(databaseName, 
eachIndexName));
         }
-        return store.getIndexByColumn(databaseName, tableName, columnName);
+        return builder.build();
       } catch (Exception e) {
-        LOG.error("ERROR : cannot get index for " + tableName + "." + 
columnName, e);
+        LOG.error("ERROR: cannot get all indexes for " + databaseName + "." + 
tableName, e);
+        return null;
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public GetIndexesProto getAllIndexes(RpcController controller, NullProto 
request) throws ServiceException {
+      rlock.lock();
+      try {
+        GetIndexesProto.Builder builder = GetIndexesProto.newBuilder();
+        builder.addAllIndex(store.getAllIndexes());
+        return builder.build();
+      } catch (Exception e) {
+        LOG.error("ERROR: cannot get all indexes", e);
         return null;
       } finally {
         rlock.unlock();
@@ -988,18 +1043,6 @@ public class CatalogServer extends AbstractService {
 
       return BOOL_TRUE;
     }
-    
-    @Override
-    public GetIndexesProto getAllIndexes(RpcController controller, NullProto 
request) throws ServiceException {
-      rlock.lock();
-      try {
-        return 
GetIndexesProto.newBuilder().addAllIndex(store.getAllIndexes()).build();
-      } catch (Exception e) {
-        throw new ServiceException(e);
-      } finally {
-        rlock.unlock();
-      }
-    }
 
     public boolean checkIfBuiltin(FunctionType type) {
       return type == GENERAL || type == AGGREGATION || type == 
DISTINCT_AGGREGATION;

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/IndexesTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/IndexesTableDescriptor.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/IndexesTableDescriptor.java
index a079a93..d527b19 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/IndexesTableDescriptor.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/IndexesTableDescriptor.java
@@ -27,12 +27,8 @@ class IndexesTableDescriptor extends AbstractTableDescriptor 
{
       new ColumnDescriptor("db_id", Type.INT4, 0),
       new ColumnDescriptor("tid", Type.INT4, 0),
       new ColumnDescriptor("index_name", Type.TEXT, 0),
-      new ColumnDescriptor("column_name", Type.TEXT, 0),
-      new ColumnDescriptor("data_type", Type.TEXT, 0),
-      new ColumnDescriptor("index_type", Type.TEXT, 0),
-      new ColumnDescriptor("is_unique", Type.BOOLEAN, 0),
-      new ColumnDescriptor("is_clustered", Type.BOOLEAN, 0),
-      new ColumnDescriptor("is_ascending", Type.BOOLEAN, 0)
+      new ColumnDescriptor("index_method", Type.TEXT, 0),
+      new ColumnDescriptor("index_path", Type.TEXT, 0),
   };
 
   public IndexesTableDescriptor(InfoSchemaMetadataDictionary 
metadataDictionary) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index b541e67..0cd8803 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -27,9 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.CatalogConstants;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
@@ -1968,37 +1966,55 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
     Connection conn = null;
     PreparedStatement pstmt = null;
 
-    String databaseName = proto.getTableIdentifier().getDatabaseName();
-    String tableName = proto.getTableIdentifier().getTableName();
-    String columnName = 
CatalogUtil.extractSimpleName(proto.getColumn().getName());
+    final String databaseName = proto.getTableIdentifier().getDatabaseName();
+    final String tableName = 
CatalogUtil.extractSimpleName(proto.getTableIdentifier().getTableName());
 
     try {
+      // indexes table
       int databaseId = getDatabaseId(databaseName);
       int tableId = getTableId(databaseId, databaseName, tableName);
 
       String sql = "INSERT INTO " + TB_INDEXES +
           " (" + COL_DATABASES_PK + ", " + COL_TABLES_PK + ", INDEX_NAME, " +
-          "COLUMN_NAME, DATA_TYPE, INDEX_TYPE, PATH, IS_UNIQUE, IS_CLUSTERED, 
IS_ASCENDING) " +
-          "VALUES (?,?,?,?,?,?,?,?,?,?)";
+          "INDEX_TYPE, PATH, COLUMN_NAMES, DATA_TYPES, ORDERS, NULL_ORDERS, 
IS_UNIQUE, IS_CLUSTERED) " +
+          "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
 
+      StringBuilder columnNamesBuilder = new StringBuilder();
+      StringBuilder dataTypesBuilder= new StringBuilder();
+      StringBuilder ordersBuilder = new StringBuilder();
+      StringBuilder nullOrdersBuilder = new StringBuilder();
+      for (SortSpecProto columnSpec : proto.getKeySortSpecsList()) {
+        // Since the key columns are always sorted in order of their 
occurrence position in the relation schema,
+        // the concatenated name can be uniquely identified.
+        
columnNamesBuilder.append(columnSpec.getColumn().getName()).append(",");
+        
dataTypesBuilder.append(columnSpec.getColumn().getDataType().getType().name()).append(",");
+        ordersBuilder.append(columnSpec.getAscending()).append(",");
+        nullOrdersBuilder.append(columnSpec.getNullFirst()).append(",");
+      }
+      columnNamesBuilder.deleteCharAt(columnNamesBuilder.length()-1);
+      dataTypesBuilder.deleteCharAt(dataTypesBuilder.length()-1);
+      ordersBuilder.deleteCharAt(ordersBuilder.length()-1);
+      nullOrdersBuilder.deleteCharAt(nullOrdersBuilder.length()-1);
+
       conn = getConnection();
       conn.setAutoCommit(false);
 
       pstmt = conn.prepareStatement(sql);
       pstmt.setInt(1, databaseId);
       pstmt.setInt(2, tableId);
-      pstmt.setString(3, proto.getName());
-      pstmt.setString(4, columnName);
-      pstmt.setString(5, proto.getColumn().getDataType().getType().name());
-      pstmt.setString(6, proto.getIndexMethod().toString());
-      pstmt.setString(7, proto.getIndexPath());
-      pstmt.setBoolean(8, proto.hasIsUnique() && proto.getIsUnique());
-      pstmt.setBoolean(9, proto.hasIsClustered() && proto.getIsClustered());
-      pstmt.setBoolean(10, proto.hasIsAscending() && proto.getIsAscending());
+      pstmt.setString(3, proto.getIndexName()); // index name
+      pstmt.setString(4, proto.getIndexMethod().toString()); // index type
+      pstmt.setString(5, proto.getIndexPath()); // index path
+      pstmt.setString(6, columnNamesBuilder.toString());
+      pstmt.setString(7, dataTypesBuilder.toString());
+      pstmt.setString(8, ordersBuilder.toString());
+      pstmt.setString(9, nullOrdersBuilder.toString());
+      pstmt.setBoolean(10, proto.hasIsUnique() && proto.getIsUnique());
+      pstmt.setBoolean(11, proto.hasIsClustered() && proto.getIsClustered());
       pstmt.executeUpdate();
       conn.commit();
     } catch (SQLException se) {
@@ -2052,9 +2068,7 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
   }
 
   final static String GET_INDEXES_SQL =
-      "SELECT " + COL_TABLES_PK + ", INDEX_NAME, COLUMN_NAME, DATA_TYPE, 
INDEX_TYPE, PATH, IS_UNIQUE, " +
-          "IS_CLUSTERED, IS_ASCENDING FROM " + TB_INDEXES;
-
+      "SELECT * FROM " + TB_INDEXES;
 
   @Override
   public IndexDescProto getIndexByName(String databaseName, final String 
indexName)
@@ -2085,6 +2099,7 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
       resultToIndexDescProtoBuilder(builder, res);
       String tableName = getTableName(conn, res.getInt(COL_TABLES_PK));
       
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
+      builder.setTargetRelationSchema(getTable(databaseName, 
tableName).getSchema());
       proto = builder.build();
     } catch (SQLException se) {
       throw new CatalogException(se);
@@ -2096,9 +2111,8 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
   }
 
   @Override
-  public IndexDescProto getIndexByColumn(final String databaseName,
-                                         final String tableName,
-                                         final String columnName) throws 
CatalogException {
+  public IndexDescProto getIndexByColumns(String databaseName, String 
tableName, String[] columnNames)
+      throws CatalogException {
     Connection conn = null;
     ResultSet res = null;
     PreparedStatement pstmt = null;
@@ -2106,25 +2120,35 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
 
     try {
       int databaseId = getDatabaseId(databaseName);
+      int tableId = getTableId(databaseId, databaseName, tableName);
+      TableDescProto tableDescProto = getTable(databaseName, tableName);
 
-      String sql = GET_INDEXES_SQL + " WHERE " + COL_DATABASES_PK + "=? AND 
COLUMN_NAME=?";
+      String sql = GET_INDEXES_SQL + " WHERE " + COL_DATABASES_PK + "=? AND " +
+          COL_TABLES_PK + "=? AND COLUMN_NAMES=?";
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
 
+      // Since the column names in the unified name are always sorted
+      // in order of occurrence position in the relation schema,
+      // they can be uniquely identified.
+      String unifiedName = CatalogUtil.buildFQName(databaseName, tableName,
+          CatalogUtil.getUnifiedSimpleColumnName(new 
Schema(tableDescProto.getSchema()), columnNames));
       conn = getConnection();
       pstmt = conn.prepareStatement(sql);
       pstmt.setInt(1, databaseId);
-      ;
-      pstmt.setString(2, columnName);
+      pstmt.setInt(2, tableId);
+      pstmt.setString(3, unifiedName);
       res = pstmt.executeQuery();
       if (!res.next()) {
-        throw new CatalogException("ERROR: there is no index matched to " + 
columnName);
+        throw new CatalogException("ERROR: there is no index matched to " + 
unifiedName);
       }
+
       IndexDescProto.Builder builder = IndexDescProto.newBuilder();
       resultToIndexDescProtoBuilder(builder, res);
       
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
+      builder.setTargetRelationSchema(tableDescProto.getSchema());
       proto = builder.build();
     } catch (SQLException se) {
       throw new CatalogException(se);
@@ -2169,7 +2193,7 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
   }
 
   @Override
-  public boolean existIndexByColumn(String databaseName, String tableName, 
String columnName)
+  public boolean existIndexByColumns(String databaseName, String tableName, 
String[] columnNames)
       throws CatalogException {
     Connection conn = null;
     ResultSet res = null;
@@ -2179,18 +2203,27 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
 
     try {
       int databaseId = getDatabaseId(databaseName);
+      int tableId = getTableId(databaseId, databaseName, tableName);
+      Schema relationSchema = new Schema(getTable(databaseName, 
tableName).getSchema());
 
       String sql =
-          "SELECT INDEX_NAME FROM " + TB_INDEXES + " WHERE " + 
COL_DATABASES_PK + "=? AND COLUMN_NAME=?";
+          "SELECT " + COL_INDEXES_PK + " FROM " + TB_INDEXES +
+              " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_PK + "=? 
AND COLUMN_NAMES=?";
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
 
+      // Since the column names in the unified name are always sorted
+      // in order of occurrence position in the relation schema,
+      // they can be uniquely identified.
+      String unifiedName = CatalogUtil.buildFQName(databaseName, tableName,
+          CatalogUtil.getUnifiedSimpleColumnName(new Schema(relationSchema), 
columnNames));
       conn = getConnection();
       pstmt = conn.prepareStatement(sql);
       pstmt.setInt(1, databaseId);
-      pstmt.setString(2, columnName);
+      pstmt.setInt(2, tableId);
+      pstmt.setString(3, unifiedName);
       res = pstmt.executeQuery();
       exist = res.next();
     } catch (SQLException se) {
@@ -2202,22 +2235,18 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
   }
 
   @Override
-  public IndexDescProto[] getIndexes(String databaseName, final String 
tableName)
+  public List<String> getAllIndexNamesByTable(final String databaseName, final 
String tableName)
       throws CatalogException {
-    Connection conn = null;
     ResultSet res = null;
     PreparedStatement pstmt = null;
-    final List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+    final List<String> indexNames = new ArrayList<String>();
 
     try {
       final int databaseId = getDatabaseId(databaseName);
       final int tableId = getTableId(databaseId, databaseName, tableName);
-      final TableIdentifierProto tableIdentifier = 
CatalogUtil.buildTableIdentifier(databaseName, tableName);
-
 
       String sql = GET_INDEXES_SQL + " WHERE " + COL_DATABASES_PK + "=? AND " 
+ COL_TABLES_PK + "=?";
 
-
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
@@ -2229,10 +2258,7 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
       res = pstmt.executeQuery();
 
       while (res.next()) {
-        IndexDescProto.Builder builder = IndexDescProto.newBuilder();
-        resultToIndexDescProtoBuilder(builder, res);
-        builder.setTableIdentifier(tableIdentifier);
-        protos.add(builder.build());
+        indexNames.add(res.getString("index_name"));
       }
     } catch (SQLException se) {
       throw new CatalogException(se);
@@ -2240,57 +2266,73 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
       CatalogUtil.closeQuietly(pstmt, res);
     }
 
-    return protos.toArray(new IndexDescProto[protos.size()]);
+    return indexNames;
   }
-  
-  @Override
-  public List<IndexProto> getAllIndexes() throws CatalogException {
-    Connection conn = null;
-    Statement stmt = null;
-    ResultSet resultSet = null;
 
-    List<IndexProto> indexes = new ArrayList<IndexProto>();
+  @Override
+  public boolean existIndexesByTable(String databaseName, String tableName) 
throws CatalogException {
+    ResultSet res = null;
+    PreparedStatement pstmt = null;
+    final List<String> indexNames = new ArrayList<String>();
 
     try {
-      String sql = "SELECT " + COL_DATABASES_PK + ", " + COL_TABLES_PK + ", 
INDEX_NAME, " +
-        "COLUMN_NAME, DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, 
IS_ASCENDING FROM " + TB_INDEXES;
+      final int databaseId = getDatabaseId(databaseName);
+      final int tableId = getTableId(databaseId, databaseName, tableName);
 
-      conn = getConnection();
-      stmt = conn.createStatement();
-      resultSet = stmt.executeQuery(sql);
-      while (resultSet.next()) {
-        IndexProto.Builder builder = IndexProto.newBuilder();
-        
-        builder.setDbId(resultSet.getInt(COL_DATABASES_PK));
-        builder.setTId(resultSet.getInt(COL_TABLES_PK));
-        builder.setIndexName(resultSet.getString("INDEX_NAME"));
-        builder.setColumnName(resultSet.getString("COLUMN_NAME"));
-        builder.setDataType(resultSet.getString("DATA_TYPE"));
-        builder.setIndexType(resultSet.getString("INDEX_TYPE"));
-        builder.setIsUnique(resultSet.getBoolean("IS_UNIQUE"));
-        builder.setIsClustered(resultSet.getBoolean("IS_CLUSTERED"));
-        builder.setIsAscending(resultSet.getBoolean("IS_ASCENDING"));
-        
-        indexes.add(builder.build());
+      String sql = GET_INDEXES_SQL + " WHERE " + COL_DATABASES_PK + "=? AND " 
+ COL_TABLES_PK + "=?";
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
       }
+
+      conn = getConnection();
+      pstmt = conn.prepareStatement(sql);
+      pstmt.setInt(1, databaseId);
+      pstmt.setInt(2, tableId);
+      res = pstmt.executeQuery();
+
+      return res.next();
     } catch (SQLException se) {
       throw new CatalogException(se);
     } finally {
-      CatalogUtil.closeQuietly(stmt, resultSet);
+      CatalogUtil.closeQuietly(pstmt, res);
     }
-    
-    return indexes;
+  }
+
+  @Override
+  public List<IndexDescProto> getAllIndexes() throws CatalogException {
+    List<IndexDescProto> indexDescProtos = TUtil.newList();
+    for (String databaseName : getAllDatabaseNames()) {
+      for (String tableName : getAllTableNames(databaseName)) {
+        for (String indexName: getAllIndexNamesByTable(databaseName, 
tableName)) {
+          indexDescProtos.add(getIndexByName(databaseName, indexName));
+        }
+      }
+    }
+    return indexDescProtos;
   }
 
   private void resultToIndexDescProtoBuilder(IndexDescProto.Builder builder,
                                              final ResultSet res) throws 
SQLException {
-    builder.setName(res.getString("index_name"));
-    builder.setColumn(indexResultToColumnProto(res));
+    builder.setIndexName(res.getString("index_name"));
     builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
     builder.setIndexPath(res.getString("path"));
+    String[] columnNames, dataTypes, orders, nullOrders;
+    columnNames = res.getString("column_names").trim().split(",");
+    dataTypes = res.getString("data_types").trim().split(",");
+    orders = res.getString("orders").trim().split(",");
+    nullOrders = res.getString("null_orders").trim().split(",");
+    int columnNum = columnNames.length;
+    for (int i = 0; i < columnNum; i++) {
+      SortSpecProto.Builder colSpecBuilder = SortSpecProto.newBuilder();
+      colSpecBuilder.setColumn(ColumnProto.newBuilder().setName(columnNames[i])
+          
.setDataType(CatalogUtil.newSimpleDataType(getDataType(dataTypes[i]))).build());
+      colSpecBuilder.setAscending(orders[i].equals("true"));
+      colSpecBuilder.setNullFirst(nullOrders[i].equals("true"));
+      builder.addKeySortSpecs(colSpecBuilder.build());
+    }
     builder.setIsUnique(res.getBoolean("is_unique"));
     builder.setIsClustered(res.getBoolean("is_clustered"));
-    builder.setIsAscending(res.getBoolean("is_ascending"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index ed6fedc..ed1cbc4 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -20,14 +20,7 @@ package org.apache.tajo.catalog.store;
 
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
 
 import java.io.Closeable;
 
@@ -36,10 +29,6 @@ import org.apache.tajo.catalog.exception.CatalogException;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
-
 public interface CatalogStore extends Closeable {
   /*************************** Tablespace ******************************/
   void createTablespace(String spaceName, String spaceUri) throws 
CatalogException;
@@ -129,18 +118,20 @@ public interface CatalogStore extends Closeable {
   void dropIndex(String databaseName, String indexName) throws 
CatalogException;
   
   IndexDescProto getIndexByName(String databaseName, String indexName) throws 
CatalogException;
-  
-  IndexDescProto getIndexByColumn(String databaseName, String tableName, 
String columnName)
+
+  IndexDescProto getIndexByColumns(String databaseName, String tableName, 
String[] columnNames)
       throws CatalogException;
   
   boolean existIndexByName(String databaseName, String indexName) throws 
CatalogException;
-  
-  boolean existIndexByColumn(String databaseName, String tableName, String 
columnName)
+
+  boolean existIndexByColumns(String databaseName, String tableName, String[] 
columnNames)
       throws CatalogException;
 
-  IndexDescProto [] getIndexes(String databaseName, String tableName) throws 
CatalogException;
-  
-  List<IndexProto> getAllIndexes() throws CatalogException;
+  List<String> getAllIndexNamesByTable(String databaseName, String tableName) 
throws CatalogException;
+
+  boolean existIndexesByTable(String databaseName, String tableName) throws 
CatalogException;
+
+  List<IndexDescProto> getAllIndexes() throws CatalogException;
 
   /************************** FUNCTION *****************************/
 

Reply via email to