http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java deleted file mode 100644 index c6466f5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java +++ /dev/null @@ -1,616 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; -import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.codegen.CompilationError; -import org.apache.tajo.engine.planner.PhysicalPlannerImpl; -import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; -import org.apache.tajo.engine.planner.global.GlobalPlanner; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.planner.physical.PhysicalExec; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.logical.IndexScanNode; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.TaskAttemptContext; - -import com.google.protobuf.ByteString; - -public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner { - - private final Log LOG = LogFactory.getLog(getClass()); - - private MasterContext masterContext; - private LogicalPlan logicalPlan; - private final QueryId queryId; - private final String sessionId; - private TaskAttemptContext taskContext; - private int currentRow; - private long maxRow; - private TableDesc tableDesc; - private Schema outSchema; - private RowStoreEncoder encoder; - private PhysicalExec physicalExec; - - public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, - String sessionId, int maxRow) { - masterContext = context; - logicalPlan = plan; - this.queryId = queryId; - this.sessionId = sessionId; - this.maxRow = maxRow; - - } - - @Override - public void init() throws IOException { - QueryContext queryContext = new QueryContext(masterContext.getConf()); - currentRow = 0; - - MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan); - GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog()); - try { - globalPlanner.build(masterPlan); - } catch (PlanningException e) { - throw new RuntimeException(e); - } - - ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan); - ExecutionBlock leafBlock = null; - while (cursor.hasNext()) { - ExecutionBlock block = cursor.nextBlock(); - if (masterPlan.isLeaf(block)) { - leafBlock = block; - break; - } - } - - taskContext = new TaskAttemptContext(queryContext, null, - new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0), - null, null); - physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf()) - .createPlan(taskContext, leafBlock.getPlan()); - - tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), - new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null); - outSchema = physicalExec.getSchema(); - encoder = RowStoreUtil.createEncoder(getLogicalSchema()); - - physicalExec.init(); - } - - @Override - public void close() throws Exception { - tableDesc = null; - outSchema = null; - encoder = null; - if (physicalExec != null) { - try { - physicalExec.close(); - } catch (Exception ignored) {} - } - physicalExec = null; - currentRow = -1; - } - - private List<Tuple> getTablespaces(Schema outSchema) { - List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces(); - List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (TablespaceProto tablespace: tablespaces) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - if ("space_id".equalsIgnoreCase(column.getSimpleName())) { - if (tablespace.hasId()) { - aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName())); - } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) { - if (tablespace.hasHandler()) { - aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri())); - } - } - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getDatabases(Schema outSchema) { - List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases(); - List<Tuple> tuples = new ArrayList<Tuple>(databases.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (DatabaseProto database: databases) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - if ("db_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(database.getId())); - } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(database.getName())); - } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) { - if (database.hasSpaceId()) { - aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getTables(Schema outSchema) { - List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables(); - List<Tuple> tuples = new ArrayList<Tuple>(tables.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (TableDescriptorProto table: tables) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(table.getTid())); - } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId())); - } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(table.getName())); - } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) { - if (table.hasTableType()) { - aTuple.put(fieldId, DatumFactory.createText(table.getTableType())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } else if ("path".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(table.getPath())); - } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(table.getStoreType())); - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getColumns(Schema outSchema) { - List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns(); - List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - int columnId = 1, prevtid = -1, tid = 0; - - for (ColumnProto column: columnsList) { - aTuple = new VTuple(outSchema.size()); - - tid = column.getTid(); - if (prevtid != tid) { - columnId = 1; - prevtid = tid; - } - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column colObj = columns.get(fieldId); - - if ("tid".equalsIgnoreCase(colObj.getSimpleName())) { - if (column.hasTid()) { - aTuple.put(fieldId, DatumFactory.createInt4(tid)); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(column.getName())); - } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(columnId)); - } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString())); - } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) { - DataType dataType = column.getDataType(); - if (dataType.hasLength()) { - aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } - } - - columnId++; - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getIndexes(Schema outSchema) { - List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes(); - List<Tuple> tuples = new ArrayList<Tuple>(indexList.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (IndexProto index: indexList) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - - if ("db_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId())); - } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(index.getTId())); - } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getIndexName())); - } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getColumnName())); - } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getDataType())); - } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(index.getIndexType())); - } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique())); - } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered())); - } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending())); - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getAllTableOptions(Schema outSchema) { - List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions(); - List<Tuple> tuples = new ArrayList<Tuple>(optionList.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (TableOptionProto option: optionList) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - - if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(option.getTid())); - } else if ("key_".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey())); - } else if ("value_".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue())); - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getAllTableStats(Schema outSchema) { - List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats(); - List<Tuple> tuples = new ArrayList<Tuple>(statList.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (TableStatsProto stat: statList) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - - if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid())); - } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows())); - } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes())); - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> getAllPartitions(Schema outSchema) { - List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions(); - List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size()); - List<Column> columns = outSchema.getColumns(); - Tuple aTuple; - - for (TablePartitionProto partition: partitionList) { - aTuple = new VTuple(outSchema.size()); - - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); - - if ("pid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid())); - } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid())); - } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) { - if (partition.hasPartitionName()) { - aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName())); - } else { - aTuple.put(fieldId, DatumFactory.createNullDatum()); - } - } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition())); - } else if ("path".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(partition.getPath())); - } - } - - tuples.add(aTuple); - } - - return tuples; - } - - private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) { - List<Tuple> tuples = null; - String tableName = CatalogUtil.extractSimpleName(tableDesc.getName()); - - if ("tablespace".equalsIgnoreCase(tableName)) { - tuples = getTablespaces(inSchema); - } else if ("databases".equalsIgnoreCase(tableName)) { - tuples = getDatabases(inSchema); - } else if ("tables".equalsIgnoreCase(tableName)) { - tuples = getTables(inSchema); - } else if ("columns".equalsIgnoreCase(tableName)) { - tuples = getColumns(inSchema); - } else if ("indexes".equalsIgnoreCase(tableName)) { - tuples = getIndexes(inSchema); - } else if ("table_options".equalsIgnoreCase(tableName)) { - tuples = getAllTableOptions(inSchema); - } else if ("table_stats".equalsIgnoreCase(tableName)) { - tuples = getAllTableStats(inSchema); - } else if ("partitions".equalsIgnoreCase(tableName)) { - tuples = getAllPartitions(inSchema); - } - - return tuples; - } - - @Override - public List<ByteString> getNextRows(int fetchRowNum) throws IOException { - List<ByteString> rows = new ArrayList<ByteString>(); - int startRow = currentRow; - int endRow = startRow + fetchRowNum; - - if (physicalExec == null) { - return rows; - } - - while (currentRow < endRow) { - Tuple currentTuple = physicalExec.next(); - - if (currentTuple == null) { - physicalExec.close(); - physicalExec = null; - break; - } - - currentRow++; - rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple))); - - if (currentRow >= maxRow) { - physicalExec.close(); - physicalExec = null; - break; - } - } - - return rows; - } - - @Override - public QueryId getQueryId() { - return queryId; - } - - @Override - public String getSessionId() { - return sessionId; - } - - @Override - public TableDesc getTableDesc() { - return tableDesc; - } - - @Override - public Schema getLogicalSchema() { - return outSchema; - } - - class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl { - - public SimplePhysicalPlannerImpl(TajoConf conf) { - super(conf); - } - - @Override - public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) - throws IOException { - return new SystemPhysicalExec(ctx, scanNode); - } - - @Override - public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException { - return new SystemPhysicalExec(ctx, annotation); - } - } - - class SystemPhysicalExec extends PhysicalExec { - - private ScanNode scanNode; - private EvalNode qual; - private Projector projector; - private TableStats tableStats; - private final List<Tuple> cachedData; - private int currentRow; - private boolean isClosed; - - public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) { - super(context, scanNode.getInSchema(), scanNode.getOutSchema()); - this.scanNode = scanNode; - this.qual = this.scanNode.getQual(); - cachedData = TUtil.newList(); - currentRow = 0; - isClosed = false; - - projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); - } - - @Override - public Tuple next() throws IOException { - Tuple aTuple = null; - Tuple outTuple = new VTuple(outColumnNum); - - if (isClosed) { - return null; - } - - if (cachedData.size() == 0) { - rescan(); - } - - if (!scanNode.hasQual()) { - if (currentRow < cachedData.size()) { - aTuple = cachedData.get(currentRow++); - projector.eval(aTuple, outTuple); - outTuple.setOffset(aTuple.getOffset()); - return outTuple; - } - return null; - } else { - while (currentRow < cachedData.size()) { - aTuple = cachedData.get(currentRow++); - if (qual.eval(inSchema, aTuple).isTrue()) { - projector.eval(aTuple, outTuple); - return outTuple; - } - } - return null; - } - } - - @Override - public void rescan() throws IOException { - cachedData.clear(); - cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema)); - - tableStats = new TableStats(); - tableStats.setNumRows(cachedData.size()); - } - - @Override - public void close() throws IOException { - scanNode = null; - qual = null; - projector = null; - cachedData.clear(); - currentRow = -1; - isClosed = true; - } - - @Override - public float getProgress() { - return 1.0f; - } - - @Override - protected void compile() throws CompilationError { - if (scanNode.hasQual()) { - qual = context.getPrecompiledEval(inSchema, qual); - } - } - - @Override - public TableStats getInputStats() { - return tableStats; - } - - } - -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java new file mode 100644 index 0000000..f902081 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + + +import com.google.gson.annotations.Expose; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; +import org.apache.tajo.json.GsonObject; +import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.util.history.History; + +public class QueryInfo implements GsonObject, History { + private QueryId queryId; + @Expose + private QueryContext context; + @Expose + private String sql; + @Expose + private volatile TajoProtos.QueryState queryState; + @Expose + private volatile float progress; + @Expose + private volatile long startTime; + @Expose + private volatile long finishTime; + @Expose + private String lastMessage; + @Expose + private String hostNameOfQM; + @Expose + private int queryMasterPort; + @Expose + private int queryMasterClientPort; + @Expose + private int queryMasterInfoPort; + @Expose + private String queryIdStr; + @Expose + private volatile TableDesc resultDesc; + + private String jsonExpr; + + public QueryInfo(QueryId queryId) { + this(queryId, null, null, null); + } + + public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) { + this.queryId = queryId; + this.queryIdStr = queryId.toString(); + this.context = queryContext; + this.sql = sql; + this.jsonExpr = jsonExpr; + + this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT; + } + + public QueryId getQueryId() { + return queryId; + } + + public QueryContext getQueryContext() { + return context; + } + + public String getSql() { + return sql; + } + + public String getQueryMasterHost() { + return hostNameOfQM; + } + + public void setQueryMaster(String hostName) { + this.hostNameOfQM = hostName; + } + + public int getQueryMasterInfoPort() { + return queryMasterInfoPort; + } + + public void setQueryMasterInfoPort(int queryMasterInfoPort) { + this.queryMasterInfoPort = queryMasterInfoPort; + } + + public void setQueryMasterPort(int port) { + this.queryMasterPort = port; + } + + public int getQueryMasterPort() { + return queryMasterPort; + } + + public void setQueryMasterclientPort(int port) { + queryMasterClientPort = port; + } + + public int getQueryMasterClientPort() { + return queryMasterClientPort; + } + + public TajoProtos.QueryState getQueryState() { + return queryState; + } + + public void setQueryState(TajoProtos.QueryState queryState) { + this.queryState = queryState; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getFinishTime() { + return finishTime; + } + + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + public String getLastMessage() { + return lastMessage; + } + + public void setLastMessage(String lastMessage) { + this.lastMessage = lastMessage; + } + + public float getProgress() { + return progress; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public void setResultDesc(TableDesc result) { + this.resultDesc = result; + } + + public boolean hasResultdesc() { + return resultDesc != null; + } + + public TableDesc getResultDesc() { + return resultDesc; + } + + @Override + public String toString() { + return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster=" + + getQueryMasterHost(); + } + + public String getJsonExpr() { + return jsonExpr; + } + + @Override + public String toJson() { + return CoreGsonHelper.toJson(this, QueryInfo.class); + } + + @Override + public HistoryType getHistoryType() { + return HistoryType.QUERY_SUMMARY; + } + + public static QueryInfo fromJson(String json) { + QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class); + queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr); + return queryInfo; + } + + public String getQueryIdStr() { + return queryIdStr; + } + + public QueryInfoProto getProto() { + QueryInfoProto.Builder builder = QueryInfoProto.newBuilder(); + + builder.setQueryId(queryId.toString()) + .setQueryState(queryState) + .setContextVars(context.getProto()) + .setProgress(progress) + .setStartTime(startTime) + .setFinishTime(finishTime) + .setQueryMasterPort(queryMasterPort) + .setQueryMasterClientPort(queryMasterClientPort) + .setQueryMasterInfoPort(queryMasterInfoPort); + + if (resultDesc != null) { + builder.setResultDesc(resultDesc.getProto()); + } + + if (sql != null) { + builder.setSql(sql); + } + + if (lastMessage != null) { + builder.setLastMessage(lastMessage); + } + + if (hostNameOfQM != null) { + builder.setHostNameOfQM(hostNameOfQM); + } + + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java new file mode 100644 index 0000000..c9b8711 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.querymaster.QueryJobEvent; +import org.apache.tajo.session.Session; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.master.scheduler.SimpleFifoScheduler; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryJobManager extends CompositeService { + private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); + + // TajoMaster Context + private final TajoMaster.MasterContext masterContext; + + private AsyncDispatcher dispatcher; + + private SimpleFifoScheduler scheduler; + + private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); + + private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); + + private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); + private AtomicLong maxExecutionTime = new AtomicLong(); + private AtomicLong avgExecutionTime = new AtomicLong(); + private AtomicLong executedQuerySize = new AtomicLong(); + + public QueryJobManager(final TajoMaster.MasterContext masterContext) { + super(QueryJobManager.class.getName()); + this.masterContext = masterContext; + } + + @Override + public void init(Configuration conf) { + try { + this.dispatcher = new AsyncDispatcher(); + addService(this.dispatcher); + + this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler()); + + this.scheduler = new SimpleFifoScheduler(this); + } catch (Exception e) { + catchException(null, e); + } + + super.init(conf); + } + + @Override + public void stop() { + synchronized(runningQueries) { + for(QueryInProgress eachQueryInProgress: runningQueries.values()) { + eachQueryInProgress.stop(); + } + } + this.scheduler.stop(); + super.stop(); + } + + @Override + public void start() { + this.scheduler.start(); + super.start(); + } + + public EventHandler getEventHandler() { + return dispatcher.getEventHandler(); + } + + public Collection<QueryInProgress> getSubmittedQueries() { + synchronized (submittedQueries){ + return Collections.unmodifiableCollection(submittedQueries.values()); + } + } + + public Collection<QueryInProgress> getRunningQueries() { + synchronized (runningQueries){ + return Collections.unmodifiableCollection(runningQueries.values()); + } + } + + public synchronized Collection<QueryInfo> getFinishedQueries() { + try { + return this.masterContext.getHistoryReader().getQueries(null); + } catch (Throwable e) { + LOG.error(e); + return Lists.newArrayList(); + } + } + + + public synchronized QueryInfo getFinishedQuery(QueryId queryId) { + try { + return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + } catch (Throwable e) { + LOG.error(e); + return null; + } + } + + public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, + String jsonExpr, LogicalRootNode plan) + throws Exception { + QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); + QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, + jsonExpr, plan); + + synchronized (submittedQueries) { + queryInProgress.getQueryInfo().setQueryMaster(""); + submittedQueries.put(queryInProgress.getQueryId(), queryInProgress); + } + + scheduler.addQuery(queryInProgress); + return queryInProgress.getQueryInfo(); + } + + public QueryInfo startQueryJob(QueryId queryId) throws Exception { + + QueryInProgress queryInProgress; + + synchronized (submittedQueries) { + queryInProgress = submittedQueries.remove(queryId); + } + + synchronized (runningQueries) { + runningQueries.put(queryInProgress.getQueryId(), queryInProgress); + } + + addService(queryInProgress); + queryInProgress.init(getConfig()); + queryInProgress.start(); + + if (!queryInProgress.startQueryMaster()) { + stopQuery(queryId); + } + + return queryInProgress.getQueryInfo(); + } + + public TajoMaster.MasterContext getMasterContext() { + return masterContext; + } + + class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> { + @Override + public void handle(QueryJobEvent event) { + QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); + if(queryInProgress == null) { + LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); + return; + } + + if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + stopQuery(event.getQueryInfo().getQueryId()); + } else if (queryInProgress.isStarted()) { + queryInProgress.getEventHandler().handle(event); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { + scheduler.removeQuery(queryInProgress.getQueryId()); + queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); + + stopQuery(queryInProgress.getQueryId()); + } + } + } + + public QueryInProgress getQueryInProgress(QueryId queryId) { + QueryInProgress queryInProgress; + synchronized (submittedQueries) { + queryInProgress = submittedQueries.get(queryId); + } + + if (queryInProgress == null) { + synchronized (runningQueries) { + queryInProgress = runningQueries.get(queryId); + } + } + return queryInProgress; + } + + public void stopQuery(QueryId queryId) { + LOG.info("Stop QueryInProgress:" + queryId); + QueryInProgress queryInProgress = getQueryInProgress(queryId); + if(queryInProgress != null) { + queryInProgress.stop(); + synchronized(submittedQueries) { + submittedQueries.remove(queryId); + } + + synchronized(runningQueries) { + runningQueries.remove(queryId); + } + + QueryInfo queryInfo = queryInProgress.getQueryInfo(); + long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); + if (executionTime < minExecutionTime.get()) { + minExecutionTime.set(executionTime); + } + + if (executionTime > maxExecutionTime.get()) { + maxExecutionTime.set(executionTime); + } + + long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get(); + if (totalExecutionTime > 0) { + avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1)); + } else { + avgExecutionTime.set(executionTime); + } + executedQuerySize.incrementAndGet(); + removeService(queryInProgress); + } else { + LOG.warn("No QueryInProgress while query stopping: " + queryId); + } + } + + public long getMinExecutionTime() { + if (getExecutedQuerySize() == 0) return 0; + return minExecutionTime.get(); + } + + public long getMaxExecutionTime() { + return maxExecutionTime.get(); + } + + public long getAvgExecutionTime() { + return avgExecutionTime.get(); + } + + public long getExecutedQuerySize() { + return executedQuerySize.get(); + } + + private void catchException(QueryId queryId, Exception e) { + LOG.error(e.getMessage(), e); + QueryInProgress queryInProgress = runningQueries.get(queryId); + queryInProgress.catchException(e); + } + + public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat( + TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { + QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId())); + if(queryInProgress == null) { + return null; + } + + QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat); + getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo)); + + return null; + } + + private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { + QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId())); + WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo()); + + queryInfo.setQueryMaster(connectionInfo.getHost()); + queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); + queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); + queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); + queryInfo.setQueryState(queryHeartbeat.getState()); + queryInfo.setProgress(queryHeartbeat.getQueryProgress()); + + if (queryHeartbeat.hasQueryFinishTime()) { + queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime()); + } + + if (queryHeartbeat.hasResultDesc()) { + queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc())); + } + + return queryInfo; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java deleted file mode 100644 index b05572b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.tajo.worker.FetchImpl; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class ScheduledFetches { - private List<Map<String, List<FetchImpl>>> fetches = new ArrayList<Map<String, List<FetchImpl>>>(); - - public void addFetch(Map<String, List<FetchImpl>> fetch) { - this.fetches.add(fetch); - } - - public boolean hasNextFetch() { - return fetches.size() > 0; - } - - public Map<String, List<FetchImpl>> getNextFetch() { - return hasNextFetch() ? fetches.get(0) : null; - } - - public Map<String, List<FetchImpl>> popNextFetch() { - return hasNextFetch() ? fetches.remove(0) : null; - } - - public int size() { - return fetches.size(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 4649d99..7209080 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -28,7 +28,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.rm.TajoWorkerContainer; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index d021e43..c054599 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -40,14 +40,13 @@ import org.apache.tajo.catalog.LocalCatalogWrapper; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; -import org.apache.tajo.master.ha.HAService; -import org.apache.tajo.master.ha.HAServiceHDFSImpl; -import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet; -import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet; -import org.apache.tajo.master.querymaster.QueryJobManager; +import org.apache.tajo.ha.HAService; +import org.apache.tajo.ha.HAServiceHDFSImpl; +import org.apache.tajo.metrics.CatalogMetricsGaugeSet; +import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.master.session.SessionManager; +import org.apache.tajo.session.SessionManager; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rule.EvaluationContext; import org.apache.tajo.rule.EvaluationFailedException; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 249d335..93326be 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -44,15 +44,14 @@ import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.master.querymaster.QueryInProgress; -import org.apache.tajo.master.querymaster.QueryInfo; -import org.apache.tajo.master.querymaster.QueryJobEvent; -import org.apache.tajo.master.querymaster.QueryJobManager; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; -import org.apache.tajo.master.session.InvalidSessionException; -import org.apache.tajo.master.session.NoSuchSessionVariableException; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.InvalidSessionException; +import org.apache.tajo.session.NoSuchSessionVariableException; +import org.apache.tajo.session.Session; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index 1e3501c..a7df206 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -23,14 +23,12 @@ import com.google.protobuf.RpcController; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.querymaster.QueryJobManager; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.rpc.AsyncRpcServer; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java deleted file mode 100644 index 755df5a..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.QueryMasterTask; - -public class TaskSchedulerContext { - private QueryMasterTask.QueryMasterTaskContext masterContext; - private boolean isLeafQuery; - private ExecutionBlockId blockId; - private int taskSize; - private int estimatedTaskNum; - - public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery, - ExecutionBlockId blockId) { - this.masterContext = masterContext; - this.isLeafQuery = isLeafQuery; - this.blockId = blockId; - } - - public QueryMasterTask.QueryMasterTaskContext getMasterContext() { - return masterContext; - } - - public boolean isLeafQuery() { - return isLeafQuery; - } - - public ExecutionBlockId getBlockId() { - return blockId; - } - - public int getTaskSize() { - return taskSize; - } - - public int getEstimatedTaskNum() { - return estimatedTaskNum; - } - - public void setTaskSize(int taskSize) { - this.taskSize = taskSize; - } - - public void setEstimatedTaskNum(int estimatedTaskNum) { - this.estimatedTaskNum = estimatedTaskNum; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java deleted file mode 100644 index e5291e9..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.master.querymaster.Stage; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.Map; - -public class TaskSchedulerFactory { - private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS; - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class }; - - public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) - throws IOException { - if (CACHED_ALGORITHM_CLASS != null) { - return CACHED_ALGORITHM_CLASS; - } else { - CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class); - } - - if (CACHED_ALGORITHM_CLASS == null) { - throw new IOException("Task scheduler is null"); - } - return CACHED_ALGORITHM_CLASS; - } - - public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context, - Stage stage) { - T result; - try { - Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, constructor); - } - result = constructor.newInstance(new Object[]{context, stage}); - } catch (Exception e) { - throw new RuntimeException(e); - } - return result; - } - - public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage) - throws IOException { - return get(getTaskSchedulerClass(conf), context, stage); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java index e5a9a32..fcae53c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.StageState; +import org.apache.tajo.querymaster.StageState; public class QueryCompletedEvent extends QueryEvent { private final ExecutionBlockId executionBlockId; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java index 623576a..3a387fa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java @@ -21,7 +21,7 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.QueryId; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; /** * This event is conveyed to QueryMaster. http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java index 2d16fbe..f012e92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.StageState; +import org.apache.tajo.querymaster.StageState; public class StageCompletedEvent extends QueryEvent { private final ExecutionBlockId executionBlockId; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java index 91ef942..5a016fb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java @@ -21,7 +21,7 @@ package org.apache.tajo.master.event; import com.google.protobuf.RpcCallback; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.TaskAttempt; +import org.apache.tajo.querymaster.TaskAttempt; import org.apache.tajo.master.container.TajoContainerId; public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent { http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index acbaa01..2030b55 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -34,7 +34,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.session.Session; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java new file mode 100644 index 0000000..dc0c44a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.exec; + +import com.google.protobuf.ByteString; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.engine.planner.physical.SeqScanExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { + private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; + + private QueryId queryId; + private String sessionId; + private SeqScanExec scanExec; + private TableDesc tableDesc; + private RowStoreEncoder rowEncoder; + private int maxRow; + private int currentNumRows; + private TaskAttemptContext taskContext; + private TajoConf tajoConf; + private ScanNode scanNode; + + private int currentFragmentIndex = 0; + + public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, + TableDesc tableDesc, int maxRow) throws IOException { + this.tajoConf = tajoConf; + this.sessionId = sessionId; + this.queryId = queryId; + this.scanNode = scanNode; + this.tableDesc = tableDesc; + this.maxRow = maxRow; + this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); + } + + public void init() throws IOException { + initSeqScanExec(); + } + + private void initSeqScanExec() throws IOException { + List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) + .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + + if (fragments != null && !fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); + this.taskContext = new TaskAttemptContext( + new QueryContext(tajoConf), null, + new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), + fragmentProtos, null); + try { + // scanNode must be clone cause SeqScanExec change target in the case of + // a partitioned table. + scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); + } catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } + scanExec.init(); + currentFragmentIndex += fragments.size(); + } + } + + public QueryId getQueryId() { + return queryId; + } + + public String getSessionId() { + return sessionId; + } + + public void setScanExec(SeqScanExec scanExec) { + this.scanExec = scanExec; + } + + public TableDesc getTableDesc() { + return tableDesc; + } + + public void close() throws Exception { + if (scanExec != null) { + scanExec.close(); + scanExec = null; + } + } + + public List<ByteString> getNextRows(int fetchRowNum) throws IOException { + List<ByteString> rows = new ArrayList<ByteString>(); + if (scanExec == null) { + return rows; + } + int rowCount = 0; + while (true) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + scanExec.close(); + scanExec = null; + initSeqScanExec(); + if (scanExec != null) { + tuple = scanExec.next(); + } + if (tuple == null) { + if (scanExec != null) { + scanExec.close(); + scanExec = null; + } + break; + } + } + rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); + rowCount++; + currentNumRows++; + if (rowCount >= fetchRowNum) { + break; + } + if (currentNumRows >= maxRow) { + scanExec.close(); + scanExec = null; + break; + } + } + return rows; + } + + @Override + public Schema getLogicalSchema() { + return tableDesc.getLogicalSchema(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java new file mode 100644 index 0000000..86d2843 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.exec; + +import java.io.IOException; +import java.util.List; + +import org.apache.tajo.QueryId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; + +import com.google.protobuf.ByteString; + +public interface NonForwardQueryResultScanner { + + public void close() throws Exception; + + public Schema getLogicalSchema(); + + public List<ByteString> getNextRows(int fetchRowNum) throws IOException; + + public QueryId getQueryId(); + + public String getSessionId(); + + public TableDesc getTableDesc(); + + public void init() throws IOException; + +}
