Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into
index_support
Conflicts:
tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/02800343
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/02800343
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/02800343
Branch: refs/heads/index_support
Commit: 02800343b7ad99d4c0ed51fca1771111679ca61f
Parents: 2faa47c d2a4f9b
Author: Jihoon Son <[email protected]>
Authored: Fri Apr 17 17:33:56 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Fri Apr 17 17:33:56 2015 +0900
----------------------------------------------------------------------
.travis.yml | 2 +-
CHANGES | 4 +
tajo-catalog/pom.xml | 4 +-
.../tajo/catalog/AbstractCatalogClient.java | 92 +-
tajo-catalog/tajo-catalog-drivers/pom.xml | 34 +-
.../tajo-catalog-drivers/tajo-hcatalog/pom.xml | 739 --------------
.../tajo/catalog/store/HCatalogStore.java | 993 -------------------
.../catalog/store/HCatalogStoreClientPool.java | 170 ----
.../apache/tajo/catalog/store/HCatalogUtil.java | 147 ---
.../tajo/catalog/store/TestHCatalogStore.java | 467 ---------
.../tajo-catalog-drivers/tajo-hive/pom.xml | 351 +++++++
.../tajo/catalog/store/HiveCatalogStore.java | 980 ++++++++++++++++++
.../store/HiveCatalogStoreClientPool.java | 170 ++++
.../tajo/catalog/store/HiveCatalogUtil.java | 127 +++
.../catalog/store/TestHiveCatalogStore.java | 504 ++++++++++
.../org/apache/tajo/catalog/TestCatalog.java | 6 +-
.../tajo/client/CatalogAdminClientImpl.java | 68 +-
.../org/apache/tajo/client/QueryClientImpl.java | 50 +-
.../apache/tajo/client/SessionConnection.java | 40 +-
tajo-core/pom.xml | 212 ----
.../org/apache/tajo/master/QueryInProgress.java | 6 +-
.../apache/tajo/master/TajoContainerProxy.java | 38 +-
.../apache/tajo/querymaster/QueryMaster.java | 24 +-
.../tajo/worker/ExecutionBlockContext.java | 29 +-
.../tajo/worker/TajoResourceAllocator.java | 20 +-
.../main/java/org/apache/tajo/worker/Task.java | 73 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 2 -
.../tajo/worker/WorkerHeartbeatService.java | 10 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 26 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 4 +-
.../org/apache/tajo/TajoTestingCluster.java | 28 +-
.../org/apache/tajo/cli/tools/TestTajoDump.java | 4 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 56 +-
.../tajo/engine/query/TestAlterTablespace.java | 2 +-
.../apache/tajo/engine/query/TestCTASQuery.java | 6 +-
.../tajo/engine/query/TestCreateTable.java | 14 +-
.../tajo/engine/query/TestInsertQuery.java | 40 +-
.../apache/tajo/engine/query/TestNetTypes.java | 26 +-
.../tajo/engine/query/TestSelectQuery.java | 4 +-
.../apache/tajo/engine/query/TestSortQuery.java | 6 +-
.../tajo/engine/query/TestTablePartitions.java | 22 +-
.../org/apache/tajo/jdbc/TestResultSet.java | 4 +-
.../tajo/jdbc/TestTajoDatabaseMetaData.java | 22 +-
.../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 12 +-
.../create_table_various_types_for_hcatalog.sql | 50 -
...ate_table_various_types_for_hive_catalog.sql | 50 +
tajo-dist/pom.xml | 4 +-
tajo-dist/src/main/bin/tajo | 3 -
.../src/main/conf/catalog-site.xml.template | 6 +-
tajo-dist/src/main/conf/tajo-env.sh | 2 +-
.../configuration/catalog_configuration.rst | 40 +-
.../src/main/sphinx/hcatalog_integration.rst | 52 -
tajo-docs/src/main/sphinx/hive_integration.rst | 42 +
tajo-docs/src/main/sphinx/index.rst | 2 +-
.../plan/rewrite/rules/FilterPushDownRule.java | 2 +-
tajo-project/pom.xml | 1 +
.../main/java/org/apache/tajo/rpc/RpcUtils.java | 34 -
.../org/apache/tajo/rpc/AsyncRpcClient.java | 58 +-
.../org/apache/tajo/rpc/AsyncRpcServer.java | 82 +-
.../org/apache/tajo/rpc/BlockingRpcClient.java | 88 +-
.../org/apache/tajo/rpc/BlockingRpcServer.java | 85 +-
.../tajo/rpc/ConnectionCloseFutureListener.java | 35 +
.../org/apache/tajo/rpc/NettyClientBase.java | 124 +--
.../tajo/rpc/ProtoChannelInitializer.java | 11 +-
.../org/apache/tajo/rpc/RpcClientManager.java | 185 ++++
.../org/apache/tajo/rpc/RpcConnectionPool.java | 191 ----
.../org/apache/tajo/rpc/ServerCallable.java | 36 +-
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 72 +-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 85 +-
.../apache/tajo/rpc/TestRpcClientManager.java | 97 ++
70 files changed, 3243 insertions(+), 3832 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc
tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 3b72639,49be29a..6cfbb1a
---
a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++
b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -373,27 -372,9 +373,27 @@@ public abstract class AbstractCatalogCl
}
@Override
+ public List<IndexDescProto> getAllIndexes() {
+ try {
- return new ServerCallable<List<IndexDescProto>>(pool,
getCatalogServerAddr(), CatalogProtocol.class, false) {
++ return new ServerCallable<List<IndexDescProto>>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
+
+ @Override
+ public List<IndexDescProto> call(NettyClientBase client) throws
Exception {
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ GetIndexesProto response = stub.getAllIndexes(null,
ProtoUtil.NULL_PROTO);
+ return response.getIndexList();
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
public final PartitionMethodDesc getPartitionMethod(final String
databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.pool,
getCatalogServerAddr(), CatalogProtocol.class, false) {
+ return new ServerCallable<PartitionMethodDesc>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
public PartitionMethodDesc call(NettyClientBase client) throws
ServiceException {
TableIdentifierProto.Builder builder =
TableIdentifierProto.newBuilder();
@@@ -637,40 -618,17 +637,40 @@@
}
@Override
- public boolean existIndexByColumn(final String databaseName, final String
tableName, final String columnName) {
+ public boolean existIndexByColumns(final String databaseName, final String
tableName, final Column [] columns) {
+ return existIndexByColumnNames(databaseName, tableName,
extractColumnNames(columns));
+ }
+
+ @Override
+ public boolean existIndexByColumnNames(final String databaseName, final
String tableName, final String [] columnNames) {
try {
- return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(),
CatalogProtocol.class, false) {
+ return new ServerCallable<Boolean>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
- GetIndexByColumnRequest.Builder builder =
GetIndexByColumnRequest.newBuilder();
+ GetIndexByColumnNamesRequest.Builder builder =
GetIndexByColumnNamesRequest.newBuilder();
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName,
tableName));
- builder.setColumnName(columnName);
+ for (String colunName : columnNames) {
+ builder.addColumnNames(colunName);
+ }
CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByColumn(null, builder.build()).getValue();
+ return stub.existIndexByColumnNames(null,
builder.build()).getValue();
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean existIndexesByTable(final String databaseName, final String
tableName) {
+ try {
- return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(),
CatalogProtocol.class, false) {
++ return new ServerCallable<Boolean>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
+ public Boolean call(NettyClientBase client) throws ServiceException {
+
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ return stub.existIndexesByTable(null,
CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
}
}.withRetries();
} catch (ServiceException e) {
@@@ -699,60 -657,20 +699,60 @@@
}
}
+ private static String[] extractColumnNames(Column[] columns) {
+ String[] columnNames = new String [columns.length];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = columns[i].getSimpleName();
+ }
+ return columnNames;
+ }
+
+ @Override
+ public final IndexDesc getIndexByColumns(final String databaseName,
+ final String tableName,
+ final Column [] columns) {
+ return getIndexByColumnNames(databaseName, tableName,
extractColumnNames(columns));
+ }
+
@Override
- public final IndexDesc getIndexByColumn(final String databaseName,
- final String tableName,
- final String columnName) {
+ public final IndexDesc getIndexByColumnNames(final String databaseName,
+ final String tableName,
+ final String [] columnNames) {
try {
- return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(),
CatalogProtocol.class, false) {
+ return new ServerCallable<IndexDesc>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
public IndexDesc call(NettyClientBase client) throws ServiceException
{
- GetIndexByColumnRequest.Builder builder =
GetIndexByColumnRequest.newBuilder();
+ GetIndexByColumnNamesRequest.Builder builder =
GetIndexByColumnNamesRequest.newBuilder();
builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName,
tableName));
- builder.setColumnName(columnName);
+ for (String columnName : columnNames) {
+ builder.addColumnNames(columnName);
+ }
CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
+ return new IndexDesc(stub.getIndexByColumnNames(null,
builder.build()));
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public final Collection<IndexDesc> getAllIndexesByTable(final String
databaseName,
+ final String
tableName) {
+ try {
- return new ServerCallable<Collection<IndexDesc>>(this.pool,
getCatalogServerAddr(), CatalogProtocol.class, false) {
++ return new ServerCallable<Collection<IndexDesc>>(this.manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
+ @Override
+ public Collection<IndexDesc> call(NettyClientBase client) throws
Exception {
+ TableIdentifierProto proto =
CatalogUtil.buildTableIdentifier(databaseName, tableName);
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ GetAllIndexesResponse response = stub.getAllIndexesByTable(null,
proto);
+ List<IndexDesc> indexDescs = TUtil.newList();
+ for (IndexDescProto descProto : response.getIndexDescList()) {
+ indexDescs.add(new IndexDesc(descProto));
+ }
+ return indexDescs;
}
}.withRetries();
} catch (ServiceException e) {
@@@ -781,7 -699,25 +781,7 @@@
return false;
}
}
--
- @Override
- public List<IndexProto> getAllIndexes() {
- try {
- return new ServerCallable<List<IndexProto>>(manager,
getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<IndexProto> call(NettyClientBase client) throws Exception
{
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetIndexesProto response = stub.getAllIndexes(null,
ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- }
- }.withRetries();
- } catch (ServiceException e) {
- LOG.error(e.getMessage(), e);
- return null;
- }
- }
+
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --cc
tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 0000000,5b1a996..700b327
mode 000000,100644..100644
---
a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@@ -1,0 -1,964 +1,980 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.catalog.store;
+
+ import com.google.common.collect.Lists;
+
+ import org.apache.commons.lang.StringEscapeUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.serde.serdeConstants;
+ import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+ import org.apache.tajo.TajoConstants;
+ import org.apache.tajo.catalog.*;
+ import org.apache.tajo.catalog.exception.*;
+ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+ import org.apache.tajo.catalog.proto.CatalogProtos;
+ import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
++import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+ import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.common.TajoDataTypes;
+ import org.apache.tajo.common.exception.NotImplementedException;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.exception.InternalException;
+ import org.apache.tajo.storage.StorageConstants;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.thrift.TException;
+
+ import java.io.IOException;
+ import java.util.*;
+
+ import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+
+ public class HiveCatalogStore extends CatalogConstants implements
CatalogStore {
+ protected final Log LOG = LogFactory.getLog(getClass());
+
+ private static String HIVE_WAREHOUSE_DIR_CONF_KEY =
"hive.metastore.warehouse.dir";
+
+ protected Configuration conf;
+ private static final int CLIENT_POOL_SIZE = 2;
+ private final HiveCatalogStoreClientPool clientPool;
+ private final String defaultTableSpaceUri;
+
+ public HiveCatalogStore(final Configuration conf) throws InternalException {
+ if (!(conf instanceof TajoConf)) {
+ throw new CatalogException("Invalid Configuration Type:" +
conf.getClass().getSimpleName());
+ }
+ this.conf = conf;
+ this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf)
conf).toString();
+ this.clientPool = new HiveCatalogStoreClientPool(CLIENT_POOL_SIZE, conf);
+ }
+
+ @Override
+ public boolean existTable(final String databaseName, final String
tableName) throws CatalogException {
+ boolean exist = false;
+ org.apache.hadoop.hive.ql.metadata.Table table;
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ // get table
+ try {
+ client = clientPool.getClient();
+ table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName,
tableName);
+ if (table != null) {
+ exist = true;
+ }
+ } catch (NoSuchObjectException nsoe) {
+ exist = false;
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+
+ return exist;
+ }
+
+ @Override
+ public final CatalogProtos.TableDescProto getTable(String databaseName,
final String tableName) throws CatalogException {
+ org.apache.hadoop.hive.ql.metadata.Table table = null;
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ Path path = null;
+ CatalogProtos.StoreType storeType = null;
+ org.apache.tajo.catalog.Schema schema = null;
+ KeyValueSet options = null;
+ TableStats stats = null;
+ PartitionMethodDesc partitions = null;
+
+ //////////////////////////////////
+ // set tajo table schema.
+ //////////////////////////////////
+ try {
+ // get hive table schema
+ try {
+ client = clientPool.getClient();
+ table = HiveCatalogUtil.getTable(client.getHiveClient(),
databaseName, tableName);
+ path = table.getPath();
+ } catch (NoSuchObjectException nsoe) {
+ throw new CatalogException("Table not found. - tableName:" +
tableName, nsoe);
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
+
+ // convert HiveCatalogStore field schema into tajo field schema.
+ schema = new org.apache.tajo.catalog.Schema();
+
+ List<FieldSchema> fieldSchemaList = table.getCols();
+ boolean isPartitionKey = false;
+ for (FieldSchema eachField : fieldSchemaList) {
+ isPartitionKey = false;
+
+ if (table.getPartitionKeys() != null) {
+ for (FieldSchema partitionKey : table.getPartitionKeys()) {
+ if (partitionKey.getName().equals(eachField.getName())) {
+ isPartitionKey = true;
+ }
+ }
+ }
+
+ if (!isPartitionKey) {
+ String fieldName = databaseName +
CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+ CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName();
+ TajoDataTypes.Type dataType =
HiveCatalogUtil.getTajoFieldType(eachField.getType().toString());
+ schema.addColumn(fieldName, dataType);
+ }
+ }
+
+ // validate field schema.
+ HiveCatalogUtil.validateSchema(table);
+
+ stats = new TableStats();
+ options = new KeyValueSet();
+ options.putAll(table.getParameters());
+ options.remove("EXTERNAL");
+
+ Properties properties = table.getMetadata();
+ if (properties != null) {
+ // set field delimiter
+ String fieldDelimiter = "", nullFormat = "";
+ if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) {
+ fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM);
+ } else {
+ // if hive table used default row format delimiter, Properties
doesn't have it.
+ // So, Tajo must set as follows:
+ fieldDelimiter = "\u0001";
+ }
+
+ // set null format
+ if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT)
!= null) {
+ nullFormat =
properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT);
+ } else {
+ nullFormat = "\\N";
+ }
+ options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
+
+ // set file output format
+ String fileOutputformat =
properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
+ storeType =
CatalogUtil.getStoreType(HiveCatalogUtil.getStoreType(fileOutputformat));
+
+ if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) {
+ options.set(StorageConstants.TEXT_DELIMITER,
StringEscapeUtils.escapeJava(fieldDelimiter));
+ options.set(StorageConstants.TEXT_NULL,
StringEscapeUtils.escapeJava(nullFormat));
+ } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) {
+ options.set(StorageConstants.RCFILE_NULL,
StringEscapeUtils.escapeJava(nullFormat));
+ String serde =
properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+ if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
+ options.set(StorageConstants.RCFILE_SERDE,
StorageConstants.DEFAULT_BINARY_SERDE);
+ } else if (ColumnarSerDe.class.getName().equals(serde)) {
+ options.set(StorageConstants.RCFILE_SERDE,
StorageConstants.DEFAULT_TEXT_SERDE);
+ }
+ } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) {
+ options.set(StorageConstants.SEQUENCEFILE_DELIMITER,
StringEscapeUtils.escapeJava(fieldDelimiter));
+ options.set(StorageConstants.SEQUENCEFILE_NULL,
StringEscapeUtils.escapeJava(nullFormat));
+ String serde =
properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+ if (LazyBinarySerDe.class.getName().equals(serde)) {
+ options.set(StorageConstants.SEQUENCEFILE_SERDE,
StorageConstants.DEFAULT_BINARY_SERDE);
+ } else if (LazySimpleSerDe.class.getName().equals(serde)) {
+ options.set(StorageConstants.SEQUENCEFILE_SERDE,
StorageConstants.DEFAULT_TEXT_SERDE);
+ }
+ }
+
+ // set data size
+ long totalSize = 0;
+ if (properties.getProperty("totalSize") != null) {
+ totalSize = Long.parseLong(properties.getProperty("totalSize"));
+ } else {
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ if (fs.exists(path)) {
+ totalSize = fs.getContentSummary(path).getLength();
+ }
+ } catch (IOException ioe) {
+ throw new CatalogException("Fail to get path. - path:" +
path.toString(), ioe);
+ }
+ }
+ stats.setNumBytes(totalSize);
+ }
+
+ // set partition keys
+ List<FieldSchema> partitionKeys = table.getPartitionKeys();
+
+ if (null != partitionKeys) {
+ org.apache.tajo.catalog.Schema expressionSchema = new
org.apache.tajo.catalog.Schema();
+ StringBuilder sb = new StringBuilder();
+ if (partitionKeys.size() > 0) {
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ FieldSchema fieldSchema = partitionKeys.get(i);
+ TajoDataTypes.Type dataType =
HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
+ String fieldName = databaseName +
CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+ CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName();
+ expressionSchema.addColumn(new Column(fieldName, dataType));
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(fieldSchema.getName());
+ }
+ partitions = new PartitionMethodDesc(
+ databaseName,
+ tableName,
+ PartitionType.COLUMN,
+ sb.toString(),
+ expressionSchema);
+ }
+ }
+ } finally {
+ if(client != null) client.release();
+ }
+ TableMeta meta = new TableMeta(storeType, options);
+ TableDesc tableDesc = new TableDesc(databaseName + "." + tableName,
schema, meta, path.toUri());
+ if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
+ tableDesc.setExternal(true);
+ }
+ if (stats != null) {
+ tableDesc.setStats(stats);
+ }
+ if (partitions != null) {
+ tableDesc.setPartitionMethod(partitions);
+ }
+ return tableDesc.getProto();
+ }
+
+
+ private TajoDataTypes.Type getDataType(final String typeStr) {
+ try {
+ return Enum.valueOf(TajoDataTypes.Type.class, typeStr);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Cannot find a matched type against from '" + typeStr + "'");
+ return null;
+ }
+ }
+
+ @Override
+ public final List<String> getAllTableNames(String databaseName) throws
CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ client = clientPool.getClient();
+ return client.getHiveClient().getAllTables(databaseName);
+ } catch (TException e) {
+ throw new CatalogException(e);
+ } finally {
+ if(client != null) client.release();
+ }
+ }
+
+ @Override
+ public void createTablespace(String spaceName, String spaceUri) throws
CatalogException {
+ // SKIP
+ }
+
+ @Override
+ public boolean existTablespace(String spaceName) throws CatalogException {
+ // SKIP
+ return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME);
+ }
+
+ @Override
+ public void dropTablespace(String spaceName) throws CatalogException {
+ // SKIP
+ }
+
+ @Override
+ public Collection<String> getAllTablespaceNames() throws CatalogException {
+ return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME);
+ }
+
+ @Override
+ public TablespaceProto getTablespace(String spaceName) throws
CatalogException {
+ if (spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME)) {
+ TablespaceProto.Builder builder = TablespaceProto.newBuilder();
+ builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME);
+ builder.setUri(defaultTableSpaceUri);
+ return builder.build();
+ } else {
+ throw new CatalogException("tablespace concept is not supported in
HiveCatalogStore");
+ }
+ }
+
+ @Override
+ public void updateTableStats(CatalogProtos.UpdateTableStatsProto
statsProto) throws
+ CatalogException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto)
throws CatalogException {
+ throw new CatalogException("tablespace concept is not supported in
HiveCatalogStore");
+ }
+
+ @Override
+ public void createDatabase(String databaseName, String tablespaceName)
throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ Database database = new Database(
+ databaseName,
+ "",
+ defaultTableSpaceUri + "/" + databaseName,
+ new HashMap<String, String>());
+ client = clientPool.getClient();
+ client.getHiveClient().createDatabase(database);
+ } catch (AlreadyExistsException e) {
+ throw new AlreadyExistsDatabaseException(databaseName);
+ } catch (Throwable t) {
+ throw new CatalogException(t);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ @Override
+ public boolean existDatabase(String databaseName) throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ client = clientPool.getClient();
+ List<String> databaseNames = client.getHiveClient().getAllDatabases();
+ return databaseNames.contains(databaseName);
+ } catch (Throwable t) {
+ throw new CatalogException(t);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ @Override
+ public void dropDatabase(String databaseName) throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ client = clientPool.getClient();
+ client.getHiveClient().dropDatabase(databaseName);
+ } catch (NoSuchObjectException e) {
+ throw new NoSuchDatabaseException(databaseName);
+ } catch (Throwable t) {
+ throw new CatalogException(databaseName);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ @Override
+ public Collection<String> getAllDatabaseNames() throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ client = clientPool.getClient();
+ return client.getHiveClient().getAllDatabases();
+ } catch (TException e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ @Override
+ public final void createTable(final CatalogProtos.TableDescProto
tableDescProto) throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ TableDesc tableDesc = new TableDesc(tableDescProto);
+ String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+ String databaseName = splitted[0];
+ String tableName = splitted[1];
+
+ try {
+ client = clientPool.getClient();
+
+ org.apache.hadoop.hive.metastore.api.Table table = new
org.apache.hadoop.hive.metastore.api.Table();
+ table.setDbName(databaseName);
+ table.setTableName(tableName);
+ table.setParameters(new HashMap<String,
String>(tableDesc.getMeta().getOptions().getAllKeyValus()));
+ // TODO: set owner
+ //table.setOwner();
+
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().setName(table.getTableName());
+
+ // if tajo set location method, thrift client make exception as follows:
+ // Caused by: MetaException(message:java.lang.NullPointerException)
+ // If you want to modify table path, you have to modify on Hive cli.
+ if (tableDesc.isExternal()) {
+ table.setTableType(TableType.EXTERNAL_TABLE.name());
+ table.putToParameters("EXTERNAL", "TRUE");
+
+ Path tablePath = new Path(tableDesc.getPath());
+ FileSystem fs = tablePath.getFileSystem(conf);
+ if (fs.isFile(tablePath)) {
+ LOG.warn("A table path is a file, but HiveCatalogStore does not
allow a file path.");
+ sd.setLocation(tablePath.getParent().toString());
+ } else {
+ sd.setLocation(tablePath.toString());
+ }
+ }
+
+ // set column information
+ List<Column> columns = tableDesc.getSchema().getColumns();
+ ArrayList<FieldSchema> cols = new
ArrayList<FieldSchema>(columns.size());
+
+ for (Column eachField : columns) {
+ cols.add(new FieldSchema(eachField.getSimpleName(),
+ HiveCatalogUtil.getHiveFieldType(eachField.getDataType()), ""));
+ }
+ sd.setCols(cols);
+
+ // set partition keys
+ if (tableDesc.hasPartition() &&
tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN))
{
+ List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
+ for (Column eachPartitionKey :
tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
+ partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(),
+
HiveCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), ""));
+ }
+ table.setPartitionKeys(partitionKeys);
+ }
+
+ if
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) {
+ String serde =
tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
+
sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+ if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+ } else {
+ sd.getSerdeInfo().setSerializationLib(
+
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+ }
+
+ if
(tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
+ table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
+ }
+ } else if
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV)
+ ||
tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) {
+
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+
sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
+
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+
+ String fieldDelimiter =
tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER);
+
+ // User can use an unicode for filed delimiter such as \u0001, \001.
+ // In this case, java console will convert this value into "\\u001".
+ // And hive will un-espace this value again.
+ // As a result, user can use right field delimiter.
+ // So, we have to un-escape this value.
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+ StringEscapeUtils.unescapeJava(fieldDelimiter));
+ sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+ StringEscapeUtils.unescapeJava(fieldDelimiter));
+ table.getParameters().remove(StorageConstants.TEXT_DELIMITER);
+
+ if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) {
+ table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL)));
+ table.getParameters().remove(StorageConstants.TEXT_NULL);
+ }
+ } else if
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE))
{
+ String serde =
tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
+
sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
+
sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());
+
+ if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
+
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+
+ String fieldDelimiter =
tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER);
+
+ // User can use an unicode for filed delimiter such as \u0001, \001.
+ // In this case, java console will convert this value into "\\u001".
+ // And hive will un-espace this value again.
+ // As a result, user can use right field delimiter.
+ // So, we have to un-escape this value.
+
sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT,
+ StringEscapeUtils.unescapeJava(fieldDelimiter));
+ sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM,
+ StringEscapeUtils.unescapeJava(fieldDelimiter));
+
table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
+ } else {
+
sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+ }
+
+ if
(tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
+ table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
+ table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
+ }
+ } else {
+ if
(tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) {
+
sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
+
sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
+
sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
+ } else {
+ throw new CatalogException(new
NotImplementedException(tableDesc.getMeta().getStoreType
+ ().name()));
+ }
+ }
+
+ sd.setSortCols(new ArrayList<Order>());
+
+ table.setSd(sd);
+ client.getHiveClient().createTable(table);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if(client != null) client.release();
+ }
+ }
+
+ @Override
+ public final void dropTable(String databaseName, final String tableName)
throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+ client = clientPool.getClient();
+ client.getHiveClient().dropTable(databaseName, tableName, false, false);
+ } catch (NoSuchObjectException nsoe) {
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+
+ @Override
+ public void alterTable(final CatalogProtos.AlterTableDescProto
alterTableDescProto) throws CatalogException {
+ final String[] split =
CatalogUtil.splitFQTableName(alterTableDescProto.getTableName());
+
+ if (split.length == 1) {
+ throw new IllegalArgumentException("alterTable() requires a qualified
table name, but it is \""
+ + alterTableDescProto.getTableName() + "\".");
+ }
+
+ final String databaseName = split[0];
+ final String tableName = split[1];
+ String partitionName = null;
+ CatalogProtos.PartitionDescProto partitionDesc = null;
+
+ switch (alterTableDescProto.getAlterTableType()) {
+ case RENAME_TABLE:
+ if
(existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) {
+ throw new
AlreadyExistsTableException(alterTableDescProto.getNewTableName());
+ }
+ renameTable(databaseName, tableName,
alterTableDescProto.getNewTableName().toLowerCase());
+ break;
+ case RENAME_COLUMN:
+ if (existColumn(databaseName,tableName,
alterTableDescProto.getAlterColumnName().getNewColumnName())) {
+ throw new
ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName());
+ }
+ renameColumn(databaseName, tableName,
alterTableDescProto.getAlterColumnName());
+ break;
+ case ADD_COLUMN:
+ if (existColumn(databaseName,tableName,
alterTableDescProto.getAddColumn().getName())) {
+ throw new
ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName());
+ }
+ addNewColumn(databaseName, tableName,
alterTableDescProto.getAddColumn());
+ break;
+ case ADD_PARTITION:
+ partitionName =
alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc != null) {
+ throw new AlreadyExistsPartitionException(databaseName, tableName,
partitionName);
+ }
+ addPartition(databaseName, tableName,
alterTableDescProto.getPartitionDesc());
+ break;
+ case DROP_PARTITION:
+ partitionName =
alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc == null) {
+ throw new NoSuchPartitionException(databaseName, tableName,
partitionName);
+ }
+ dropPartition(databaseName, tableName, partitionDesc);
+ break;
+ case SET_PROPERTY:
+ // TODO - not implemented yet
+ break;
+ default:
+ //TODO
+ }
+ }
+
+
+ private void renameTable(String databaseName, String tableName, String
newTableName) {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ try {
+ client = clientPool.getClient();
+ Table newTable = client.getHiveClient().getTable(databaseName,
tableName);
+ newTable.setTableName(newTableName);
+ client.getHiveClient().alter_table(databaseName, tableName, newTable);
+
+ } catch (NoSuchObjectException nsoe) {
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ private void renameColumn(String databaseName, String tableName,
CatalogProtos.AlterColumnProto alterColumnProto) {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ try {
+
+ client = clientPool.getClient();
+ Table table = client.getHiveClient().getTable(databaseName, tableName);
+ List<FieldSchema> columns = table.getSd().getCols();
+
+ for (final FieldSchema currentColumn : columns) {
+ if
(currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName()))
{
+ currentColumn.setName(alterColumnProto.getNewColumnName());
+ }
+ }
+ client.getHiveClient().alter_table(databaseName, tableName, table);
+
+ } catch (NoSuchObjectException nsoe) {
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+
+ private void addNewColumn(String databaseName, String tableName,
CatalogProtos.ColumnProto columnProto) {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ try {
+
+ client = clientPool.getClient();
+ Table table = client.getHiveClient().getTable(databaseName, tableName);
+ List<FieldSchema> columns = table.getSd().getCols();
+ columns.add(new FieldSchema(columnProto.getName(),
+ HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), ""));
+ client.getHiveClient().alter_table(databaseName, tableName, table);
+
+
+ } catch (NoSuchObjectException nsoe) {
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ private void addPartition(String databaseName, String tableName,
CatalogProtos.PartitionDescProto
+ partitionDescProto) {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ try {
+
+ client = clientPool.getClient();
+
+ Partition partition = new Partition();
+ partition.setDbName(databaseName);
+ partition.setTableName(tableName);
+
+ List<String> values = Lists.newArrayList();
+ for(CatalogProtos.PartitionKeyProto keyProto :
partitionDescProto.getPartitionKeysList()) {
+ values.add(keyProto.getPartitionValue());
+ }
+ partition.setValues(values);
+
+ Table table = client.getHiveClient().getTable(databaseName, tableName);
+ StorageDescriptor sd = table.getSd();
+ sd.setLocation(partitionDescProto.getPath());
+ partition.setSd(sd);
+
+ client.getHiveClient().add_partition(partition);
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ private void dropPartition(String databaseName, String tableName,
CatalogProtos.PartitionDescProto
+ partitionDescProto) {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ try {
+
+ client = clientPool.getClient();
+
+ List<String> values = Lists.newArrayList();
+ for(CatalogProtos.PartitionKeyProto keyProto :
partitionDescProto.getPartitionKeysList()) {
+ values.add(keyProto.getPartitionValue());
+ }
+ client.getHiveClient().dropPartition(databaseName, tableName, values,
true);
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ }
+
+ @Override
+ public void addPartitionMethod(CatalogProtos.PartitionMethodProto
partitionMethodProto) throws CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogProtos.PartitionMethodProto getPartitionMethod(String
databaseName, String tableName)
+ throws CatalogException {
- return null; // TODO - not implemented yet
++ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean existPartitionMethod(String databaseName, String tableName)
throws CatalogException {
- return false; // TODO - not implemented yet
++ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartitionMethod(String databaseName, String tableName)
throws CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogProtos.PartitionDescProto> getPartitions(String
databaseName,
+ String tableName)
throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public CatalogProtos.PartitionDescProto getPartition(String databaseName,
String tableName,
+ String partitionName)
throws CatalogException {
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+ CatalogProtos.PartitionDescProto.Builder builder = null;
+
+ try {
+ client = clientPool.getClient();
+
+ Partition partition = client.getHiveClient().getPartition(databaseName,
tableName, partitionName);
+ builder = CatalogProtos.PartitionDescProto.newBuilder();
+ builder.setPartitionName(partitionName);
+ builder.setPath(partition.getSd().getLocation());
+
+ String[] partitionNames = partitionName.split("/");
+
+ for (int i = 0; i < partition.getValues().size(); i++) {
+ String value = partition.getValues().get(i);
+ CatalogProtos.PartitionKeyProto.Builder keyBuilder =
CatalogProtos.PartitionKeyProto.newBuilder();
+
+ String columnName = partitionNames[i].split("=")[0];
+ keyBuilder.setColumnName(columnName);
+ keyBuilder.setPartitionValue(value);
+ builder.addPartitionKeys(keyBuilder);
+ }
+ } catch (NoSuchObjectException e) {
+ return null;
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public final void addFunction(final FunctionDesc func) throws
CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void deleteFunction(final FunctionDesc func) throws
CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void existFunction(final FunctionDesc func) throws
CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final List<String> getAllFunctionNames() throws CatalogException {
+ // TODO - not implemented yet
- return null;
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public void dropIndex(String databaseName, String indexName) throws
CatalogException {
++ public void createIndex(CatalogProtos.IndexDescProto proto) throws
CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public boolean existIndexByName(String databaseName, String indexName)
throws CatalogException {
++ public void dropIndex(String databaseName, String indexName) throws
CatalogException {
+ // TODO - not implemented yet
- return false;
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public CatalogProtos.IndexDescProto[] getIndexes(String databaseName,
String tableName) throws CatalogException {
++ public CatalogProtos.IndexDescProto getIndexByName(String databaseName,
String indexName) throws CatalogException {
+ // TODO - not implemented yet
- return null;
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public void createIndex(CatalogProtos.IndexDescProto proto) throws
CatalogException {
++ public CatalogProtos.IndexDescProto getIndexByColumns(String databaseName,
String tableName, String[] columnNames)
++ throws CatalogException {
+ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public CatalogProtos.IndexDescProto getIndexByName(String databaseName,
String indexName) throws CatalogException {
++ public boolean existIndexByName(String databaseName, String indexName)
throws CatalogException {
+ // TODO - not implemented yet
- return null;
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName,
String tableName, String columnName)
++ public boolean existIndexByColumns(String databaseName, String tableName,
String[] columnNames)
+ throws CatalogException {
+ // TODO - not implemented yet
- return null;
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public boolean existIndexByColumn(String databaseName, String tableName,
String columnName) throws CatalogException {
++ public List<String> getAllIndexNamesByTable(String databaseName, String
tableName) throws CatalogException {
+ // TODO - not implemented yet
- return false;
++ throw new UnsupportedOperationException();
++ }
++
++ @Override
++ public boolean existIndexesByTable(String databaseName, String tableName)
throws CatalogException {
++ // TODO - not implemented yet
++ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void close() {
+ clientPool.close();
+ }
+
+ private boolean existColumn(final String databaseName ,final String
tableName , final String columnName) throws CatalogException {
+ boolean exist = false;
+ HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+ try {
+
+ client = clientPool.getClient();
+ Table table = client.getHiveClient().getTable(databaseName, tableName);
+ List<FieldSchema> columns = table.getSd().getCols();
+
+ for (final FieldSchema currentColumn : columns) {
+ if (currentColumn.getName().equalsIgnoreCase(columnName)) {
+ exist = true;
+ }
+ }
+ client.getHiveClient().alter_table(databaseName, tableName, table);
+
+ } catch (NoSuchObjectException nsoe) {
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+
+ return exist;
+ }
+
+ @Override
+ public List<ColumnProto> getAllColumns() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<DatabaseProto> getAllDatabases() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
- public List<IndexProto> getAllIndexes() throws CatalogException {
++ public List<IndexDescProto> getAllIndexes() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TablePartitionProto> getAllPartitions() throws CatalogException
{
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TableOptionProto> getAllTableOptions() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TableStatsProto> getAllTableStats() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TableDescriptorProto> getAllTables() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<TablespaceProto> getTablespaces() throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc
tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 90a51b6,9d0e427..5fa1c67
---
a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++
b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -259,124 -260,6 +259,124 @@@ public class CatalogAdminClientImpl imp
}
@Override
+ public IndexDescProto getIndex(final String indexName) throws
ServiceException {
- return new ServerCallable<IndexDescProto>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<IndexDescProto>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public IndexDescProto call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.getIndexWithName(null,
+ connection.convertSessionedString(indexName));
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public boolean existIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<Boolean>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public Boolean call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.existIndexWithName(null,
+ connection.convertSessionedString(indexName)).getValue();
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public List<IndexDescProto> getIndexes(final String tableName) throws
ServiceException {
- return new ServerCallable<List<IndexDescProto>>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<List<IndexDescProto>>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public List<IndexDescProto> call(NettyClientBase client) throws
Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ GetIndexesResponse response =
tajoMasterService.getIndexesForTable(null,
+ connection.convertSessionedString(tableName));
+ if (response.getResult().getResultCode() == ResultCode.OK) {
+ return response.getIndexesList();
+ } else {
+ throw new SQLException(response.getResult().getErrorMessage());
+ }
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public boolean hasIndexes(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<Boolean>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public Boolean call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.existIndexesForTable(null,
+ connection.convertSessionedString(tableName)).getValue();
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public IndexDescProto getIndex(final String tableName, final String[]
columnNames) throws ServiceException {
- return new ServerCallable<IndexDescProto>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<IndexDescProto>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public IndexDescProto call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ GetIndexWithColumnsRequest.Builder builder =
GetIndexWithColumnsRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ for (String eachColumnName : columnNames) {
+ builder.addColumnNames(eachColumnName);
+ }
+ GetIndexWithColumnsResponse response =
tajoMasterService.getIndexWithColumns(null, builder.build());
+ if (response.getResult().getResultCode() == ResultCode.OK) {
+ return response.getIndexDesc();
+ } else {
+ throw new SQLException(response.getResult().getErrorMessage());
+ }
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public boolean existIndex(final String tableName, final String[]
columnName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<Boolean>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public Boolean call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ GetIndexWithColumnsRequest.Builder builder =
GetIndexWithColumnsRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ for (String eachColumnName : columnName) {
+ builder.addColumnNames(eachColumnName);
+ }
+ return tajoMasterService.existIndexWithColumns(null,
builder.build()).getValue();
+ }
+ }.withRetries();
+ }
+
+ @Override
+ public boolean dropIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.connPool,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false, true) {
++ return new ServerCallable<Boolean>(connection.manager,
++ connection.getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
+
+ @Override
+ public Boolean call(NettyClientBase client) throws Exception {
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.dropIndex(null,
+ connection.convertSessionedString(indexName)).getValue();
+ }
+ }.withRetries();
+ }
+
+ @Override
public void close() throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/02800343/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------