http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java index c6466f5..9187a69 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java @@ -33,15 +33,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; -import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; @@ -297,43 +289,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult return tuples; } - + private List<Tuple> getIndexes(Schema outSchema) { - List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes(); + List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes(); List<Tuple> tuples = new ArrayList<Tuple>(indexList.size()); List<Column> columns = outSchema.getColumns(); Tuple aTuple; - - for (IndexProto index: indexList) { + + for (IndexDescProto index: indexList) { aTuple = new VTuple(outSchema.size()); - + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { Column column = columns.get(fieldId); - + if ("db_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId())); + aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId())); } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getTId())); + aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid())); } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createText(index.getIndexName())); - } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getColumnName())); - } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getDataType())); - } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getIndexType())); - } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique())); - } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered())); - } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending())); + } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name())); + } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath())); } } - + tuples.add(aTuple); } - + return tuples; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 249d335..f05f54b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -36,6 +36,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.NoSuchDatabaseException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; @@ -57,6 +58,7 @@ import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; +import org.apache.tajo.util.IPCUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -143,19 +145,17 @@ public class TajoMasterClientService extends AbstractService { String sessionId = context.getSessionManager().createSession(request.getUsername(), databaseName); CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder(); - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build()); builder.setSessionVars(ProtoUtil.convertFromMap(context.getSessionManager().getAllVariables(sessionId))); return builder.build(); } catch (NoSuchDatabaseException nsde) { CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder(); - builder.setResultCode(ResultCode.ERROR); - builder.setMessage(nsde.getMessage()); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, nsde.getMessage(), null)); return builder.build(); } catch (InvalidSessionException e) { CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder(); - builder.setResultCode(ResultCode.ERROR); - builder.setMessage(e.getMessage()); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null)); return builder.build(); } } @@ -173,15 +173,14 @@ public class TajoMasterClientService extends AbstractService { public SessionUpdateResponse buildSessionUpdateOnSuccess(Map<String, String> variables) { SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder(); - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); builder.setSessionVars(new KeyValueSet(variables).getProto()); return builder.build(); } public SessionUpdateResponse buildSessionUpdateOnError(String message) { SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder(); - builder.setResultCode(ResultCode.ERROR); - builder.setMessage(message); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, message, null)); return builder.build(); } @@ -288,12 +287,8 @@ public class TajoMasterClientService extends AbstractService { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); responseBuilder.setIsForwarded(true); responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME)); - responseBuilder.setResultCode(ResultCode.ERROR); - if (e.getMessage() != null) { - responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e)); - } else { - responseBuilder.setErrorMessage("Internal Error"); - } + responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + e.getMessage() == null ? "Internal Error" : ExceptionUtils.getStackTrace(e), null)); return responseBuilder.build(); } } @@ -304,23 +299,15 @@ public class TajoMasterClientService extends AbstractService { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); QueryContext queryContext = new QueryContext(conf, session); - if (queryContext.getCurrentDatabase() == null) { - for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) { - System.out.println(e.getKey() + "=" + e.getValue()); - } - } - UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder(); + UpdateQueryResponse.Builder responseBuilder = UpdateQueryResponse.newBuilder(); try { context.getGlobalEngine().updateQuery(queryContext, request.getQuery(), request.getIsJson()); - builder.setResultCode(ResultCode.OK); - return builder.build(); + return responseBuilder.setResult(IPCUtil.buildOkRequestResult()).build(); } catch (Exception e) { - builder.setResultCode(ResultCode.ERROR); - if (e.getMessage() == null) { - builder.setErrorMessage(ExceptionUtils.getStackTrace(e)); - } - return builder.build(); + responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + e.getMessage() == null ? ExceptionUtils.getStackTrace(e) : null, null)); + return responseBuilder.build(); } } catch (Throwable t) { throw new ServiceException(t); @@ -460,7 +447,7 @@ public class TajoMasterClientService extends AbstractService { builder.setQueryId(request.getQueryId()); if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED); } else { QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId); @@ -474,7 +461,7 @@ public class TajoMasterClientService extends AbstractService { } if (queryInfo != null) { - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); builder.setState(queryInfo.getQueryState()); boolean isCreateTable = queryInfo.getQueryContext().isCreateTable(); @@ -495,11 +482,11 @@ public class TajoMasterClientService extends AbstractService { } else { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); if (session.getNonForwardQueryResultScanner(queryId) != null) { - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED); } else { - builder.setResultCode(ResultCode.ERROR); - builder.setErrorMessage("No such query: " + queryId.toString()); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + "No such query: " + queryId.toString(), null)); } } } @@ -531,17 +518,16 @@ public class TajoMasterClientService extends AbstractService { resultSetBuilder.addAllSerializedTuples(rows); builder.setResultSet(resultSetBuilder.build()); - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); LOG.info("Send result to client for " + request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows"); } catch (Throwable t) { LOG.error(t.getMessage(), t); - builder.setResultCode(ResultCode.ERROR); String errorMessage = t.getMessage() == null ? t.getClass().getName() : t.getMessage(); - builder.setErrorMessage(errorMessage); - builder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t)); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + errorMessage, org.apache.hadoop.util.StringUtils.stringifyException(t))); } return builder.build(); } @@ -581,11 +567,11 @@ public class TajoMasterClientService extends AbstractService { if (queryInfo != null) { builder.setQueryInfo(queryInfo.getProto()); } - builder.setResultCode(ResultCode.OK); + builder.setResult(IPCUtil.buildOkRequestResult()); } catch (Throwable t) { LOG.warn(t.getMessage(), t); - builder.setResultCode(ResultCode.ERROR); - builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t)); + builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + org.apache.hadoop.util.StringUtils.stringifyException(t), null)); } return builder.build(); @@ -779,13 +765,13 @@ public class TajoMasterClientService extends AbstractService { if (catalog.existsTable(databaseName, tableName)) { return TableResponse.newBuilder() - .setResultCode(ResultCode.OK) + .setResult(IPCUtil.buildOkRequestResult()) .setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto()) .build(); } else { return TableResponse.newBuilder() - .setResultCode(ResultCode.ERROR) - .setErrorMessage("ERROR: no such a table: " + request.getTableName()) + .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + "ERROR: no such a table: " + request.getTableName(), null)) .build(); } } catch (Throwable t) { @@ -821,21 +807,21 @@ public class TajoMasterClientService extends AbstractService { meta, path, true, partitionDesc, false); } catch (Exception e) { return TableResponse.newBuilder() - .setResultCode(ResultCode.ERROR) - .setErrorMessage(e.getMessage()).build(); + .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null)) + .build(); } return TableResponse.newBuilder() - .setResultCode(ResultCode.OK) + .setResult(IPCUtil.buildOkRequestResult()) .setTableDesc(desc.getProto()).build(); } catch (InvalidSessionException ise) { return TableResponse.newBuilder() - .setResultCode(ResultCode.ERROR) - .setErrorMessage(ise.getMessage()).build(); + .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ise.getMessage(), null)) + .build(); } catch (IOException ioe) { return TableResponse.newBuilder() - .setResultCode(ResultCode.ERROR) - .setErrorMessage(ioe.getMessage()).build(); + .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ioe.getMessage(), null)) + .build(); } } @@ -875,12 +861,184 @@ public class TajoMasterClientService extends AbstractService { } } return FunctionResponse.newBuilder() - .setResultCode(ResultCode.OK) + .setResult(IPCUtil.buildOkRequestResult()) .addAllFunctions(functionProtos) .build(); } catch (Throwable t) { throw new ServiceException(t); } } + + @Override + public IndexDescProto getIndexWithName(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + return catalog.getIndexByName(databaseName, indexName).getProto(); + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public BoolProto existIndexWithName(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + return catalog.existIndexByName(databaseName, indexName) ? + ProtoUtil.TRUE : ProtoUtil.FALSE; + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public GetIndexesResponse getIndexesForTable(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getValue(); + } + + GetIndexesResponse.Builder builder = GetIndexesResponse.newBuilder(); + for (IndexDesc index : catalog.getAllIndexesByTable(databaseName, tableName)) { + builder.addIndexes(index.getProto()); + } + builder.setResult(IPCUtil.buildOkRequestResult()); + return builder.build(); + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public BoolProto existIndexesForTable(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getValue(); + } + return catalog.existIndexesByTable(databaseName, tableName) ? + ProtoUtil.TRUE : ProtoUtil.FALSE; + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public GetIndexWithColumnsResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getTableName())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getTableName()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getTableName(); + } + String[] columnNames = new String[request.getColumnNamesCount()]; + columnNames = request.getColumnNamesList().toArray(columnNames); + + GetIndexWithColumnsResponse.Builder builder = GetIndexWithColumnsResponse.newBuilder(); + builder.setResult(IPCUtil.buildOkRequestResult()); + builder.setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto()); + return builder.build(); + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public BoolProto existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String tableName, databaseName; + if (CatalogUtil.isFQTableName(request.getTableName())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getTableName()); + databaseName = splitted[0]; + tableName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + tableName = request.getTableName(); + } + String[] columnNames = new String[request.getColumnNamesCount()]; + columnNames = request.getColumnNamesList().toArray(columnNames); + return catalog.existIndexByColumnNames(databaseName, tableName, columnNames) ? + ProtoUtil.TRUE : ProtoUtil.FALSE; + } catch (Throwable t) { + throw new ServiceException(t); + } + } + + @Override + public BoolProto dropIndex(RpcController controller, SessionedStringProto request) + throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + String indexName, databaseName; + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); + databaseName = splitted[0]; + indexName = splitted[1]; + } else { + databaseName = session.getCurrentDatabase(); + indexName = request.getValue(); + } + return catalog.dropIndex(databaseName, indexName) ? + ProtoUtil.TRUE : ProtoUtil.FALSE; + } catch (Throwable t) { + throw new ServiceException(t); + } + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index acbaa01..5b12438 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -68,45 +68,88 @@ public class DDLExecutor { switch (root.getType()) { - case ALTER_TABLESPACE: - AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; - alterTablespace(context, queryContext, alterTablespace); - return true; - - - case CREATE_DATABASE: - CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; - createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); - return true; - case DROP_DATABASE: - DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; - dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); - return true; - - - case CREATE_TABLE: - CreateTableNode createTable = (CreateTableNode) root; - createTable(queryContext, createTable, createTable.isIfNotExists()); - return true; - case DROP_TABLE: - DropTableNode dropTable = (DropTableNode) root; - dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); - return true; - case TRUNCATE_TABLE: - TruncateTableNode truncateTable = (TruncateTableNode) root; - truncateTable(queryContext, truncateTable); - return true; - - case ALTER_TABLE: - AlterTableNode alterTable = (AlterTableNode) root; - alterTable(context, queryContext, alterTable); - return true; + case ALTER_TABLESPACE: + AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root; + alterTablespace(context, queryContext, alterTablespace); + return true; + + + case CREATE_DATABASE: + CreateDatabaseNode createDatabase = (CreateDatabaseNode) root; + createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists()); + return true; + case DROP_DATABASE: + DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root; + dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists()); + return true; + + + case CREATE_TABLE: + CreateTableNode createTable = (CreateTableNode) root; + createTable(queryContext, createTable, createTable.isIfNotExists()); + return true; + case DROP_TABLE: + DropTableNode dropTable = (DropTableNode) root; + dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge()); + return true; + case TRUNCATE_TABLE: + TruncateTableNode truncateTable = (TruncateTableNode) root; + truncateTable(queryContext, truncateTable); + return true; + + case ALTER_TABLE: + AlterTableNode alterTable = (AlterTableNode) root; + alterTable(context, queryContext, alterTable); + return true; + + case CREATE_INDEX: + // The catalog information for the created index is automatically updated when the query is successfully finished. + // See the Query.CreateIndexHook class. + return true; + + case DROP_INDEX: + DropIndexNode dropIndexNode = (DropIndexNode) root; + dropIndex(queryContext, dropIndexNode); + return true; default: throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson()); } } + public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) { + String databaseName, simpleIndexName; + if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = dropIndexNode.getIndexName(); + } + + if (!catalog.existIndexByName(databaseName, simpleIndexName)) { + throw new NoSuchIndexException(simpleIndexName); + } + + IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName); + + if (!catalog.dropIndex(databaseName, simpleIndexName)) { + LOG.info("Cannot drop index \"" + simpleIndexName + "\"."); + throw new CatalogException("Cannot drop index \"" + simpleIndexName + "\"."); + } + + Path indexPath = new Path(desc.getIndexPath()); + try { + FileSystem fs = indexPath.getFileSystem(context.getConf()); + fs.delete(indexPath, true); + } catch (IOException e) { + throw new InternalError(e.getMessage()); + } + + LOG.info("Index " + simpleIndexName + " is dropped."); + } + /** * Alter a given table */ http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 2242445..26476a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -29,8 +29,10 @@ import org.apache.tajo.QueryIdFactory; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.exception.AlreadyExistsIndexException; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; @@ -40,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec; import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import org.apache.tajo.master.NonForwardQueryResultFileScanner; import org.apache.tajo.master.NonForwardQueryResultScanner; @@ -58,6 +61,7 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.storage.*; +import org.apache.tajo.util.IPCUtil; import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -98,10 +102,18 @@ public class QueryExecutor { } else if (PlannerUtil.checkIfDDLPlan(rootNode)) { context.getSystemMetrics().counter("Query", "numDDLQuery").inc(); - ddlExecutor.execute(queryContext, plan); - response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - response.setResultCode(ClientProtos.ResultCode.OK); + if (PlannerUtil.isDistExecDDL(rootNode)) { + if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) { + checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild()); + } + executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response); + } else { + response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + response.setResult(IPCUtil.buildOkRequestResult()); + } + + ddlExecutor.execute(queryContext, plan); } else if (plan.isExplain()) { // explain query execExplain(plan, response); @@ -142,8 +154,8 @@ public class QueryExecutor { session.selectDatabase(setSessionNode.getValue()); } else { response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - response.setResultCode(ClientProtos.ResultCode.ERROR); - response.setErrorMessage("database \"" + databaseName + "\" does not exists."); + response.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + "database \"" + databaseName + "\" does not exists.", null)); } // others @@ -157,7 +169,7 @@ public class QueryExecutor { context.getSystemMetrics().counter("Query", "numDDLQuery").inc(); response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - response.setResultCode(ClientProtos.ResultCode.OK); + response.setResult(IPCUtil.buildOkRequestResult()); } public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException { @@ -183,7 +195,7 @@ public class QueryExecutor { response.setResultSet(serializedResBuilder.build()); response.setMaxRowNum(lines.length); - response.setResultCode(ClientProtos.ResultCode.OK); + response.setResult(IPCUtil.buildOkRequestResult()); response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); } @@ -205,7 +217,7 @@ public class QueryExecutor { response.setQueryId(queryId.getProto()); response.setMaxRowNum(maxRow); response.setTableDesc(queryResultScanner.getTableDesc().getProto()); - response.setResultCode(ClientProtos.ResultCode.OK); + response.setResult(IPCUtil.buildOkRequestResult()); } public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, @@ -235,7 +247,7 @@ public class QueryExecutor { response.setQueryId(queryId.getProto()); response.setMaxRowNum(maxRow); response.setTableDesc(desc.getProto()); - response.setResultCode(ClientProtos.ResultCode.OK); + response.setResult(IPCUtil.buildOkRequestResult()); } public void execNonFromQuery(QueryContext queryContext, Session session, String query, @@ -267,7 +279,7 @@ public class QueryExecutor { responseBuilder.setResultSet(serializedResBuilder); responseBuilder.setMaxRowNum(1); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + responseBuilder.setResult(IPCUtil.buildOkRequestResult()); } } @@ -369,7 +381,7 @@ public class QueryExecutor { // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. responseBuilder.setMaxRowNum(-1); responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + responseBuilder.setResult(IPCUtil.buildOkRequestResult()); } public void executeDistributedQuery(QueryContext queryContext, Session session, @@ -398,13 +410,13 @@ public class QueryExecutor { if(queryInfo == null) { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR); - responseBuilder.setErrorMessage("Fail starting QueryMaster."); + responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, + "Fail starting QueryMaster.", null)); LOG.error("Fail starting QueryMaster: " + sql); } else { responseBuilder.setIsForwarded(true); responseBuilder.setQueryId(queryInfo.getQueryId().getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + responseBuilder.setResult(IPCUtil.buildOkRequestResult()); if(queryInfo.getQueryMasterHost() != null) { responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); } @@ -413,4 +425,23 @@ public class QueryExecutor { " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort()); } } + + private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode) + throws IOException { + String databaseName, simpleIndexName, qualifiedIndexName; + if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + qualifiedIndexName = createIndexNode.getIndexName(); + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = createIndexNode.getIndexName(); + qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName); + } + + if (catalog.existIndexByName(databaseName, simpleIndexName)) { + throw new AlreadyExistsIndexException(qualifiedIndexName); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index a626df1..88ecebc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -32,6 +32,8 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; @@ -472,6 +474,7 @@ public class Query implements EventHandler<QueryEvent> { hookList.add(new MaterializedResultHook()); hookList.add(new CreateTableHook()); hookList.add(new InsertTableHook()); + hookList.add(new CreateIndexHook()); } public void execute(QueryContext queryContext, Query query, @@ -485,6 +488,48 @@ public class Query implements EventHandler<QueryEvent> { } } + private class CreateIndexHook implements QueryHook { + + @Override + public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) { + Stage lastStage = query.getStage(finalExecBlockId); + return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX; + } + + @Override + public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { + CatalogService catalog = context.getWorkerContext().getCatalog(); + Stage lastStage = query.getStage(finalExecBlockId); + + CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan(); + String databaseName, simpleIndexName, qualifiedIndexName; + if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) { + String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + databaseName = splits[0]; + simpleIndexName = splits[1]; + qualifiedIndexName = createIndexNode.getIndexName(); + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleIndexName = createIndexNode.getIndexName(); + qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName); + } + ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN); + if (scanNode == null) { + throw new IOException("Cannot find the table of the relation"); + } + IndexDesc indexDesc = new IndexDesc(databaseName, scanNode.getTableName(), + simpleIndexName, createIndexNode.getIndexPath(), + createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(), + createIndexNode.isUnique(), false, scanNode.getLogicalSchema()); + if (catalog.createIndex(indexDesc)) { + LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + "."); + } else { + LOG.info("Index creation " + qualifiedIndexName + " is failed."); + throw new CatalogException("Cannot create index \"" + qualifiedIndexName + "\"."); + } + } + } + private class MaterializedResultHook implements QueryHook { @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 9c789a5..a125415 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -38,6 +38,12 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; @@ -48,15 +54,9 @@ import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.session.Session; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; @@ -347,9 +347,10 @@ public class QueryMasterTask extends CompositeService { LOG.warn("Query already started"); return; } + LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED)); CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); + LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog); Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); jsonExpr = null; // remove the possible OOM plan = planner.createPlan(queryContext, expr); @@ -393,6 +394,14 @@ public class QueryMasterTask extends CompositeService { tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); } } + + scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } } MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); queryMasterContext.getGlobalPlanner().build(masterPlan); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java new file mode 100644 index 0000000..dcffe62 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.ipc.ClientProtos.RequestResult; +import org.apache.tajo.ipc.ClientProtos.ResultCode; + +public class IPCUtil { + + public static RequestResult buildOkRequestResult() { + return buildRequestResult(ResultCode.OK, null, null); + } + + public static RequestResult buildRequestResult(ResultCode code, + @Nullable String errorMessage, + @Nullable String errorTrace) { + RequestResult.Builder builder = RequestResult.newBuilder(); + builder.setResultCode(code); + if (errorMessage != null) { + builder.setErrorMessage(errorMessage); + } + if (errorTrace != null) { + builder.setErrorTrace(errorTrace); + } + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java deleted file mode 100644 index 3147bb6..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.util; - -import com.google.gson.Gson; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.expr.*; -import org.apache.tajo.plan.logical.IndexScanNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map.Entry; - -public class IndexUtil { - public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) { - StringBuilder builder = new StringBuilder(); - builder.append(fragment.getPath().getName() + "_"); - builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_"); - for(int i = 0 ; i < keys.length ; i ++) { - builder.append(keys[i].getSortKey().getSimpleName()+"_"); - } - builder.append("_index"); - return builder.toString(); - - } - - public static String getIndexName(String indexName , SortSpec[] keys) { - StringBuilder builder = new StringBuilder(); - builder.append(indexName + "_"); - for(int i = 0 ; i < keys.length ; i ++) { - builder.append(keys[i].getSortKey().getSimpleName() + "_"); - } - return builder.toString(); - } - - public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode, - Iterator<Entry<String, String>> iter ) { - - EvalNode qual = scanNode.getQual(); - Gson gson = CoreGsonHelper.getInstance(); - - FieldAndValueFinder nodeFinder = new FieldAndValueFinder(); - qual.preOrder(nodeFinder); - LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList(); - - int maxSize = Integer.MIN_VALUE; - SortSpec[] maxIndex = null; - - String json; - while(iter.hasNext()) { - Entry<String , String> entry = iter.next(); - json = entry.getValue(); - SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class); - if(sortKey.length > nodeList.size()) { - /* If the number of the sort key is greater than where condition, - * this index cannot be used - * */ - continue; - } else { - boolean[] equal = new boolean[sortKey.length]; - for(int i = 0 ; i < sortKey.length ; i ++) { - for(int j = 0 ; j < nodeList.size() ; j ++) { - Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef(); - if(col.equals(sortKey[i].getSortKey())) { - equal[i] = true; - } - } - } - boolean chk = true; - for(int i = 0 ; i < equal.length ; i ++) { - chk = chk && equal[i]; - } - if(chk) { - if(maxSize < sortKey.length) { - maxSize = sortKey.length; - maxIndex = sortKey; - } - } - } - } - if(maxIndex == null) { - return null; - } else { - Schema keySchema = new Schema(); - for(int i = 0 ; i < maxIndex.length ; i ++ ) { - keySchema.addColumn(maxIndex[i].getSortKey()); - } - Datum[] datum = new Datum[nodeList.size()]; - for(int i = 0 ; i < nodeList.size() ; i ++ ) { - datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue(); - } - - return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex); - } - - } - - - private static class FieldAndValueFinder implements EvalNodeVisitor { - private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>(); - - public LinkedList<BinaryEval> getNodeList () { - return this.nodeList; - } - - @Override - public void visit(EvalNode node) { - BinaryEval binaryEval = (BinaryEval) node; - switch(node.getType()) { - case AND: - break; - case EQUAL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST ) { - nodeList.add(binaryEval); - } - break; - case IS_NULL: - if( binaryEval.getLeftExpr().getType() == EvalType.FIELD - && binaryEval.getRightExpr().getType() == EvalType.CONST) { - nodeList.add(binaryEval); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index da25fe6..cd9e6ef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -304,7 +304,7 @@ public class QueryExecutorServlet extends HttpServlet { LOG.error("Internal Error: SubmissionResponse is NULL"); error = new Exception("Internal Error: SubmissionResponse is NULL"); - } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { + } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { queryId = new QueryId(response.getQueryId()); getQueryResult(queryId); @@ -316,9 +316,9 @@ public class QueryExecutorServlet extends HttpServlet { progress.set(100); } - } else if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - if (response.hasErrorMessage()) { - StringBuffer errorMessage = new StringBuffer(response.getErrorMessage()); + } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { + if (response.getResult().hasErrorMessage()) { + StringBuffer errorMessage = new StringBuffer(response.getResult().getErrorMessage()); String modifiedMessage; if (errorMessage.length() > 200) { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 1c83110..6a13898 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -25,10 +25,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; +import org.apache.tajo.annotation.Nullable; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; +import org.apache.tajo.ipc.ClientProtos.RequestResult; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.QueryMasterClientProtocol; import org.apache.tajo.master.querymaster.QueryMasterTask; @@ -129,14 +132,32 @@ public class TajoWorkerClientService extends AbstractService { if (queryHistory != null) { builder.setQueryHistory(queryHistory.getProto()); } - builder.setResultCode(ResultCode.OK); + builder.setResult(buildOkRequestResult()); } catch (Throwable t) { LOG.warn(t.getMessage(), t); - builder.setResultCode(ResultCode.ERROR); - builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t)); + builder.setResult(buildRequestResult(ResultCode.ERROR, + StringUtils.stringifyException(t), null)); } return builder.build(); } + + private RequestResult buildOkRequestResult() { + return buildRequestResult(ResultCode.OK, null, null); + } + + private RequestResult buildRequestResult(ResultCode code, + @Nullable String errorMessage, + @Nullable String errorTrace) { + RequestResult.Builder builder = RequestResult.newBuilder(); + builder.setResultCode(code); + if (errorMessage != null) { + builder.setErrorMessage(errorMessage); + } + if (errorTrace != null) { + builder.setErrorTrace(errorTrace); + } + return builder.build(); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 5f9c6ac..b784c64 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -125,21 +125,9 @@ public class Task { this.inputStats = new TableStats(); plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan()); - LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - if (scanNode != null) { - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); - if (partitionScanNode != null) { - for (LogicalNode node : partitionScanNode) { - PartitionedTableScanNode scan = (PartitionedTableScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } + updateDescsForScanNodes(NodeType.SCAN); + updateDescsForScanNodes(NodeType.PARTITIONS_SCAN); + updateDescsForScanNodes(NodeType.INDEX_SCAN); interQuery = request.getProto().getInterQuery(); if (interQuery) { @@ -181,6 +169,17 @@ public class Task { LOG.info("=================================="); } + private void updateDescsForScanNodes(NodeType nodeType) { + assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN; + LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType); + if (scanNodes != null) { + for (LogicalNode node : scanNodes) { + ScanNode scanNode = (ScanNode) node; + descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + } + public void init() throws IOException { if (context.getState() == TaskAttemptState.TA_PENDING) { // initialize a task temporal dir http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index dfc8a9b..6c0af89 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -379,12 +379,15 @@ public class TaskAttemptContext { return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]); } - public long getUniqueKeyFromFragments() { - List<FragmentProto> totalFragments = new ArrayList<FragmentProto>(); - for (List<FragmentProto> eachFragments : fragmentMap.values()) { - totalFragments.addAll(eachFragments); + public String getUniqueKeyFromFragments() { + StringBuilder sb = new StringBuilder(); + for (List<FragmentProto> fragments : fragmentMap.values()) { + for (FragmentProto f : fragments) { + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f); + sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); + } } - return Objects.hashCode(totalFragments.toArray(new FragmentProto[totalFragments.size()])); + return sb.toString(); } public int hashCode() { http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 4e4b710..615dbb9 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -36,11 +36,11 @@ import org.apache.tajo.engine.codegen.TajoClassLoader; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.serder.EvalNodeDeserializer; import org.apache.tajo.plan.serder.EvalNodeSerializer; -import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; @@ -98,7 +98,7 @@ public class ExprTestBase { analyzer = new SQLAnalyzer(); preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat); planner = new LogicalPlanner(cat); - optimizer = new LogicalOptimizer(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), cat); annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat); } http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java index c9b52fd..3122b25 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java @@ -104,9 +104,9 @@ public class TestLogicalOptimizer { catalog.createFunction(funcDesc); sqlAnalyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(util.getConfiguration()); defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration()); + optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 996d736..0bf4602 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -497,7 +497,7 @@ public class TestLogicalPlanner { Schema expected = tpch.getOutSchema("q2"); assertSchema(expected, node.getOutSchema()); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN); @@ -536,7 +536,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN); @@ -577,7 +577,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN); @@ -624,7 +624,7 @@ public class TestLogicalPlanner { LogicalNode node = plan.getRootBlock().getRoot(); testJsonSerDerObject(node); - LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration()); + LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog); optimizer.optimize(plan); Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap(); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java index 3803c7a..9f20776 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java @@ -180,7 +180,7 @@ public class TestBroadcastJoinPlan { "join small2 on small1_id = small2_id"; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -241,7 +241,7 @@ public class TestBroadcastJoinPlan { "join small3 on small1_id = small3_id"; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -305,7 +305,7 @@ public class TestBroadcastJoinPlan { "join large2 on large1_id = large2_id "; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -333,7 +333,7 @@ public class TestBroadcastJoinPlan { "join small2 on large2_id = small2_id"; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -383,7 +383,7 @@ public class TestBroadcastJoinPlan { "join small2 on a.small1_id = small2_id"; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -424,7 +424,7 @@ public class TestBroadcastJoinPlan { "join (select * from small1) a on large1_id = a.small1_id"; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -480,7 +480,7 @@ public class TestBroadcastJoinPlan { "left outer join large2 on small1_id = large2_id "; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -534,7 +534,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on large1_id = small3_id "; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -617,7 +617,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on large3_id = small3_id "; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -700,7 +700,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on small1_id = small3_id "; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -759,7 +759,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on small1_id = small3_id " ; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -812,7 +812,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on small1_id = small3_id " ; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -904,7 +904,7 @@ public class TestBroadcastJoinPlan { "left outer join small3 on small3_id = large1_id " ; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); @@ -969,7 +969,7 @@ public class TestBroadcastJoinPlan { "left outer join small2 on large1_id = small2_id " ; LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); + LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); Expr expr = analyzer.parse(query); LogicalPlan plan = planner.createPlan(defaultContext, expr); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java deleted file mode 100644 index c897461..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.planner.physical; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.engine.planner.PhysicalPlannerImpl; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.worker.TaskAttemptContext; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.Stack; - -import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.junit.Assert.assertEquals; - -public class TestBSTIndexExec { - - private TajoConf conf; - private Path idxPath; - private CatalogService catalog; - private SQLAnalyzer analyzer; - private LogicalPlanner planner; - private LogicalOptimizer optimizer; - private FileStorageManager sm; - private Schema idxSchema; - private BaseTupleComparator comp; - private BSTIndex.BSTIndexWriter writer; - private HashMap<Integer , Integer> randomValues ; - private int rndKey = -1; - private FileSystem fs; - private TableMeta meta; - private Path tablePath; - - private Random rnd = new Random(System.currentTimeMillis()); - - private TajoTestingCluster util; - - @Before - public void setup() throws Exception { - this.randomValues = new HashMap<Integer, Integer>(); - this.conf = new TajoConf(); - util = new TajoTestingCluster(); - util.startCatalogCluster(); - catalog = util.getMiniCatalogCluster().getCatalog(); - - Path workDir = CommonTestingUtil.getTestDir(); - catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString()); - catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir); - - idxPath = new Path(workDir, "test.idx"); - - Schema schema = new Schema(); - schema.addColumn("managerid", Type.INT4); - schema.addColumn("empid", Type.INT4); - schema.addColumn("deptname", Type.TEXT); - - this.idxSchema = new Schema(); - idxSchema.addColumn("managerid", Type.INT4); - SortSpec[] sortKeys = new SortSpec[1]; - sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false); - this.comp = new BaseTupleComparator(idxSchema, sortKeys); - - this.writer = new BSTIndex(conf).getIndexWriter(idxPath, - BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp); - writer.setLoadNum(100); - writer.open(); - long offset; - - meta = CatalogUtil.newTableMeta(StoreType.CSV); - tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv"); - fs = tablePath.getFileSystem(conf); - fs.mkdirs(tablePath.getParent()); - - FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple = new VTuple(schema.size()); - for (int i = 0; i < 10000; i++) { - - Tuple key = new VTuple(this.idxSchema.size()); - int rndKey = rnd.nextInt(250); - if(this.randomValues.containsKey(rndKey)) { - int t = this.randomValues.remove(rndKey) + 1; - this.randomValues.put(rndKey, t); - } else { - this.randomValues.put(rndKey, 1); - } - - key.put(new Datum[] { DatumFactory.createInt4(rndKey) }); - tuple.put(new Datum[] { DatumFactory.createInt4(rndKey), - DatumFactory.createInt4(rnd.nextInt(10)), - DatumFactory.createText("dept_" + rnd.nextInt(10)) }); - offset = appender.getOffset(); - appender.addTuple(tuple); - writer.write(key, offset); - } - appender.flush(); - appender.close(); - writer.close(); - - TableDesc desc = new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, - sm.getTablePath("employee").toUri()); - catalog.createTable(desc); - - analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); - } - - @After - public void tearDown() { - util.shutdownCatalogCluster(); - } - - @Test - public void testEqual() throws Exception { - this.rndKey = rnd.nextInt(250); - final String QUERY = "select * from employee where managerId = " + rndKey; - - FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE); - Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); - Expr expr = analyzer.parse(QUERY); - LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = optimizer.optimize(plan); - - TmpPlanner phyPlanner = new TmpPlanner(conf); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - int tupleCount = this.randomValues.get(rndKey); - int counter = 0; - exec.init(); - while (exec.next() != null) { - counter ++; - } - exec.close(); - assertEquals(tupleCount , counter); - } - - private class TmpPlanner extends PhysicalPlannerImpl { - public TmpPlanner(TajoConf conf) { - super(conf); - } - - @Override - public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack) - throws IOException { - Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()), - "Error: There is no table matched to %s", scanNode.getTableName()); - - List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName())); - - Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)}; - - return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum); - - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java index 64da88b..a64b525 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -63,6 +63,7 @@ public class TestHashAntiJoinExec { private LogicalOptimizer optimizer; private StorageManager sm; private Path testDir; + private QueryContext queryContext; private TableDesc employee; private TableDesc people; @@ -128,11 +129,12 @@ public class TestHashAntiJoinExec { appender.flush(); appender.close(); + queryContext = new QueryContext(conf); people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); + optimizer = new LogicalOptimizer(conf, catalog); } @After @@ -159,7 +161,7 @@ public class TestHashAntiJoinExec { FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[0]); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java index 4e218c5..196f3bf 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -64,6 +64,7 @@ public class TestHashSemiJoinExec { private LogicalOptimizer optimizer; private StorageManager sm; private Path testDir; + private QueryContext queryContext; private TableDesc employee; private TableDesc people; @@ -133,11 +134,12 @@ public class TestHashSemiJoinExec { appender.flush(); appender.close(); + queryContext = new QueryContext(conf); people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); catalog.createTable(people); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); + optimizer = new LogicalOptimizer(conf, catalog); } @After @@ -164,7 +166,7 @@ public class TestHashSemiJoinExec { FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[0]); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index a4e49f7..9de58a3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -171,12 +171,13 @@ public class TestPhysicalPlanner { } appender.flush(); appender.close(); + + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); catalog.createTable(score); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); + optimizer = new LogicalOptimizer(conf, catalog); - defaultContext = LocalTajoTestingUtility.createDummyContext(conf); masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); createLargeScoreTable(); http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index 2e093c1..84abfff 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -61,6 +61,7 @@ public class TestSortExec { private static Path workDir; private static Path tablePath; private static TableMeta employeeMeta; + private static QueryContext queryContext; private static Random rnd = new Random(System.currentTimeMillis()); @@ -101,9 +102,10 @@ public class TestSortExec { tablePath.toUri()); catalog.createTable(desc); + queryContext = new QueryContext(conf); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); + optimizer = new LogicalOptimizer(conf, catalog); } public static String[] QUERIES = { @@ -113,7 +115,7 @@ public class TestSortExec { public final void testNext() throws IOException, PlanningException { FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, LocalTajoTestingUtility .newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer());
