http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 73f3cf5..aed2fe6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -35,12 +35,14 @@ import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.JsonHelper; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.IndexDesc.IndexKey; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.exception.IllegalQueryStatusException; @@ -59,6 +61,7 @@ import org.apache.tajo.master.querymaster.QueryJobManager; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.session.Session; import org.apache.tajo.storage.*; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -187,10 +190,17 @@ public class GlobalEngine extends AbstractService { if (PlannerUtil.checkIfDDLPlan(rootNode)) { context.getSystemMetrics().counter("Query", "numDDLQuery").inc(); - updateQuery(session, rootNode.getChild()); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + if (PlannerUtil.checkIfCreateIndexPlan(rootNode)) { + return createIndex(session, (CreateIndexNode)rootNode.getChild(), queryContext, + plan, sql, jsonExpr, responseBuilder); + } else { + updateQuery(session, rootNode.getChild()); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + + return responseBuilder.build(); + } } else if (plan.isExplain()) { // explain query String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot()); Schema schema = new Schema(); @@ -216,8 +226,10 @@ public class GlobalEngine extends AbstractService { responseBuilder.setResultCode(ClientProtos.ResultCode.OK); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - // Simple query indicates a form of 'select * from tb_name [LIMIT X];'. + return responseBuilder.build(); + } else if (PlannerUtil.checkIfSimpleQuery(plan)) { + // Simple query indicates a form of 'select * from tb_name [LIMIT X];'. ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); TableDesc desc = scanNode.getTableDesc(); if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { @@ -232,8 +244,10 @@ public class GlobalEngine extends AbstractService { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setResultCode(ClientProtos.ResultCode.OK); - // NonFromQuery indicates a form of 'select a, x+y;' + return responseBuilder.build(); + } else if (PlannerUtil.checkIfNonFromQuery(plan)) { + // NonFromQuery indicates a form of 'select a, x+y;' Target [] targets = plan.getRootBlock().getRawTargets(); if (targets == null) { throw new PlanningException("No targets"); @@ -261,32 +275,40 @@ public class GlobalEngine extends AbstractService { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setResultCode(ClientProtos.ResultCode.OK); } + return responseBuilder.build(); + } else { // it requires distributed execution. So, the query is forwarded to a query master. context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); - hookManager.doHooks(queryContext, plan); + return executeInCluster(queryContext, plan, session, sql, jsonExpr, responseBuilder); + } + } - QueryJobManager queryJobManager = this.context.getQueryJobManager(); - QueryInfo queryInfo; + private SubmitQueryResponse executeInCluster(QueryContext queryContext, LogicalPlan plan, Session session, String sql, String jsonExpr, + SubmitQueryResponse.Builder responseBuilder) + throws Exception { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + hookManager.doHooks(queryContext, plan); - queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode); + QueryJobManager queryJobManager = this.context.getQueryJobManager(); + QueryInfo queryInfo; - if(queryInfo == null) { - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); - responseBuilder.setErrorMessage("Fail starting QueryMaster."); - } else { - responseBuilder.setIsForwarded(true); - responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); - if(queryInfo.getQueryMasterHost() != null) { - responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); - } - responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort()); - LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); + queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode); + + if(queryInfo == null) { + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); + responseBuilder.setErrorMessage("Fail starting QueryMaster."); + } else { + responseBuilder.setIsForwarded(true); + responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + if(queryInfo.getQueryMasterHost() != null) { + responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); } + responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort()); + LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); } - SubmitQueryResponse response = responseBuilder.build(); - return response; + return responseBuilder.build(); } private void insertNonFromQuery(QueryContext queryContext, InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) @@ -448,6 +470,11 @@ public class GlobalEngine extends AbstractService { TruncateTableNode truncateTable = (TruncateTableNode) root; truncateTable(session, truncateTable); return true; + case DROP_INDEX: + DropIndexNode dropIndexNode = (DropIndexNode) root; + dropIndex(session, dropIndexNode); + return true; + default: throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson()); } @@ -636,6 +663,98 @@ public class GlobalEngine extends AbstractService { } } + /** + * Create an index for a given table. + * @param session user session + * @param createIndexNode the root of logical plan + */ + private SubmitQueryResponse createIndex(final Session session, final CreateIndexNode createIndexNode, + QueryContext queryContext, LogicalPlan plan, + String sql, String jsonExpr, + SubmitQueryResponse.Builder responseBuilder) throws Exception { + SubmitQueryResponse response = null; + final CatalogService catalog = context.getCatalog(); + final String dbName = session.getCurrentDatabase(); + String indexName = createIndexNode.getIndexName(); + if (CatalogUtil.isFQTableName(indexName)) { + indexName = CatalogUtil.splitFQTableName(indexName)[1]; + } + + boolean exists = catalog.existIndexByName(dbName, indexName); + if (exists) { + if (createIndexNode.isIfNotExists()) { + LOG.info("index \"" + indexName + "\" already exists." ); + } else { + throw new AlreadyExistsIndexException(createIndexNode.getIndexName()); + } + } else { + response = executeInCluster(queryContext, plan, session, sql, jsonExpr, responseBuilder); + + // get the table name and predicate from scan + try { + ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN); + String tableName; + if (CatalogUtil.isFQTableName(scanNode.getTableName())) { + tableName = CatalogUtil.splitFQTableName(scanNode.getTableName())[1]; + } else { + tableName = scanNode.getTableName(); + } + String predicate = scanNode.hasQual() ? scanNode.getQual().toJson() : null; + // extract index keys + List<IndexKey> indexKeys = TUtil.newList(); + for (SortSpec eachKey : createIndexNode.getSortSpecs()) { + indexKeys.add(new IndexKey(eachKey.getSortKey().toJson(), eachKey.isAscending(), eachKey.isNullFirst())); + } + + IndexDesc indexDesc = new IndexDesc(indexName, dbName, tableName, createIndexNode.getIndexType(), + indexKeys, createIndexNode.isUnique(), createIndexNode.isClustered(), predicate); + catalog.createIndex(indexDesc); + } catch (Exception e) { + // delete index + deleteIndexFiles(dbName, indexName); + } + } + return response; + } + + /** + * Drop the specified index. + * @param session user session + * @param indexName index name + * @param ifExists if exists + */ + public void dropIndex(final Session session, String indexName, boolean ifExists) { + final CatalogService catalog = context.getCatalog(); + final String dbName = session.getCurrentDatabase(); + + boolean exists = catalog.existIndexByName(dbName, indexName); + LOG.info("index name exist: " + exists); + if (!exists) { + if (ifExists) { + LOG.info("index \"" + indexName + "\" does not exist." ); + } else { + throw new NoSuchIndexException(indexName); + } + } else { + catalog.dropIndex(dbName, indexName); + deleteIndexFiles(dbName, indexName); + } + } + + private void deleteIndexFiles(String dbName, String indexName) { + Path indexPath = new Path(context.getConf().getVar(ConfVars.WAREHOUSE_DIR), dbName + "/" + indexName); + try { + FileSystem fs = indexPath.getFileSystem(context.getConf()); + fs.delete(indexPath, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + } + + private void dropIndex(final Session session, final DropIndexNode dropIndexNode) { + dropIndex(session, dropIndexNode.getIndexName(), dropIndexNode.isIfExists()); + } + private boolean existColumnName(String tableName, String columnName) { final TableDesc tableDesc = catalog.getTableDesc(tableName); return tableDesc.getSchema().containsByName(columnName) ? true : false; @@ -692,7 +811,7 @@ public class GlobalEngine extends AbstractService { if (exists) { if (ifNotExists) { - LOG.info("relation \"" + qualifiedName + "\" is already exists." ); + LOG.info("relation \"" + qualifiedName + "\" already exists." ); return catalog.getTableDesc(databaseName, simpleTableName); } else { throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 97f59ef..1ac85e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -27,7 +27,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.*; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.NoSuchDatabaseException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -751,5 +754,42 @@ public class TajoMasterClientService extends AbstractService { throw new ServiceException(t); } } + + @Override + public BoolProto existIndex(RpcController controller, SessionedStringProto request) throws ServiceException { + try { + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String databaseName; + String indexName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + + if (catalog.existIndexByName(databaseName, indexName)) { + return BOOL_TRUE; + } else { + return BOOL_FALSE; + } + } catch (Throwable e) { + throw new ServiceException(e); + } + } + + @Override + public BoolProto dropIndex(RpcController controller, SessionedStringProto request) throws ServiceException { + try { + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + context.getGlobalEngine().dropIndex(session, request.getValue(), true); + return BOOL_TRUE; + } catch (Throwable t) { + throw new ServiceException(t); + } + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 7ecf27a..2b67538 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -294,14 +294,6 @@ public class TaskAttemptContext { return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]); } - public long getUniqueKeyFromFragments() { - List<FragmentProto> totalFragments = new ArrayList<FragmentProto>(); - for (List<FragmentProto> eachFragments : fragmentMap.values()) { - totalFragments.addAll(eachFragments); - } - return Objects.hashCode(totalFragments.toArray(new FragmentProto[totalFragments.size()])); - } - public int hashCode() { return Objects.hashCode(queryId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index b97eb30..7e87c29 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -36,7 +36,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.FileUtil; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.rules.TestName; import java.io.File; @@ -46,7 +48,10 @@ import java.net.URL; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import static org.junit.Assert.*; @@ -382,6 +387,16 @@ public class QueryTestCaseBase { } /** + * Assert that the index exists. + * + * @param indexName The index name to be checked. This name is case sensitive. + * @throws ServiceException + */ + public void assertIndexExists(String indexName) throws ServiceException { + assertTrue(client.existIndex(indexName)); + } + + /** * Assert that the table does not exist. * * @param tableName The table name to be checked. This name is case sensitive. @@ -390,6 +405,10 @@ public class QueryTestCaseBase { assertTrue(!client.existTable(tableName)); } + public void assertIndexNotExists(String indexName) throws ServiceException { + assertFalse(client.existIndex(indexName)); + } + public void assertColumnExists(String tableName,String columnName) throws ServiceException { TableDesc tableDesc = fetchTableMetaData(tableName); assertTrue(tableDesc.getSchema().containsByName(columnName)); http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 2ca85cc..6a6d07c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -54,7 +54,6 @@ import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; public class TestLogicalPlanner { private static TajoTestingCluster util; http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 920c932..6c36040 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -34,7 +34,6 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; @@ -798,7 +797,7 @@ public class TestPhysicalPlanner { FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex"); - Path indexPath = StorageUtil.concatPath(conf.getVar(ConfVars.ROOT_DIR), "default/idx_employee"); + Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee"); if (sm.getFileSystem().exists(indexPath)) { sm.getFileSystem().delete(indexPath, true); } http://git-wip-us.apache.org/repos/asf/tajo/blob/c9b8f511/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java index 2a66909..4714bd6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCreateIndex.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.query; +import com.google.protobuf.ServiceException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.IntegrationTest; @@ -29,7 +30,9 @@ import org.junit.experimental.categories.Category; import java.io.IOException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @Category(IntegrationTest.class) public class TestCreateIndex extends QueryTestCaseBase { @@ -38,35 +41,50 @@ public class TestCreateIndex extends QueryTestCaseBase { super(TajoConstants.DEFAULT_DATABASE_NAME); } - private static void assertIndexExist(String indexName) throws IOException { + private void checkIndexExist(String indexName) throws IOException, ServiceException { Path indexPath = new Path(conf.getVar(ConfVars.WAREHOUSE_DIR), "default/" + indexName); FileSystem fs = indexPath.getFileSystem(conf); assertTrue(fs.exists(indexPath)); assertEquals(2, fs.listStatus(indexPath).length); - fs.deleteOnExit(indexPath); + assertIndexExists(indexName); + } + + private void checkIndexNotExist(String indexName) throws IOException, ServiceException { + Path indexPath = new Path(conf.getVar(ConfVars.WAREHOUSE_DIR), "default/" + indexName); + FileSystem fs = indexPath.getFileSystem(conf); + assertFalse(fs.exists(indexPath)); + assertIndexNotExists(indexName); } @Test public final void testCreateIndex() throws Exception { executeQuery(); - assertIndexExist("l_orderkey_idx"); + checkIndexExist("l_orderkey_idx"); + executeString("drop index l_orderkey_idx"); + checkIndexNotExist("l_orderkey_idx"); } @Test public final void testCreateIndexOnMultiAttrs() throws Exception { executeQuery(); - assertIndexExist("l_orderkey_partkey_idx"); + checkIndexExist("l_orderkey_partkey_idx"); + executeString("drop index l_orderkey_partkey_idx"); + checkIndexNotExist("l_orderkey_partkey_idx"); } @Test public final void testCreateIndexWithCondition() throws Exception { executeQuery(); - assertIndexExist("l_orderkey_partkey_lt10_idx"); + checkIndexExist("l_orderkey_partkey_lt10_idx"); + executeString("drop index l_orderkey_partkey_lt10_idx"); + checkIndexNotExist("l_orderkey_partkey_lt10_idx"); } @Test public final void testCreateIndexOnExpression() throws Exception { executeQuery(); - assertIndexExist("l_orderkey_100_lt10_idx"); + checkIndexExist("l_orderkey_100_lt10_idx"); + executeString("drop index l_orderkey_100_lt10_idx"); + checkIndexNotExist("l_orderkey_100_lt10_idx"); } }
