Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into 
index_support

Conflicts:
        
tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/02800343
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/02800343
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/02800343

Branch: refs/heads/index_support
Commit: 02800343b7ad99d4c0ed51fca1771111679ca61f
Parents: 2faa47c d2a4f9b
Author: Jihoon Son <[email protected]>
Authored: Fri Apr 17 17:33:56 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Fri Apr 17 17:33:56 2015 +0900

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 CHANGES                                         |   4 +
 tajo-catalog/pom.xml                            |   4 +-
 .../tajo/catalog/AbstractCatalogClient.java     |  92 +-
 tajo-catalog/tajo-catalog-drivers/pom.xml       |  34 +-
 .../tajo-catalog-drivers/tajo-hcatalog/pom.xml  | 739 --------------
 .../tajo/catalog/store/HCatalogStore.java       | 993 -------------------
 .../catalog/store/HCatalogStoreClientPool.java  | 170 ----
 .../apache/tajo/catalog/store/HCatalogUtil.java | 147 ---
 .../tajo/catalog/store/TestHCatalogStore.java   | 467 ---------
 .../tajo-catalog-drivers/tajo-hive/pom.xml      | 351 +++++++
 .../tajo/catalog/store/HiveCatalogStore.java    | 980 ++++++++++++++++++
 .../store/HiveCatalogStoreClientPool.java       | 170 ++++
 .../tajo/catalog/store/HiveCatalogUtil.java     | 127 +++
 .../catalog/store/TestHiveCatalogStore.java     | 504 ++++++++++
 .../org/apache/tajo/catalog/TestCatalog.java    |   6 +-
 .../tajo/client/CatalogAdminClientImpl.java     |  68 +-
 .../org/apache/tajo/client/QueryClientImpl.java |  50 +-
 .../apache/tajo/client/SessionConnection.java   |  40 +-
 tajo-core/pom.xml                               | 212 ----
 .../org/apache/tajo/master/QueryInProgress.java |   6 +-
 .../apache/tajo/master/TajoContainerProxy.java  |  38 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  24 +-
 .../tajo/worker/ExecutionBlockContext.java      |  29 +-
 .../tajo/worker/TajoResourceAllocator.java      |  20 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  73 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |   2 -
 .../tajo/worker/WorkerHeartbeatService.java     |  10 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  26 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |   4 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  28 +-
 .../org/apache/tajo/cli/tools/TestTajoDump.java |   4 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |  56 +-
 .../tajo/engine/query/TestAlterTablespace.java  |   2 +-
 .../apache/tajo/engine/query/TestCTASQuery.java |   6 +-
 .../tajo/engine/query/TestCreateTable.java      |  14 +-
 .../tajo/engine/query/TestInsertQuery.java      |  40 +-
 .../apache/tajo/engine/query/TestNetTypes.java  |  26 +-
 .../tajo/engine/query/TestSelectQuery.java      |   4 +-
 .../apache/tajo/engine/query/TestSortQuery.java |   6 +-
 .../tajo/engine/query/TestTablePartitions.java  |  22 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |   4 +-
 .../tajo/jdbc/TestTajoDatabaseMetaData.java     |  22 +-
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |  12 +-
 .../create_table_various_types_for_hcatalog.sql |  50 -
 ...ate_table_various_types_for_hive_catalog.sql |  50 +
 tajo-dist/pom.xml                               |   4 +-
 tajo-dist/src/main/bin/tajo                     |   3 -
 .../src/main/conf/catalog-site.xml.template     |   6 +-
 tajo-dist/src/main/conf/tajo-env.sh             |   2 +-
 .../configuration/catalog_configuration.rst     |  40 +-
 .../src/main/sphinx/hcatalog_integration.rst    |  52 -
 tajo-docs/src/main/sphinx/hive_integration.rst  |  42 +
 tajo-docs/src/main/sphinx/index.rst             |   2 +-
 .../plan/rewrite/rules/FilterPushDownRule.java  |   2 +-
 tajo-project/pom.xml                            |   1 +
 .../main/java/org/apache/tajo/rpc/RpcUtils.java |  34 -
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  58 +-
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  82 +-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  88 +-
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |  85 +-
 .../tajo/rpc/ConnectionCloseFutureListener.java |  35 +
 .../org/apache/tajo/rpc/NettyClientBase.java    | 124 +--
 .../tajo/rpc/ProtoChannelInitializer.java       |  11 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   | 185 ++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 191 ----
 .../org/apache/tajo/rpc/ServerCallable.java     |  36 +-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  72 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  85 +-
 .../apache/tajo/rpc/TestRpcClientManager.java   |  97 ++
 70 files changed, 3243 insertions(+), 3832 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc 
tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 3b72639,49be29a..6cfbb1a
--- 
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ 
b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -373,27 -372,9 +373,27 @@@ public abstract class AbstractCatalogCl
    }
  
    @Override
 +  public List<IndexDescProto> getAllIndexes() {
 +    try {
-       return new ServerCallable<List<IndexDescProto>>(pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
++      return new ServerCallable<List<IndexDescProto>>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
 +
 +        @Override
 +        public List<IndexDescProto> call(NettyClientBase client) throws 
Exception {
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          GetIndexesProto response = stub.getAllIndexes(null, 
ProtoUtil.NULL_PROTO);
 +          return response.getIndexList();
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
    public final PartitionMethodDesc getPartitionMethod(final String 
databaseName, final String tableName) {
      try {
-       return new ServerCallable<PartitionMethodDesc>(this.pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
+       return new ServerCallable<PartitionMethodDesc>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
          public PartitionMethodDesc call(NettyClientBase client) throws 
ServiceException {
  
            TableIdentifierProto.Builder builder = 
TableIdentifierProto.newBuilder();
@@@ -637,40 -618,17 +637,40 @@@
    }
  
    @Override
 -  public boolean existIndexByColumn(final String databaseName, final String 
tableName, final String columnName) {
 +  public boolean existIndexByColumns(final String databaseName, final String 
tableName, final Column [] columns) {
 +    return existIndexByColumnNames(databaseName, tableName, 
extractColumnNames(columns));
 +  }
 +
 +  @Override
 +  public boolean existIndexByColumnNames(final String databaseName, final 
String tableName, final String [] columnNames) {
      try {
-       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
+       return new ServerCallable<Boolean>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
          public Boolean call(NettyClientBase client) throws ServiceException {
  
 -          GetIndexByColumnRequest.Builder builder = 
GetIndexByColumnRequest.newBuilder();
 +          GetIndexByColumnNamesRequest.Builder builder = 
GetIndexByColumnNamesRequest.newBuilder();
            
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
 -          builder.setColumnName(columnName);
 +          for (String colunName : columnNames) {
 +            builder.addColumnNames(colunName);
 +          }
  
            CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          return stub.existIndexByColumn(null, builder.build()).getValue();
 +          return stub.existIndexByColumnNames(null, 
builder.build()).getValue();
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return false;
 +    }
 +  }
 +
 +  @Override
 +  public boolean existIndexesByTable(final String databaseName, final String 
tableName) {
 +    try {
-       return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
++      return new ServerCallable<Boolean>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
 +        public Boolean call(NettyClientBase client) throws ServiceException {
 +
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          return stub.existIndexesByTable(null, 
CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
          }
        }.withRetries();
      } catch (ServiceException e) {
@@@ -699,60 -657,20 +699,60 @@@
      }
    }
  
 +  private static String[] extractColumnNames(Column[] columns) {
 +    String[] columnNames = new String [columns.length];
 +    for (int i = 0; i < columnNames.length; i++) {
 +      columnNames[i] = columns[i].getSimpleName();
 +    }
 +    return columnNames;
 +  }
 +
 +  @Override
 +  public final IndexDesc getIndexByColumns(final String databaseName,
 +                                               final String tableName,
 +                                               final Column [] columns) {
 +    return getIndexByColumnNames(databaseName, tableName, 
extractColumnNames(columns));
 +  }
 +
    @Override
 -  public final IndexDesc getIndexByColumn(final String databaseName,
 -                                          final String tableName,
 -                                          final String columnName) {
 +  public final IndexDesc getIndexByColumnNames(final String databaseName,
 +                                           final String tableName,
 +                                           final String [] columnNames) {
      try {
-       return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), 
CatalogProtocol.class, false) {
+       return new ServerCallable<IndexDesc>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
          public IndexDesc call(NettyClientBase client) throws ServiceException 
{
  
 -          GetIndexByColumnRequest.Builder builder = 
GetIndexByColumnRequest.newBuilder();
 +          GetIndexByColumnNamesRequest.Builder builder = 
GetIndexByColumnNamesRequest.newBuilder();
            
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, 
tableName));
 -          builder.setColumnName(columnName);
 +          for (String columnName : columnNames) {
 +            builder.addColumnNames(columnName);
 +          }
  
            CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
 +          return new IndexDesc(stub.getIndexByColumnNames(null, 
builder.build()));
 +        }
 +      }.withRetries();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
 +  public final Collection<IndexDesc> getAllIndexesByTable(final String 
databaseName,
 +                                                          final String 
tableName) {
 +    try {
-       return new ServerCallable<Collection<IndexDesc>>(this.pool, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
++      return new ServerCallable<Collection<IndexDesc>>(this.manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
 +        @Override
 +        public Collection<IndexDesc> call(NettyClientBase client) throws 
Exception {
 +          TableIdentifierProto proto = 
CatalogUtil.buildTableIdentifier(databaseName, tableName);
 +          CatalogProtocolService.BlockingInterface stub = getStub(client);
 +          GetAllIndexesResponse response = stub.getAllIndexesByTable(null, 
proto);
 +          List<IndexDesc> indexDescs = TUtil.newList();
 +          for (IndexDescProto descProto : response.getIndexDescList()) {
 +            indexDescs.add(new IndexDesc(descProto));
 +          }
 +          return indexDescs;
          }
        }.withRetries();
      } catch (ServiceException e) {
@@@ -781,7 -699,25 +781,7 @@@
        return false;
      }
    }
--  
 -  @Override
 -  public List<IndexProto> getAllIndexes() {
 -    try {
 -      return new ServerCallable<List<IndexProto>>(manager, 
getCatalogServerAddr(), CatalogProtocol.class, false) {
 -
 -        @Override
 -        public List<IndexProto> call(NettyClientBase client) throws Exception 
{
 -          CatalogProtocolService.BlockingInterface stub = getStub(client);
 -          GetIndexesProto response = stub.getAllIndexes(null, 
ProtoUtil.NULL_PROTO);
 -          return response.getIndexList();
 -        }
 -      }.withRetries();
 -    } catch (ServiceException e) {
 -      LOG.error(e.getMessage(), e);
 -      return null;
 -    }
 -  }
+ 
    @Override
    public final boolean createFunction(final FunctionDesc funcDesc) {
      try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --cc 
tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 0000000,5b1a996..700b327
mode 000000,100644..100644
--- 
a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ 
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@@ -1,0 -1,964 +1,980 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.catalog.store;
+ 
+ import com.google.common.collect.Lists;
+ 
+ import org.apache.commons.lang.StringEscapeUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.serde.serdeConstants;
+ import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+ import org.apache.tajo.TajoConstants;
+ import org.apache.tajo.catalog.*;
+ import org.apache.tajo.catalog.exception.*;
+ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+ import org.apache.tajo.catalog.proto.CatalogProtos;
+ import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
++import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.common.TajoDataTypes;
+ import org.apache.tajo.common.exception.NotImplementedException;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.exception.InternalException;
+ import org.apache.tajo.storage.StorageConstants;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.thrift.TException;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+ 
+ public class HiveCatalogStore extends CatalogConstants implements 
CatalogStore {
+   protected final Log LOG = LogFactory.getLog(getClass());
+ 
+   private static String HIVE_WAREHOUSE_DIR_CONF_KEY = 
"hive.metastore.warehouse.dir";
+ 
+   protected Configuration conf;
+   private static final int CLIENT_POOL_SIZE = 2;
+   private final HiveCatalogStoreClientPool clientPool;
+   private final String defaultTableSpaceUri;
+ 
+   public HiveCatalogStore(final Configuration conf) throws InternalException {
+     if (!(conf instanceof TajoConf)) {
+       throw new CatalogException("Invalid Configuration Type:" + 
conf.getClass().getSimpleName());
+     }
+     this.conf = conf;
+     this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) 
conf).toString();
+     this.clientPool = new HiveCatalogStoreClientPool(CLIENT_POOL_SIZE, conf);
+   }
+ 
+   @Override
+   public boolean existTable(final String databaseName, final String 
tableName) throws CatalogException {
+     boolean exist = false;
+     org.apache.hadoop.hive.ql.metadata.Table table;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     // get table
+     try {
+       client = clientPool.getClient();
+       table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, 
tableName);
+       if (table != null) {
+         exist = true;
+       }
+     } catch (NoSuchObjectException nsoe) {
+       exist = false;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+ 
+     return exist;
+   }
+ 
+   @Override
+   public final CatalogProtos.TableDescProto getTable(String databaseName, 
final String tableName) throws CatalogException {
+     org.apache.hadoop.hive.ql.metadata.Table table = null;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     Path path = null;
+     CatalogProtos.StoreType storeType = null;
+     org.apache.tajo.catalog.Schema schema = null;
+     KeyValueSet options = null;
+     TableStats stats = null;
+     PartitionMethodDesc partitions = null;
+ 
+     //////////////////////////////////
+     // set tajo table schema.
+     //////////////////////////////////
+     try {
+       // get hive table schema
+       try {
+         client = clientPool.getClient();
+         table = HiveCatalogUtil.getTable(client.getHiveClient(), 
databaseName, tableName);
+         path = table.getPath();
+       } catch (NoSuchObjectException nsoe) {
+         throw new CatalogException("Table not found. - tableName:" + 
tableName, nsoe);
+       } catch (Exception e) {
+         throw new CatalogException(e);
+       }
+ 
+       // convert HiveCatalogStore field schema into tajo field schema.
+       schema = new org.apache.tajo.catalog.Schema();
+ 
+       List<FieldSchema> fieldSchemaList = table.getCols();
+       boolean isPartitionKey = false;
+       for (FieldSchema eachField : fieldSchemaList) {
+         isPartitionKey = false;
+ 
+         if (table.getPartitionKeys() != null) {
+           for (FieldSchema partitionKey : table.getPartitionKeys()) {
+             if (partitionKey.getName().equals(eachField.getName())) {
+               isPartitionKey = true;
+             }
+           }
+         }
+ 
+         if (!isPartitionKey) {
+           String fieldName = databaseName + 
CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+               CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName();
+           TajoDataTypes.Type dataType = 
HiveCatalogUtil.getTajoFieldType(eachField.getType().toString());
+           schema.addColumn(fieldName, dataType);
+         }
+       }
+ 
+       // validate field schema.
+       HiveCatalogUtil.validateSchema(table);
+ 
+       stats = new TableStats();
+       options = new KeyValueSet();
+       options.putAll(table.getParameters());
+       options.remove("EXTERNAL");
+ 
+       Properties properties = table.getMetadata();
+       if (properties != null) {
+         // set field delimiter
+         String fieldDelimiter = "", nullFormat = "";
+         if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) {
+           fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM);
+         } else {
+           // if hive table used default row format delimiter, Properties 
doesn't have it.
+           // So, Tajo must set as follows:
+           fieldDelimiter = "\u0001";
+         }
+ 
+         // set null format
+         if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) 
!= null) {
+           nullFormat = 
properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT);
+         } else {
+           nullFormat = "\\N";
+         }
+         options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
+ 
+         // set file output format
+         String fileOutputformat = 
properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
+         storeType = 
CatalogUtil.getStoreType(HiveCatalogUtil.getStoreType(fileOutputformat));
+ 
+         if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) {
+           options.set(StorageConstants.TEXT_DELIMITER, 
StringEscapeUtils.escapeJava(fieldDelimiter));
+           options.set(StorageConstants.TEXT_NULL, 
StringEscapeUtils.escapeJava(nullFormat));
+         } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) {
+           options.set(StorageConstants.RCFILE_NULL, 
StringEscapeUtils.escapeJava(nullFormat));
+           String serde = 
properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+           if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.RCFILE_SERDE, 
StorageConstants.DEFAULT_BINARY_SERDE);
+           } else if (ColumnarSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.RCFILE_SERDE, 
StorageConstants.DEFAULT_TEXT_SERDE);
+           }
+         } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) {
+           options.set(StorageConstants.SEQUENCEFILE_DELIMITER, 
StringEscapeUtils.escapeJava(fieldDelimiter));
+           options.set(StorageConstants.SEQUENCEFILE_NULL, 
StringEscapeUtils.escapeJava(nullFormat));
+           String serde = 
properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+           if (LazyBinarySerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.SEQUENCEFILE_SERDE, 
StorageConstants.DEFAULT_BINARY_SERDE);
+           } else if (LazySimpleSerDe.class.getName().equals(serde)) {
+             options.set(StorageConstants.SEQUENCEFILE_SERDE, 
StorageConstants.DEFAULT_TEXT_SERDE);
+           }
+         }
+ 
+         // set data size
+         long totalSize = 0;
+         if (properties.getProperty("totalSize") != null) {
+           totalSize = Long.parseLong(properties.getProperty("totalSize"));
+         } else {
+           try {
+             FileSystem fs = path.getFileSystem(conf);
+             if (fs.exists(path)) {
+               totalSize = fs.getContentSummary(path).getLength();
+             }
+           } catch (IOException ioe) {
+             throw new CatalogException("Fail to get path. - path:" + 
path.toString(), ioe);
+           }
+         }
+         stats.setNumBytes(totalSize);
+       }
+ 
+       // set partition keys
+       List<FieldSchema> partitionKeys = table.getPartitionKeys();
+ 
+       if (null != partitionKeys) {
+         org.apache.tajo.catalog.Schema expressionSchema = new 
org.apache.tajo.catalog.Schema();
+         StringBuilder sb = new StringBuilder();
+         if (partitionKeys.size() > 0) {
+           for (int i = 0; i < partitionKeys.size(); i++) {
+             FieldSchema fieldSchema = partitionKeys.get(i);
+             TajoDataTypes.Type dataType = 
HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
+             String fieldName = databaseName + 
CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+                 CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName();
+             expressionSchema.addColumn(new Column(fieldName, dataType));
+             if (i > 0) {
+               sb.append(",");
+             }
+             sb.append(fieldSchema.getName());
+           }
+           partitions = new PartitionMethodDesc(
+               databaseName,
+               tableName,
+               PartitionType.COLUMN,
+               sb.toString(),
+               expressionSchema);
+         }
+       }
+     } finally {
+       if(client != null) client.release();
+     }
+     TableMeta meta = new TableMeta(storeType, options);
+     TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, 
schema, meta, path.toUri());
+     if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
+       tableDesc.setExternal(true);
+     }
+     if (stats != null) {
+       tableDesc.setStats(stats);
+     }
+     if (partitions != null) {
+       tableDesc.setPartitionMethod(partitions);
+     }
+     return tableDesc.getProto();
+   }
+ 
+ 
+   private TajoDataTypes.Type getDataType(final String typeStr) {
+     try {
+       return Enum.valueOf(TajoDataTypes.Type.class, typeStr);
+     } catch (IllegalArgumentException iae) {
+       LOG.error("Cannot find a matched type against from '" + typeStr + "'");
+       return null;
+     }
+   }
+ 
+   @Override
+   public final List<String> getAllTableNames(String databaseName) throws 
CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       return client.getHiveClient().getAllTables(databaseName);
+     } catch (TException e) {
+       throw new CatalogException(e);
+     } finally {
+       if(client != null) client.release();
+     }
+   }
+ 
+   @Override
+   public void createTablespace(String spaceName, String spaceUri) throws 
CatalogException {
+     // SKIP
+   }
+ 
+   @Override
+   public boolean existTablespace(String spaceName) throws CatalogException {
+     // SKIP
+     return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME);
+   }
+ 
+   @Override
+   public void dropTablespace(String spaceName) throws CatalogException {
+     // SKIP
+   }
+ 
+   @Override
+   public Collection<String> getAllTablespaceNames() throws CatalogException {
+     return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME);
+   }
+ 
+   @Override
+   public TablespaceProto getTablespace(String spaceName) throws 
CatalogException {
+     if (spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME)) {
+       TablespaceProto.Builder builder = TablespaceProto.newBuilder();
+       builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME);
+       builder.setUri(defaultTableSpaceUri);
+       return builder.build();
+     } else {
+       throw new CatalogException("tablespace concept is not supported in 
HiveCatalogStore");
+     }
+   }
+ 
+   @Override
+   public void updateTableStats(CatalogProtos.UpdateTableStatsProto 
statsProto) throws
+     CatalogException {
+     // TODO - not implemented yet
+   }
+ 
+   @Override
+   public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) 
throws CatalogException {
+     throw new CatalogException("tablespace concept is not supported in 
HiveCatalogStore");
+   }
+ 
+   @Override
+   public void createDatabase(String databaseName, String tablespaceName) 
throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       Database database = new Database(
+           databaseName,
+           "",
+           defaultTableSpaceUri + "/" + databaseName,
+           new HashMap<String, String>());
+       client = clientPool.getClient();
+       client.getHiveClient().createDatabase(database);
+     } catch (AlreadyExistsException e) {
+       throw new AlreadyExistsDatabaseException(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(t);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public boolean existDatabase(String databaseName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       List<String> databaseNames = client.getHiveClient().getAllDatabases();
+       return databaseNames.contains(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(t);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public void dropDatabase(String databaseName) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       client.getHiveClient().dropDatabase(databaseName);
+     } catch (NoSuchObjectException e) {
+       throw new NoSuchDatabaseException(databaseName);
+     } catch (Throwable t) {
+       throw new CatalogException(databaseName);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public Collection<String> getAllDatabaseNames() throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       return client.getHiveClient().getAllDatabases();
+     } catch (TException e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public final void createTable(final CatalogProtos.TableDescProto 
tableDescProto) throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     TableDesc tableDesc = new TableDesc(tableDescProto);
+     String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+     String databaseName = splitted[0];
+     String tableName = splitted[1];
+ 
+     try {
+       client = clientPool.getClient();
+ 
+       org.apache.hadoop.hive.metastore.api.Table table = new 
org.apache.hadoop.hive.metastore.api.Table();
+       table.setDbName(databaseName);
+       table.setTableName(tableName);
+       table.setParameters(new HashMap<String, 
String>(tableDesc.getMeta().getOptions().getAllKeyValus()));
+       // TODO: set owner
+       //table.setOwner();
+ 
+       StorageDescriptor sd = new StorageDescriptor();
+       sd.setSerdeInfo(new SerDeInfo());
+       sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+       sd.getSerdeInfo().setName(table.getTableName());
+ 
+       // If Tajo sets the location via this method, the Thrift client throws an exception such as:
+       // Caused by: MetaException(message:java.lang.NullPointerException)
+       // If you want to modify the table path, you have to do it from the Hive CLI.
+       if (tableDesc.isExternal()) {
+         table.setTableType(TableType.EXTERNAL_TABLE.name());
+         table.putToParameters("EXTERNAL", "TRUE");
+ 
+         Path tablePath = new Path(tableDesc.getPath());
+         FileSystem fs = tablePath.getFileSystem(conf);
+         if (fs.isFile(tablePath)) {
+           LOG.warn("A table path is a file, but HiveCatalogStore does not 
allow a file path.");
+           sd.setLocation(tablePath.getParent().toString());
+         } else {
+           sd.setLocation(tablePath.toString());
+         }
+       }
+ 
+       // set column information
+       List<Column> columns = tableDesc.getSchema().getColumns();
+       ArrayList<FieldSchema> cols = new 
ArrayList<FieldSchema>(columns.size());
+ 
+       for (Column eachField : columns) {
+         cols.add(new FieldSchema(eachField.getSimpleName(),
+             HiveCatalogUtil.getHiveFieldType(eachField.getDataType()), ""));
+       }
+       sd.setCols(cols);
+ 
+       // set partition keys
+       if (tableDesc.hasPartition() && 
tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) 
{
+         List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
+         for (Column eachPartitionKey : 
tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
+           partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(),
+               
HiveCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), ""));
+         }
+         table.setPartitionKeys(partitionKeys);
+       }
+ 
+       if 
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) {
+         String serde = 
tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
+         
sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+         
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+           
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+         } else {
+           sd.getSerdeInfo().setSerializationLib(
+               
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+         }
+ 
+         if 
(tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
+         }
+       } else if 
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV)
+           || 
tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) {
+         
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+         
sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
+         
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+ 
+         String fieldDelimiter = 
tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
+             StorageConstants.DEFAULT_FIELD_DELIMITER);
+ 
+         // Users can use a Unicode character for the field delimiter, such as \u0001 or \001.
+         // In this case, the Java console converts the value into "\\u001",
+         // and Hive un-escapes this value again.
+         // As a result, users get the right field delimiter.
+         // So, we have to un-escape this value.
+         sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+             StringEscapeUtils.unescapeJava(fieldDelimiter));
+         sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+             StringEscapeUtils.unescapeJava(fieldDelimiter));
+         table.getParameters().remove(StorageConstants.TEXT_DELIMITER);
+ 
+         if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL)));
+           table.getParameters().remove(StorageConstants.TEXT_NULL);
+         }
+       } else if 
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE))
 {
+         String serde = 
tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
+         
sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
+         
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());
+ 
+         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+           
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ 
+           String fieldDelimiter = 
tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
+               StorageConstants.DEFAULT_FIELD_DELIMITER);
+ 
+           // Users can use a Unicode character for the field delimiter, such as \u0001 or \001.
+           // In this case, the Java console converts the value into "\\u001",
+           // and Hive un-escapes this value again.
+           // As a result, users get the right field delimiter.
+           // So, we have to un-escape this value.
+           
sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+               StringEscapeUtils.unescapeJava(fieldDelimiter));
+           sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+               StringEscapeUtils.unescapeJava(fieldDelimiter));
+           
table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
+         } else {
+           
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+         }
+ 
+         if 
(tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
+           table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+               
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
+           table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
+         }
+       } else {
+         if 
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) {
+           
sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
+           
sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
+           
sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
+         } else {
+           throw new CatalogException(new 
NotImplementedException(tableDesc.getMeta().getStoreType
+               ().name()));
+         }
+       }
+ 
+       sd.setSortCols(new ArrayList<Order>());
+ 
+       table.setSd(sd);
+       client.getHiveClient().createTable(table);
+     } catch (RuntimeException e) {
+       throw e;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if(client != null) client.release();
+     }
+   }
+ 
+   @Override
+   public final void dropTable(String databaseName, final String tableName) 
throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+       client = clientPool.getClient();
+       client.getHiveClient().dropTable(databaseName, tableName, false, false);
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+ 
+   @Override
+   public void alterTable(final CatalogProtos.AlterTableDescProto 
alterTableDescProto) throws CatalogException {
+     final String[] split = 
CatalogUtil.splitFQTableName(alterTableDescProto.getTableName());
+ 
+     if (split.length == 1) {
+       throw new IllegalArgumentException("alterTable() requires a qualified 
table name, but it is \""
+           + alterTableDescProto.getTableName() + "\".");
+     }
+ 
+     final String databaseName = split[0];
+     final String tableName = split[1];
+     String partitionName = null;
+     CatalogProtos.PartitionDescProto partitionDesc = null;
+ 
+     switch (alterTableDescProto.getAlterTableType()) {
+       case RENAME_TABLE:
+         if 
(existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) {
+           throw new 
AlreadyExistsTableException(alterTableDescProto.getNewTableName());
+         }
+         renameTable(databaseName, tableName, 
alterTableDescProto.getNewTableName().toLowerCase());
+         break;
+       case RENAME_COLUMN:
+         if (existColumn(databaseName,tableName, 
alterTableDescProto.getAlterColumnName().getNewColumnName())) {
+           throw new 
ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName());
+         }
+         renameColumn(databaseName, tableName, 
alterTableDescProto.getAlterColumnName());
+         break;
+       case ADD_COLUMN:
+         if (existColumn(databaseName,tableName, 
alterTableDescProto.getAddColumn().getName())) {
+           throw new 
ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName());
+         }
+         addNewColumn(databaseName, tableName, 
alterTableDescProto.getAddColumn());
+         break;
+       case ADD_PARTITION:
+         partitionName = 
alterTableDescProto.getPartitionDesc().getPartitionName();
+         partitionDesc = getPartition(databaseName, tableName, partitionName);
+         if(partitionDesc != null) {
+           throw new AlreadyExistsPartitionException(databaseName, tableName, 
partitionName);
+         }
+         addPartition(databaseName, tableName, 
alterTableDescProto.getPartitionDesc());
+         break;
+       case DROP_PARTITION:
+         partitionName = 
alterTableDescProto.getPartitionDesc().getPartitionName();
+         partitionDesc = getPartition(databaseName, tableName, partitionName);
+         if(partitionDesc == null) {
+           throw new NoSuchPartitionException(databaseName, tableName, 
partitionName);
+         }
+         dropPartition(databaseName, tableName, partitionDesc);
+         break;
+       case SET_PROPERTY:
+         // TODO - not implemented yet
+         break;
+       default:
+         //TODO
+     }
+   }
+ 
+ 
+   private void renameTable(String databaseName, String tableName, String 
newTableName) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+       client = clientPool.getClient();
+       Table newTable = client.getHiveClient().getTable(databaseName, 
tableName);
+       newTable.setTableName(newTableName);
+       client.getHiveClient().alter_table(databaseName, tableName, newTable);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void renameColumn(String databaseName, String tableName, 
CatalogProtos.AlterColumnProto alterColumnProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+ 
+       for (final FieldSchema currentColumn : columns) {
+         if 
(currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) 
{
+           currentColumn.setName(alterColumnProto.getNewColumnName());
+         }
+       }
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+ 
+   private void addNewColumn(String databaseName, String tableName, 
CatalogProtos.ColumnProto columnProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+       columns.add(new FieldSchema(columnProto.getName(),
+           HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), ""));
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void addPartition(String databaseName, String tableName, 
CatalogProtos.PartitionDescProto
+     partitionDescProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+ 
+       Partition partition = new Partition();
+       partition.setDbName(databaseName);
+       partition.setTableName(tableName);
+ 
+       List<String> values = Lists.newArrayList();
+       for(CatalogProtos.PartitionKeyProto keyProto : 
partitionDescProto.getPartitionKeysList()) {
+         values.add(keyProto.getPartitionValue());
+       }
+       partition.setValues(values);
+ 
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       StorageDescriptor sd = table.getSd();
+       sd.setLocation(partitionDescProto.getPath());
+       partition.setSd(sd);
+ 
+       client.getHiveClient().add_partition(partition);
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   private void dropPartition(String databaseName, String tableName, 
CatalogProtos.PartitionDescProto
+     partitionDescProto) {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     try {
+ 
+       client = clientPool.getClient();
+ 
+       List<String> values = Lists.newArrayList();
+       for(CatalogProtos.PartitionKeyProto keyProto : 
partitionDescProto.getPartitionKeysList()) {
+         values.add(keyProto.getPartitionValue());
+       }
+       client.getHiveClient().dropPartition(databaseName, tableName, values, 
true);
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+   }
+ 
+   @Override
+   public void addPartitionMethod(CatalogProtos.PartitionMethodProto 
partitionMethodProto) throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public CatalogProtos.PartitionMethodProto getPartitionMethod(String 
databaseName, String tableName)
+       throws CatalogException {
 -    return null;  // TODO - not implemented yet
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public boolean existPartitionMethod(String databaseName, String tableName) 
throws CatalogException {
 -    return false;  // TODO - not implemented yet
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void dropPartitionMethod(String databaseName, String tableName) 
throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<CatalogProtos.PartitionDescProto> getPartitions(String 
databaseName,
+                                                          String tableName) 
throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+ 
+   @Override
+   public CatalogProtos.PartitionDescProto getPartition(String databaseName, 
String tableName,
+                                                        String partitionName) 
throws CatalogException {
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+     CatalogProtos.PartitionDescProto.Builder builder = null;
+ 
+     try {
+       client = clientPool.getClient();
+ 
+       Partition partition = client.getHiveClient().getPartition(databaseName, 
tableName, partitionName);
+       builder = CatalogProtos.PartitionDescProto.newBuilder();
+       builder.setPartitionName(partitionName);
+       builder.setPath(partition.getSd().getLocation());
+ 
+       String[] partitionNames = partitionName.split("/");
+ 
+       for (int i = 0; i < partition.getValues().size(); i++) {
+         String value = partition.getValues().get(i);
+         CatalogProtos.PartitionKeyProto.Builder keyBuilder = 
CatalogProtos.PartitionKeyProto.newBuilder();
+ 
+         String columnName = partitionNames[i].split("=")[0];
+         keyBuilder.setColumnName(columnName);
+         keyBuilder.setPartitionValue(value);
+         builder.addPartitionKeys(keyBuilder);
+       }
+     } catch (NoSuchObjectException e) {
+       return null;
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+     return builder.build();
+   }
+ 
+   @Override
+   public final void addFunction(final FunctionDesc func) throws 
CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void deleteFunction(final FunctionDesc func) throws 
CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void existFunction(final FunctionDesc func) throws 
CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final List<String> getAllFunctionNames() throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public void dropIndex(String databaseName, String indexName) throws 
CatalogException {
++  public void createIndex(CatalogProtos.IndexDescProto proto) throws 
CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public boolean existIndexByName(String databaseName, String indexName) 
throws CatalogException {
++  public void dropIndex(String databaseName, String indexName) throws 
CatalogException {
+     // TODO - not implemented yet
 -    return false;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, 
String tableName) throws CatalogException {
++  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, 
String indexName) throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public void createIndex(CatalogProtos.IndexDescProto proto) throws 
CatalogException {
++  public CatalogProtos.IndexDescProto getIndexByColumns(String databaseName, 
String tableName, String[] columnNames)
++      throws CatalogException {
+     // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, 
String indexName) throws CatalogException {
++  public boolean existIndexByName(String databaseName, String indexName) 
throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, 
String tableName, String columnName)
++  public boolean existIndexByColumns(String databaseName, String tableName, 
String[] columnNames)
+       throws CatalogException {
+     // TODO - not implemented yet
 -    return null;
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public boolean existIndexByColumn(String databaseName, String tableName, 
String columnName) throws CatalogException {
++  public List<String> getAllIndexNamesByTable(String databaseName, String 
tableName) throws CatalogException {
+     // TODO - not implemented yet
 -    return false;
++    throw new UnsupportedOperationException();
++  }
++
++  @Override
++  public boolean existIndexesByTable(String databaseName, String tableName) 
throws CatalogException {
++    // TODO - not implemented yet
++    throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public final void close() {
+     clientPool.close();
+   }
+ 
+   private boolean existColumn(final String databaseName ,final String 
tableName , final String columnName) throws CatalogException {
+     boolean exist = false;
+     HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ 
+     try {
+ 
+       client = clientPool.getClient();
+       Table table = client.getHiveClient().getTable(databaseName, tableName);
+       List<FieldSchema> columns = table.getSd().getCols();
+ 
+       for (final FieldSchema currentColumn : columns) {
+         if (currentColumn.getName().equalsIgnoreCase(columnName)) {
+           exist = true;
+         }
+       }
+       client.getHiveClient().alter_table(databaseName, tableName, table);
+ 
+     } catch (NoSuchObjectException nsoe) {
+     } catch (Exception e) {
+       throw new CatalogException(e);
+     } finally {
+       if (client != null) {
+         client.release();
+       }
+     }
+ 
+     return exist;
+   }
+ 
+   @Override
+   public List<ColumnProto> getAllColumns() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<DatabaseProto> getAllDatabases() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
 -  public List<IndexProto> getAllIndexes() throws CatalogException {
++  public List<IndexDescProto> getAllIndexes() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TablePartitionProto> getAllPartitions() throws CatalogException 
{
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableOptionProto> getAllTableOptions() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableStatsProto> getAllTableStats() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TableDescriptorProto> getAllTables() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public List<TablespaceProto> getTablespaces() throws CatalogException {
+     throw new UnsupportedOperationException();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc 
tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 90a51b6,9d0e427..5fa1c67
--- 
a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ 
b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -259,124 -260,6 +259,124 @@@ public class CatalogAdminClientImpl imp
    }
  
    @Override
 +  public IndexDescProto getIndex(final String indexName) throws 
ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<IndexDescProto>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public IndexDescProto call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.getIndexWithName(null,
 +            connection.convertSessionedString(indexName));
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.existIndexWithName(null,
 +            connection.convertSessionedString(indexName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public List<IndexDescProto> getIndexes(final String tableName) throws 
ServiceException {
-     return new ServerCallable<List<IndexDescProto>>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<List<IndexDescProto>>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public List<IndexDescProto> call(NettyClientBase client) throws 
Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexesResponse response = 
tajoMasterService.getIndexesForTable(null,
 +            connection.convertSessionedString(tableName));
 +        if (response.getResult().getResultCode() == ResultCode.OK) {
 +          return response.getIndexesList();
 +        } else {
 +          throw new SQLException(response.getResult().getErrorMessage());
 +        }
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean hasIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.existIndexesForTable(null,
 +            connection.convertSessionedString(tableName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String tableName, final String[] 
columnNames) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<IndexDescProto>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public IndexDescProto call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexWithColumnsRequest.Builder builder = 
GetIndexWithColumnsRequest.newBuilder();
 +        builder.setSessionId(connection.sessionId);
 +        builder.setTableName(tableName);
 +        for (String eachColumnName : columnNames) {
 +          builder.addColumnNames(eachColumnName);
 +        }
 +        GetIndexWithColumnsResponse response = 
tajoMasterService.getIndexWithColumns(null, builder.build());
 +        if (response.getResult().getResultCode() == ResultCode.OK) {
 +          return response.getIndexDesc();
 +        } else {
 +          throw new SQLException(response.getResult().getErrorMessage());
 +        }
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String tableName, final String[] 
columnName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        GetIndexWithColumnsRequest.Builder builder = 
GetIndexWithColumnsRequest.newBuilder();
 +        builder.setSessionId(connection.sessionId);
 +        builder.setTableName(tableName);
 +        for (String eachColumnName : columnName) {
 +          builder.addColumnNames(eachColumnName);
 +        }
 +        return tajoMasterService.existIndexWithColumns(null, 
builder.build()).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean dropIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.connPool,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false, true) {
++    return new ServerCallable<Boolean>(connection.manager,
++        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, 
false) {
 +
 +      @Override
 +      public Boolean call(NettyClientBase client) throws Exception {
 +        BlockingInterface tajoMasterService = client.getStub();
 +        return tajoMasterService.dropIndex(null,
 +            connection.convertSessionedString(indexName)).getValue();
 +      }
 +    }.withRetries();
 +  }
 +
 +  @Override
    public void close() throws IOException {
    }
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------

Reply via email to