/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master.exec;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.codegen.CompilationError;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.IndexScanNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;

import com.google.protobuf.ByteString;

/**
 * A {@link NonForwardQueryResultScanner} that answers queries against system (catalog) tables.
 *
 * <p>{@link #init()} builds a global plan for the logical plan, takes the first leaf execution
 * block and turns it into a physical plan whose scan operators are {@link SystemPhysicalExec}
 * instances. Those operators read their rows from the catalog of the {@link MasterContext}
 * instead of from storage. {@link #getNextRows(int)} then pulls tuples from that physical plan
 * and returns them in their encoded form.
 */
public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {

  private static final Log LOG = LogFactory.getLog(NonForwardQueryResultSystemScanner.class);

  private final MasterContext masterContext;
  private final LogicalPlan logicalPlan;
  private final QueryId queryId;
  private final String sessionId;
  private TaskAttemptContext taskContext;
  /** Number of rows handed out so far; set to -1 by {@link #close()}. */
  private int currentRow;
  /** Upper bound on the number of rows this scanner hands out. */
  private long maxRow;
  private TableDesc tableDesc;
  private Schema outSchema;
  private RowStoreEncoder encoder;
  /** Root of the physical plan; {@code null} once the scan is finished or closed. */
  private PhysicalExec physicalExec;

  /**
   * Creates a scanner. No planning happens here; call {@link #init()} before fetching rows.
   *
   * @param context   master context providing the configuration and the catalog
   * @param plan      logical plan of the query to run
   * @param queryId   id of the query
   * @param sessionId id of the session that owns the query
   * @param maxRow    maximum number of rows to return over the lifetime of this scanner
   */
  public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId,
      String sessionId, int maxRow) {
    masterContext = context;
    logicalPlan = plan;
    this.queryId = queryId;
    this.sessionId = sessionId;
    this.maxRow = maxRow;
  }

  /**
   * Plans the query and initializes the physical executor of its first leaf execution block.
   *
   * @throws IOException      if the plan has no leaf execution block, or if creating or
   *                          initializing the physical plan fails
   * @throws RuntimeException wrapping a {@link PlanningException} raised by the global planner
   */
  @Override
  public void init() throws IOException {
    QueryContext queryContext = new QueryContext(masterContext.getConf());
    currentRow = 0;

    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
    GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
    try {
      globalPlanner.build(masterPlan);
    } catch (PlanningException e) {
      throw new RuntimeException(e);
    }

    ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
    ExecutionBlock leafBlock = null;
    while (cursor.hasNext()) {
      ExecutionBlock block = cursor.nextBlock();
      if (masterPlan.isLeaf(block)) {
        leafBlock = block;
        break;
      }
    }

    // Fail with a clear message instead of a NullPointerException a few lines below.
    if (leafBlock == null) {
      throw new IOException("Cannot find a leaf execution block for query " + queryId);
    }

    taskContext = new TaskAttemptContext(queryContext, null,
        new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
        null, null);
    physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
        .createPlan(taskContext, leafBlock.getPlan());

    tableDesc = new TableDesc("table_" + System.currentTimeMillis(), physicalExec.getSchema(),
        new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
    outSchema = physicalExec.getSchema();
    encoder = RowStoreUtil.createEncoder(getLogicalSchema());

    physicalExec.init();
  }

  /**
   * Releases the physical executor and drops the per-query state. Closing is best effort:
   * a failure to close the executor is logged and not propagated.
   */
  @Override
  public void close() throws Exception {
    tableDesc = null;
    outSchema = null;
    encoder = null;
    if (physicalExec != null) {
      try {
        physicalExec.close();
      } catch (Exception e) {
        // Deliberately not rethrown, but no longer swallowed silently.
        LOG.warn("Failed to close the physical executor of query " + queryId, e);
      }
    }
    physicalExec = null;
    currentRow = -1;
  }

  /**
   * Builds one tuple per tablespace known to the catalog.
   * Fields whose column name is not recognised are left unset.
   */
  private List<Tuple> getTablespaces(Schema outSchema) {
    List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
    List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
    List<Column> columns = outSchema.getColumns();

    for (TablespaceProto tablespace : tablespaces) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();
        if ("space_id".equalsIgnoreCase(name)) {
          if (tablespace.hasId()) {
            aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        } else if ("space_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
        } else if ("space_handler".equalsIgnoreCase(name)) {
          if (tablespace.hasHandler()) {
            aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        } else if ("space_uri".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
        }
      }
      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per database known to the catalog. */
  private List<Tuple> getDatabases(Schema outSchema) {
    List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
    List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
    List<Column> columns = outSchema.getColumns();

    for (DatabaseProto database : databases) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();
        if ("db_id".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
        } else if ("db_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(database.getName()));
        } else if ("space_id".equalsIgnoreCase(name)) {
          if (database.hasSpaceId()) {
            aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per table descriptor known to the catalog. */
  private List<Tuple> getTables(Schema outSchema) {
    List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
    List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
    List<Column> columns = outSchema.getColumns();

    for (TableDescriptorProto table : tables) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();
        if ("tid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
        } else if ("db_id".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
        } else if ("table_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(table.getName()));
        } else if ("table_type".equalsIgnoreCase(name)) {
          if (table.hasTableType()) {
            aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        } else if ("path".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
        } else if ("store_type".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /**
   * Builds one tuple per column known to the catalog.
   *
   * <p>{@code ordinal_position} is not read from the catalog; it is a counter that restarts at 1
   * whenever the table id differs from the previous entry's.
   * NOTE(review): this presumes the catalog returns the columns grouped by table and in
   * declaration order - confirm against the catalog implementation.
   */
  private List<Tuple> getColumns(Schema outSchema) {
    List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
    List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
    List<Column> columns = outSchema.getColumns();
    int columnId = 1;
    int prevtid = -1;

    for (ColumnProto column : columnsList) {
      Tuple aTuple = new VTuple(outSchema.size());

      int tid = column.getTid();
      if (prevtid != tid) {
        columnId = 1;
        prevtid = tid;
      }

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();

        if ("tid".equalsIgnoreCase(name)) {
          if (column.hasTid()) {
            aTuple.put(fieldId, DatumFactory.createInt4(tid));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        } else if ("column_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(column.getName()));
        } else if ("ordinal_position".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(columnId));
        } else if ("data_type".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
        } else if ("type_length".equalsIgnoreCase(name)) {
          DataType dataType = column.getDataType();
          if (dataType.hasLength()) {
            aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        }
      }

      columnId++;
      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per index known to the catalog. */
  private List<Tuple> getIndexes(Schema outSchema) {
    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
    List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
    List<Column> columns = outSchema.getColumns();

    for (IndexProto index : indexList) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();

        if ("db_id".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
        } else if ("tid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
        } else if ("index_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
        } else if ("column_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
        } else if ("data_type".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
        } else if ("index_type".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
        } else if ("is_unique".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
        } else if ("is_clustered".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
        } else if ("is_ascending".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per table option (key/value pair) known to the catalog. */
  private List<Tuple> getAllTableOptions(Schema outSchema) {
    List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
    List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
    List<Column> columns = outSchema.getColumns();

    for (TableOptionProto option : optionList) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();

        if ("tid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
        } else if ("key_".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
        } else if ("value_".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per table statistics entry known to the catalog. */
  private List<Tuple> getAllTableStats(Schema outSchema) {
    List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
    List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
    List<Column> columns = outSchema.getColumns();

    for (TableStatsProto stat : statList) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();

        if ("tid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
        } else if ("num_rows".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
        } else if ("num_bytes".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /** Builds one tuple per table partition known to the catalog. */
  private List<Tuple> getAllPartitions(Schema outSchema) {
    List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
    List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
    List<Column> columns = outSchema.getColumns();

    for (TablePartitionProto partition : partitionList) {
      Tuple aTuple = new VTuple(outSchema.size());

      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();

        if ("pid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
        } else if ("tid".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
        } else if ("partition_name".equalsIgnoreCase(name)) {
          if (partition.hasPartitionName()) {
            aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
          } else {
            aTuple.put(fieldId, DatumFactory.createNullDatum());
          }
        } else if ("ordinal_position".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
        } else if ("path".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
        }
      }

      tuples.add(aTuple);
    }

    return tuples;
  }

  /**
   * Loads all rows of the system table described by {@code tableDesc}, shaped after
   * {@code inSchema}. The table is selected by its simple name, compared case-insensitively.
   *
   * @return the rows of the table; an empty list (never {@code null}) when the table name is
   *         not one of the supported system tables
   */
  private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
    String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());

    if ("tablespace".equalsIgnoreCase(tableName)) {
      return getTablespaces(inSchema);
    } else if ("databases".equalsIgnoreCase(tableName)) {
      return getDatabases(inSchema);
    } else if ("tables".equalsIgnoreCase(tableName)) {
      return getTables(inSchema);
    } else if ("columns".equalsIgnoreCase(tableName)) {
      return getColumns(inSchema);
    } else if ("indexes".equalsIgnoreCase(tableName)) {
      return getIndexes(inSchema);
    } else if ("table_options".equalsIgnoreCase(tableName)) {
      return getAllTableOptions(inSchema);
    } else if ("table_stats".equalsIgnoreCase(tableName)) {
      return getAllTableStats(inSchema);
    } else if ("partitions".equalsIgnoreCase(tableName)) {
      return getAllPartitions(inSchema);
    }

    // Previously null was returned here, which made the caller's addAll() throw a bare NPE.
    LOG.warn("Unsupported system table: " + tableName);
    return new ArrayList<Tuple>();
  }

  /**
   * Fetches up to {@code fetchRowNum} further rows in encoded form.
   *
   * <p>The physical executor is closed and released as soon as it runs out of tuples or the
   * total number of rows handed out reaches {@code maxRow}; every later call returns an empty
   * list.
   *
   * @param fetchRowNum maximum number of rows to return from this call
   * @return the encoded rows, possibly empty, never {@code null}
   * @throws IOException if the physical executor fails
   */
  @Override
  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
    List<ByteString> rows = new ArrayList<ByteString>();
    int endRow = currentRow + fetchRowNum;

    if (physicalExec == null) {
      return rows;
    }

    while (currentRow < endRow) {
      Tuple currentTuple = physicalExec.next();

      if (currentTuple == null) {
        physicalExec.close();
        physicalExec = null;
        break;
      }

      currentRow++;
      rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));

      if (currentRow >= maxRow) {
        physicalExec.close();
        physicalExec = null;
        break;
      }
    }

    return rows;
  }

  @Override
  public QueryId getQueryId() {
    return queryId;
  }

  @Override
  public String getSessionId() {
    return sessionId;
  }

  @Override
  public TableDesc getTableDesc() {
    return tableDesc;
  }

  @Override
  public Schema getLogicalSchema() {
    return outSchema;
  }

  /**
   * Physical planner that substitutes {@link SystemPhysicalExec} for both plain scans and
   * index scans.
   */
  class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {

    public SimplePhysicalPlannerImpl(TajoConf conf) {
      super(conf);
    }

    @Override
    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
        throws IOException {
      return new SystemPhysicalExec(ctx, scanNode);
    }

    @Override
    public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
      return new SystemPhysicalExec(ctx, annotation);
    }
  }

  /**
   * Scan operator over a system table. All rows of the table are fetched from the catalog into
   * memory on the first {@link #next()} (or on {@link #rescan()}) and then filtered by the scan
   * qualifier, if any, and projected one at a time.
   */
  class SystemPhysicalExec extends PhysicalExec {

    private ScanNode scanNode;
    private EvalNode qual;
    private Projector projector;
    private TableStats tableStats;
    private final List<Tuple> cachedData;
    /** Index into {@link #cachedData} of the next row to examine. */
    private int currentRow;
    /** Whether {@link #cachedData} has been populated; an empty table is still "loaded". */
    private boolean loaded;
    private boolean isClosed;

    public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
      super(context, scanNode.getInSchema(), scanNode.getOutSchema());
      this.scanNode = scanNode;
      this.qual = this.scanNode.getQual();
      cachedData = TUtil.newList();
      currentRow = 0;
      loaded = false;
      isClosed = false;

      projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
    }

    /**
     * Returns the next projected row that satisfies the qualifier.
     *
     * @return the next row, or {@code null} when the rows are exhausted or the operator is closed
     */
    @Override
    public Tuple next() throws IOException {
      if (isClosed) {
        return null;
      }

      // Load once. Testing for an empty cache instead would re-query the catalog on every
      // call whenever the system table itself is empty.
      if (!loaded) {
        rescan();
      }

      if (!scanNode.hasQual()) {
        if (currentRow < cachedData.size()) {
          Tuple aTuple = cachedData.get(currentRow++);
          Tuple outTuple = new VTuple(outColumnNum);
          projector.eval(aTuple, outTuple);
          outTuple.setOffset(aTuple.getOffset());
          return outTuple;
        }
        return null;
      }

      while (currentRow < cachedData.size()) {
        Tuple aTuple = cachedData.get(currentRow++);
        if (qual.eval(inSchema, aTuple).isTrue()) {
          Tuple outTuple = new VTuple(outColumnNum);
          projector.eval(aTuple, outTuple);
          return outTuple;
        }
      }
      return null;
    }

    /**
     * Reloads the rows of the system table from the catalog, restarts the scan from the first
     * row and refreshes the input statistics.
     */
    @Override
    public void rescan() throws IOException {
      cachedData.clear();
      cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
      // Restart from the first row; otherwise a rescan after rows were consumed would resume
      // from the old position and skip rows.
      currentRow = 0;
      loaded = true;

      tableStats = new TableStats();
      tableStats.setNumRows(cachedData.size());
    }

    @Override
    public void close() throws IOException {
      scanNode = null;
      qual = null;
      projector = null;
      cachedData.clear();
      currentRow = -1;
      isClosed = true;
    }

    /** Always reports completion: the operator reads everything into memory at once. */
    @Override
    public float getProgress() {
      return 1.0f;
    }

    @Override
    protected void compile() throws CompilationError {
      if (scanNode.hasQual()) {
        qual = context.getPrecompiledEval(inSchema, qual);
      }
    }

    /**
     * @return the statistics built by the last {@link #rescan()}, or {@code null} if the table
     *         has not been loaded yet
     */
    @Override
    public TableStats getInputStats() {
      return tableStats;
    }

  }

}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 2242445..2fbebc1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -41,15 +41,12 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; -import org.apache.tajo.master.NonForwardQueryResultFileScanner; -import org.apache.tajo.master.NonForwardQueryResultScanner; -import org.apache.tajo.master.NonForwardQueryResultSystemScanner; -import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.*; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; -import org.apache.tajo.master.querymaster.*; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.querymaster.*; +import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java deleted file mode 100644 index 3d6669c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java +++ 
/dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.ha; - -import java.io.IOException; -import java.util.List; - -/** - * The HAService is responsible for setting active TajoMaster on startup or when the - * current active is changing (eg due to failure), monitoring the health of TajoMaster. - * - */ -public interface HAService { - - /** - * Add master name to shared storage. - */ - public void register() throws IOException; - - - /** - * Delete master name to shared storage. - * - */ - public void delete() throws IOException; - - /** - * - * @return True if current master is an active master. 
- */ - public boolean isActiveStatus(); - - /** - * - * @return return all master list - * @throws IOException - */ - public List<TajoMasterInfo> getMasters() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java deleted file mode 100644 index 45219b3..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java +++ /dev/null @@ -1,318 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.ha; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.net.NetUtils; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAConstants; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; - -/** - * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster. - * - */ -public class HAServiceHDFSImpl implements HAService { - private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class); - - private MasterContext context; - private TajoConf conf; - - private FileSystem fs; - - private String masterName; - private Path rootPath; - private Path haPath; - private Path activePath; - private Path backupPath; - - private boolean isActiveStatus = false; - - //thread which runs periodically to see the last time since a heartbeat is received. 
- private Thread checkerThread; - private volatile boolean stopped = false; - - private int monitorInterval; - - private String currentActiveMaster; - - public HAServiceHDFSImpl(MasterContext context) throws IOException { - this.context = context; - this.conf = context.getConf(); - initSystemDirectory(); - - InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress(); - this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); - - monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); - } - - private void initSystemDirectory() throws IOException { - // Get Tajo root dir - this.rootPath = TajoConf.getTajoRootDir(conf); - - // Check Tajo root dir - this.fs = rootPath.getFileSystem(conf); - - // Check and create Tajo system HA dir - haPath = TajoConf.getSystemHADir(conf); - if (!fs.exists(haPath)) { - fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA dir '" + haPath + "' is created"); - } - - activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - if (!fs.exists(activePath)) { - fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA Active dir '" + activePath + "' is created"); - } - - backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - if (!fs.exists(backupPath)) { - fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA Backup dir '" + backupPath + "' is created"); - } - } - - private void startPingChecker() { - if (checkerThread == null) { - checkerThread = new Thread(new PingChecker()); - checkerThread.setName("Ping Checker"); - checkerThread.start(); - } - } - - @Override - public void register() throws IOException { - FileStatus[] files = fs.listStatus(activePath); - - // Phase 1: If there is not another active master, this try to become active master. 
- if (files.length == 0) { - createMasterFile(true); - currentActiveMaster = masterName; - LOG.info(String.format("This is added to active master (%s)", masterName)); - } else { - // Phase 2: If there is active master information, we need to check its status. - Path activePath = files[0].getPath(); - currentActiveMaster = activePath.getName().replaceAll("_", ":"); - - // Phase 3: If current active master is dead, this master should be active master. - if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) { - fs.delete(activePath, true); - createMasterFile(true); - currentActiveMaster = masterName; - LOG.info(String.format("This is added to active master (%s)", masterName)); - } else { - // Phase 4: If current active master is alive, this master need to be backup master. - createMasterFile(false); - LOG.info(String.format("This is added to backup masters (%s)", masterName)); - } - } - } - - private void createMasterFile(boolean isActive) throws IOException { - String fileName = masterName.replaceAll(":", "_"); - Path path = null; - - if (isActive) { - path = new Path(activePath, fileName); - } else { - path = new Path(backupPath, fileName); - } - - StringBuilder sb = new StringBuilder(); - InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.CATALOG_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); - - FSDataOutputStream out = fs.create(path); - - try { - out.writeUTF(sb.toString()); - out.hflush(); - 
out.close(); - } catch (FileAlreadyExistsException e) { - createMasterFile(false); - } - - if (isActive) { - isActiveStatus = true; - } else { - isActiveStatus = false; - } - - startPingChecker(); - } - - - private InetSocketAddress getHostAddress(int type) { - InetSocketAddress address = null; - - switch (type) { - case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS); - break; - case HAConstants.MASTER_CLIENT_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_CLIENT_RPC_ADDRESS); - break; - case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS); - break; - case HAConstants.CATALOG_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .CATALOG_ADDRESS); - break; - case HAConstants.MASTER_INFO_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_INFO_ADDRESS); - default: - break; - } - - return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort()); - } - - @Override - public void delete() throws IOException { - String fileName = masterName.replaceAll(":", "_"); - - Path activeFile = new Path(activePath, fileName); - if (fs.exists(activeFile)) { - fs.delete(activeFile, true); - } - - Path backupFile = new Path(backupPath, fileName); - if (fs.exists(backupFile)) { - fs.delete(backupFile, true); - } - if (isActiveStatus) { - isActiveStatus = false; - } - stopped = true; - } - - @Override - public boolean isActiveStatus() { - return isActiveStatus; - } - - @Override - public List<TajoMasterInfo> getMasters() throws IOException { - List<TajoMasterInfo> list = TUtil.newList(); - Path path = null; - - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 1) { - path = files[0].getPath(); - list.add(createTajoMasterInfo(path, true)); - } - - files = 
fs.listStatus(backupPath); - for (FileStatus status : files) { - path = status.getPath(); - list.add(createTajoMasterInfo(path, false)); - } - - return list; - } - - private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException { - String masterAddress = path.getName().replaceAll("_", ":"); - boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf); - - FSDataInputStream stream = fs.open(path); - String data = stream.readUTF(); - - stream.close(); - - String[] addresses = data.split("_"); - TajoMasterInfo info = new TajoMasterInfo(); - - info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress)); - info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0])); - info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1])); - info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2])); - info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3])); - - info.setAvailable(isAlive); - info.setActive(isActive); - - return info; - } - - private class PingChecker implements Runnable { - @Override - public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - synchronized (HAServiceHDFSImpl.this) { - try { - if (!currentActiveMaster.equals(masterName)) { - boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); - if (LOG.isDebugEnabled()) { - LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName - + ", isAlive:" + isAlive); - } - - // If active master is dead, this master should be active master instead of - // previous active master. 
- if (!isAlive) { - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 0 || (files.length == 1 - && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { - delete(); - register(); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info("PingChecker interrupted. - masterName:" + masterName); - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java deleted file mode 100644 index 6ed975a..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.ha; - -import java.net.InetSocketAddress; - -public class TajoMasterInfo { - - private boolean available; - private boolean isActive; - - private InetSocketAddress tajoMasterAddress; - private InetSocketAddress tajoClientAddress; - private InetSocketAddress workerResourceTrackerAddr; - private InetSocketAddress catalogAddress; - private InetSocketAddress webServerAddress; - - public InetSocketAddress getTajoMasterAddress() { - return tajoMasterAddress; - } - - public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { - this.tajoMasterAddress = tajoMasterAddress; - } - - public InetSocketAddress getTajoClientAddress() { - return tajoClientAddress; - } - - public void setTajoClientAddress(InetSocketAddress tajoClientAddress) { - this.tajoClientAddress = tajoClientAddress; - } - - public InetSocketAddress getWorkerResourceTrackerAddr() { - return workerResourceTrackerAddr; - } - - public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { - this.workerResourceTrackerAddr = workerResourceTrackerAddr; - } - - public InetSocketAddress getCatalogAddress() { - return catalogAddress; - } - - public void setCatalogAddress(InetSocketAddress catalogAddress) { - this.catalogAddress = catalogAddress; - } - - public InetSocketAddress getWebServerAddress() { - return webServerAddress; - } - - public void setWebServerAddress(InetSocketAddress webServerAddress) { - this.webServerAddress = webServerAddress; - } - - public boolean isAvailable() { - return available; - } - - public void setAvailable(boolean available) { - this.available = available; - } - - public boolean isActive() { - return isActive; - } - - public void setActive(boolean active) { - isActive = active; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java deleted file mode 100644 index 7c3d283..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.metrics; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricSet; -import org.apache.tajo.master.TajoMaster; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; - -public class CatalogMetricsGaugeSet implements MetricSet { - TajoMaster.MasterContext tajoMasterContext; - public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) { - this.tajoMasterContext = tajoMasterContext; - } - - @Override - public Map<String, Metric> getMetrics() { - Map<String, Metric> metricsMap = new HashMap<String, Metric>(); - metricsMap.put("numTables", new Gauge<Integer>() { - @Override - public Integer getValue() { - return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size(); - } - }); - - metricsMap.put("numFunctions", new Gauge<Integer>() { - @Override - public Integer getValue() { - return tajoMasterContext.getCatalog().getFunctions().size(); - } - }); - - return metricsMap; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java deleted file mode 100644 index 993d3b7..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.metrics; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricSet; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerState; - -import java.util.HashMap; -import java.util.Map; - -public class WorkerResourceMetricsGaugeSet implements MetricSet { - TajoMaster.MasterContext tajoMasterContext; - public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) { - this.tajoMasterContext = tajoMasterContext; - } - - @Override - public Map<String, Metric> getMetrics() { - Map<String, Metric> metricsMap = new HashMap<String, Metric>(); - metricsMap.put("totalWorkers", new Gauge<Integer>() { - @Override - public Integer getValue() { - return tajoMasterContext.getResourceManager().getWorkers().size(); - } - }); - - metricsMap.put("liveWorkers", new Gauge<Integer>() { - @Override - public Integer getValue() { - return getNumWorkers(WorkerState.RUNNING); - } - }); - - metricsMap.put("deadWorkers", new Gauge<Integer>() { - @Override - public Integer getValue() { - return getNumWorkers(WorkerState.LOST); - } - }); - - return metricsMap; - } - - protected int getNumWorkers(WorkerState status) { - int numWorkers = 0; - for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) { - 
if(eachWorker.getState() == status) { - numWorkers++; - } - } - - return numWorkers; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java deleted file mode 100644 index a626df1..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ /dev/null @@ -1,738 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.collect.Maps; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; -import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoProtos.QueryState; -import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.event.*; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.util.TUtil; -import org.apache.tajo.util.history.QueryHistory; -import org.apache.tajo.util.history.StageHistory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class Query implements EventHandler<QueryEvent> { - private static final Log LOG = LogFactory.getLog(Query.class); - - // Facilities for Query - private final 
TajoConf systemConf; - private final Clock clock; - private String queryStr; - private Map<ExecutionBlockId, Stage> stages; - private final EventHandler eventHandler; - private final MasterPlan plan; - QueryMasterTask.QueryMasterTaskContext context; - private ExecutionBlockCursor cursor; - - // Query Status - private final QueryId id; - private long appSubmitTime; - private long startTime; - private long finishTime; - private TableDesc resultDesc; - private int completedStagesCount = 0; - private int successedStagesCount = 0; - private int killedStagesCount = 0; - private int failedStagesCount = 0; - private int erroredStagesCount = 0; - private final List<String> diagnostics = new ArrayList<String>(); - - // Internal Variables - private final Lock readLock; - private final Lock writeLock; - private int priority = 100; - - // State Machine - private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine; - private QueryState queryState; - - // Transition Handler - private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); - private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); - private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition(); - private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition(); - - protected static final StateMachineFactory - <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory = - new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent> - (QueryState.QUERY_NEW) - - // Transitions from NEW state - .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING, - QueryEventType.START, - new StartTransition()) - .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED, - 
QueryEventType.KILL, - new KillNewQueryTransition()) - .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from RUNNING state - .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, - QueryEventType.STAGE_COMPLETED, - STAGE_COMPLETED_TRANSITION) - .addTransition(QueryState.QUERY_RUNNING, - EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, - QueryState.QUERY_ERROR), - QueryEventType.QUERY_COMPLETED, - QUERY_COMPLETED_TRANSITION) - .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT, - QueryEventType.KILL, - new KillAllStagesTransition()) - .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from QUERY_SUCCEEDED state - .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - // ignore-able transitions - .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, - QueryEventType.STAGE_COMPLETED, - STAGE_COMPLETED_TRANSITION) - .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, - QueryEventType.KILL) - .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from KILL_WAIT state - .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT, - QueryEventType.STAGE_COMPLETED, - STAGE_COMPLETED_TRANSITION) - .addTransition(QueryState.QUERY_KILL_WAIT, - EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, - QueryState.QUERY_ERROR), - QueryEventType.QUERY_COMPLETED, - QUERY_COMPLETED_TRANSITION) - .addTransition(QueryState.QUERY_KILL_WAIT, 
QueryState.QUERY_KILL_WAIT, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions - .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED), - QueryEventType.KILL, - QUERY_COMPLETED_TRANSITION) - - // Transitions from FAILED state - .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions - .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, - QueryEventType.KILL) - - // Transitions from ERROR state - .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, - QueryEventType.DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, - QueryEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions - .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, - EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED)) - - .installTopology(); - - public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id, - final long appSubmitTime, - final String queryStr, - final EventHandler eventHandler, - final MasterPlan plan) { - this.context = context; - this.systemConf = context.getConf(); - this.id = id; - this.clock = context.getClock(); - this.appSubmitTime = appSubmitTime; - this.queryStr = queryStr; - this.stages = Maps.newConcurrentMap(); - this.eventHandler = eventHandler; - this.plan = plan; - this.cursor = new ExecutionBlockCursor(plan, true); - - StringBuilder sb = new StringBuilder("\n======================================================="); - sb.append("\nThe order of execution: \n"); - int order = 1; - 
while (cursor.hasNext()) { - ExecutionBlock currentEB = cursor.nextBlock(); - sb.append("\n").append(order).append(": ").append(currentEB.getId()); - order++; - } - sb.append("\n======================================================="); - LOG.info(sb); - cursor.reset(); - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - - stateMachine = stateMachineFactory.make(this); - queryState = stateMachine.getCurrentState(); - } - - public float getProgress() { - QueryState state = getState(); - if (state == QueryState.QUERY_SUCCEEDED) { - return 1.0f; - } else { - int idx = 0; - List<Stage> tempStages = new ArrayList<Stage>(); - synchronized(stages) { - tempStages.addAll(stages.values()); - } - - float [] subProgresses = new float[tempStages.size()]; - for (Stage stage: tempStages) { - if (stage.getState() != StageState.NEW) { - subProgresses[idx] = stage.getProgress(); - } else { - subProgresses[idx] = 0.0f; - } - idx++; - } - - float totalProgress = 0.0f; - float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to - - for (int i = 0; i < subProgresses.length; i++) { - totalProgress += subProgresses[i] * proportion; - } - - return totalProgress; - } - } - - public long getAppSubmitTime() { - return this.appSubmitTime; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime() { - startTime = clock.getTime(); - } - - public long getFinishTime() { - return finishTime; - } - - public void setFinishTime() { - finishTime = clock.getTime(); - } - - public QueryHistory getQueryHistory() { - QueryHistory queryHistory = makeQueryHistory(); - queryHistory.setStageHistories(makeStageHistories()); - return queryHistory; - } - - private List<StageHistory> makeStageHistories() { - List<StageHistory> stageHistories = new ArrayList<StageHistory>(); - for(Stage eachStage : getStages()) { - 
stageHistories.add(eachStage.getStageHistory()); - } - - return stageHistories; - } - - private QueryHistory makeQueryHistory() { - QueryHistory queryHistory = new QueryHistory(); - - queryHistory.setQueryId(getId().toString()); - queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName()); - queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort()); - queryHistory.setLogicalPlan(plan.toString()); - queryHistory.setLogicalPlan(plan.getLogicalPlan().toString()); - queryHistory.setDistributedPlan(plan.toString()); - - List<String[]> sessionVariables = new ArrayList<String[]>(); - for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) { - if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) { - sessionVariables.add(new String[]{entry.getKey(), entry.getValue()}); - } - } - queryHistory.setSessionVariables(sessionVariables); - - return queryHistory; - } - - public List<String> getDiagnostics() { - readLock.lock(); - try { - return diagnostics; - } finally { - readLock.unlock(); - } - } - - protected void addDiagnostic(String diag) { - diagnostics.add(diag); - } - - public TableDesc getResultDesc() { - return resultDesc; - } - - public void setResultDesc(TableDesc desc) { - resultDesc = desc; - } - - public MasterPlan getPlan() { - return plan; - } - - public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() { - return stateMachine; - } - - public void addStage(Stage stage) { - stages.put(stage.getId(), stage); - } - - public QueryId getId() { - return this.id; - } - - public Stage getStage(ExecutionBlockId id) { - return this.stages.get(id); - } - - public Collection<Stage> getStages() { - return this.stages.values(); - } - - public QueryState getSynchronizedState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - /* 
non-blocking call for client API */ - public QueryState getState() { - return queryState; - } - - public ExecutionBlockCursor getExecutionBlockCursor() { - return cursor; - } - - public static class StartTransition - implements SingleArcTransition<Query, QueryEvent> { - - @Override - public void transition(Query query, QueryEvent queryEvent) { - - query.setStartTime(); - Stage stage = new Stage(query.context, query.getPlan(), - query.getExecutionBlockCursor().nextBlock()); - stage.setPriority(query.priority--); - query.addStage(stage); - - stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); - LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); - } - } - - public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> { - - @Override - public QueryState transition(Query query, QueryEvent queryEvent) { - QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent; - QueryState finalState; - - if (stageEvent.getState() == StageState.SUCCEEDED) { - finalState = finalizeQuery(query, stageEvent); - } else if (stageEvent.getState() == StageState.FAILED) { - finalState = QueryState.QUERY_FAILED; - } else if (stageEvent.getState() == StageState.KILLED) { - finalState = QueryState.QUERY_KILLED; - } else { - finalState = QueryState.QUERY_ERROR; - } - if (finalState != QueryState.QUERY_SUCCEEDED) { - Stage lastStage = query.getStage(stageEvent.getExecutionBlockId()); - if (lastStage != null && lastStage.getTableMeta() != null) { - StoreType storeType = lastStage.getTableMeta().getStoreType(); - if (storeType != null) { - LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); - try { - StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild()); - } catch (IOException e) { - LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); - } - } - } - } - 
query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); - query.setFinishTime(); - - return finalState; - } - - private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { - Stage lastStage = query.getStage(event.getExecutionBlockId()); - StoreType storeType = lastStage.getTableMeta().getStoreType(); - try { - LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); - CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); - TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - - Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType) - .commitOutputData(query.context.getQueryContext(), - lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc); - - QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); - hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); - } catch (Exception e) { - query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); - return QueryState.QUERY_ERROR; - } - - return QueryState.QUERY_SUCCEEDED; - } - - private static interface QueryHook { - boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); - void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, - ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception; - } - - private class QueryHookExecutor { - private List<QueryHook> hookList = TUtil.newList(); - private QueryMaster.QueryMasterContext context; - - public QueryHookExecutor(QueryMaster.QueryMasterContext context) { - this.context = context; - hookList.add(new MaterializedResultHook()); - hookList.add(new CreateTableHook()); - hookList.add(new InsertTableHook()); - } - - 
public void execute(QueryContext queryContext, Query query, - ExecutionBlockId finalExecBlockId, - Path finalOutputDir) throws Exception { - for (QueryHook hook : hookList) { - if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) { - hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir); - } - } - } - } - - private class MaterializedResultHook implements QueryHook { - - @Override - public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, - Path finalOutputDir) { - Stage lastStage = query.getStage(finalExecBlockId); - NodeType type = lastStage.getBlock().getPlan().getType(); - return type != NodeType.CREATE_TABLE && type != NodeType.INSERT; - } - - @Override - public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, - Query query, ExecutionBlockId finalExecBlockId, - Path finalOutputDir) throws Exception { - Stage lastStage = query.getStage(finalExecBlockId); - TableMeta meta = lastStage.getTableMeta(); - - String nullChar = queryContext.get(SessionVars.NULL_CHAR); - meta.putOption(StorageConstants.TEXT_NULL, nullChar); - - TableStats stats = lastStage.getResultStats(); - - TableDesc resultTableDesc = - new TableDesc( - query.getId().toString(), - lastStage.getSchema(), - meta, - finalOutputDir.toUri()); - resultTableDesc.setExternal(true); - - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); - resultTableDesc.setStats(stats); - query.setResultDesc(resultTableDesc); - } - } - - private class CreateTableHook implements QueryHook { - - @Override - public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, - Path finalOutputDir) { - Stage lastStage = query.getStage(finalExecBlockId); - return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE; - } - - @Override - public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, - Query query, 
ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { - CatalogService catalog = context.getWorkerContext().getCatalog(); - Stage lastStage = query.getStage(finalExecBlockId); - TableStats stats = lastStage.getResultStats(); - - CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan(); - TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions()); - - TableDesc tableDescTobeCreated = - new TableDesc( - createTableNode.getTableName(), - createTableNode.getTableSchema(), - meta, - finalOutputDir.toUri()); - tableDescTobeCreated.setExternal(createTableNode.isExternal()); - - if (createTableNode.hasPartition()) { - tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod()); - } - - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); - tableDescTobeCreated.setStats(stats); - query.setResultDesc(tableDescTobeCreated); - - catalog.createTable(tableDescTobeCreated); - } - } - - private class InsertTableHook implements QueryHook { - - @Override - public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, - Path finalOutputDir) { - Stage lastStage = query.getStage(finalExecBlockId); - return lastStage.getBlock().getPlan().getType() == NodeType.INSERT; - } - - @Override - public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, - Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) - throws Exception { - - CatalogService catalog = context.getWorkerContext().getCatalog(); - Stage lastStage = query.getStage(finalExecBlockId); - TableMeta meta = lastStage.getTableMeta(); - TableStats stats = lastStage.getResultStats(); - - InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan(); - - TableDesc finalTable; - if (insertNode.hasTargetTable()) { - String tableName = insertNode.getTableName(); - finalTable = catalog.getTableDesc(tableName); - } else { - String tableName 
= query.getId().toString(); - finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri()); - } - - long volume = getTableVolume(query.systemConf, finalOutputDir); - stats.setNumBytes(volume); - finalTable.setStats(stats); - - if (insertNode.hasTargetTable()) { - UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder(); - builder.setTableName(finalTable.getName()); - builder.setStats(stats.getProto()); - - catalog.updateTableStats(builder.build()); - } - - query.setResultDesc(finalTable); - } - } - } - - public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(systemConf); - ContentSummary directorySummary = fs.getContentSummary(tablePath); - return directorySummary.getLength(); - } - - public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> { - - private boolean hasNext(Query query) { - ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); - ExecutionBlock nextBlock = cursor.peek(); - return !query.getPlan().isTerminal(nextBlock); - } - - private void executeNextBlock(Query query) { - ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); - ExecutionBlock nextBlock = cursor.nextBlock(); - Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); - nextStage.setPriority(query.priority--); - query.addStage(nextStage); - nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); - - LOG.info("Scheduling Stage:" + nextStage.getId()); - if(LOG.isDebugEnabled()) { - LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); - LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); - } - } - - @Override - public void transition(Query query, QueryEvent event) { - try { - query.completedStagesCount++; - StageCompletedEvent castEvent = (StageCompletedEvent) event; - - if (castEvent.getState() == StageState.SUCCEEDED) { - 
query.successedStagesCount++; - } else if (castEvent.getState() == StageState.KILLED) { - query.killedStagesCount++; - } else if (castEvent.getState() == StageState.FAILED) { - query.failedStagesCount++; - } else if (castEvent.getState() == StageState.ERROR) { - query.erroredStagesCount++; - } else { - LOG.error(String.format("Invalid Stage (%s) State %s at %s", - castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); - query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); - } - - // if a stage is succeeded and a query is running - if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded - query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { // there remains at least one stage. - query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport(); - executeNextBlock(query); - } else { // if a query is completed due to finished, kill, failure, or error - query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); - } - } - } - - private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> { - @Override - public void transition(Query query, QueryEvent event) { - query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); - } - } - - private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> { - @Override - public void transition(Query query, QueryEvent event) { - query.setFinishTime(); - query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); - } - } - - private static class KillAllStagesTransition implements SingleArcTransition<Query, 
QueryEvent> { - @Override - public void transition(Query query, QueryEvent event) { - synchronized (query.stages) { - for (Stage stage : query.stages.values()) { - query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); - } - } - } - } - - private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> { - - @Override - public void transition(Query query, QueryEvent event) { - query.setFinishTime(); - query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); - } - } - - @Override - public void handle(QueryEvent event) { - LOG.info("Processing " + event.getQueryId() + " of type " + event.getType()); - try { - writeLock.lock(); - QueryState oldState = getSynchronizedState(); - try { - getStateMachine().doTransition(event.getType(), event); - queryState = getSynchronizedState(); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state" - + ", type:" + event - + ", oldState:" + oldState.name() - + ", nextState:" + getSynchronizedState().name() - , e); - eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if (oldState != getSynchronizedState()) { - LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState()); - } - } - - finally { - writeLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java deleted file mode 100644 index 0a87990..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ /dev/null @@ -1,300 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.master.session.Session; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.NetUtils; - -import 
java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; - -public class QueryInProgress extends CompositeService { - private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); - - private QueryId queryId; - - private Session session; - - private AsyncDispatcher dispatcher; - - private LogicalRootNode plan; - - private AtomicBoolean querySubmitted = new AtomicBoolean(false); - - private AtomicBoolean stopped = new AtomicBoolean(false); - - private QueryInfo queryInfo; - - private final TajoMaster.MasterContext masterContext; - - private NettyClientBase queryMasterRpc; - - private QueryMasterProtocolService queryMasterRpcClient; - - public QueryInProgress( - TajoMaster.MasterContext masterContext, - Session session, - QueryContext queryContext, - QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) { - super(QueryInProgress.class.getName()); - this.masterContext = masterContext; - this.session = session; - this.queryId = queryId; - this.plan = plan; - - queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); - queryInfo.setStartTime(System.currentTimeMillis()); - } - - @Override - public void init(Configuration conf) { - dispatcher = new AsyncDispatcher(); - this.addService(dispatcher); - - dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); - super.init(conf); - } - - public synchronized void kill() { - if(queryMasterRpcClient != null){ - queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); - } - } - - @Override - public void stop() { - if(stopped.getAndSet(true)) { - return; - } - - LOG.info("========================================================="); - LOG.info("Stop query:" + queryId); - - masterContext.getResourceManager().stopQueryMaster(queryId); - - long startTime = System.currentTimeMillis(); - while(true) { - try { - 
if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) { - LOG.info(queryId + " QueryMaster stopped"); - break; - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - break; - } - - try { - synchronized (this){ - wait(100); - } - } catch (InterruptedException e) { - break; - } - if(System.currentTimeMillis() - startTime > 60 * 1000) { - LOG.warn("Failed to stop QueryMaster:" + queryId); - break; - } - } - - if(queryMasterRpc != null) { - RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); - } - - masterContext.getHistoryWriter().appendHistory(queryInfo); - super.stop(); - } - - @Override - public void start() { - super.start(); - } - - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - - - public boolean startQueryMaster() { - try { - LOG.info("Initializing QueryInProgress for QueryID=" + queryId); - WorkerResourceManager resourceManager = masterContext.getResourceManager(); - WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this); - - // if no resource to allocate a query master - if(resource == null) { - LOG.info("No Available Resources for QueryMaster"); - return false; - } - - queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); - queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort()); - queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); - queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); - - getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); - - return true; - } catch (Exception e) { - catchException(e); - return false; - } - } - - class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> { - @Override - public void handle(QueryJobEvent queryJobEvent) { - if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { - heartbeat(queryJobEvent.getQueryInfo()); - } else 
if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { - QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId); - queryInProgress.getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { - submmitQueryToMaster(); - } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - kill(); - } - } - } - - private void connectQueryMaster() throws Exception { - InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); - LOG.info("Connect to QueryMaster:" + addr); - queryMasterRpc = - RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true); - queryMasterRpcClient = queryMasterRpc.getStub(); - } - - private synchronized void submmitQueryToMaster() { - if(querySubmitted.get()) { - return; - } - - try { - if(queryMasterRpcClient == null) { - connectQueryMaster(); - } - if(queryMasterRpcClient == null) { - LOG.info("No QueryMaster conneciton info."); - //TODO wait - return; - } - LOG.info("Call executeQuery to :" + - queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId); - - QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder(); - builder.setQueryId(queryId.getProto()) - .setQueryContext(queryInfo.getQueryContext().getProto()) - .setSession(session.getProto()) - .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) - .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); - - queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get()); - querySubmitted.set(true); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - public void catchException(Exception e) { - 
LOG.error(e.getMessage(), e); - queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED); - queryInfo.setLastMessage(StringUtils.stringifyException(e)); - } - - public QueryId getQueryId() { - return queryId; - } - - public QueryInfo getQueryInfo() { - return this.queryInfo; - } - - public boolean isStarted() { - return !stopped.get() && this.querySubmitted.get(); - } - - private void heartbeat(QueryInfo queryInfo) { - LOG.info("Received QueryMaster heartbeat:" + queryInfo); - - // to avoid partial update by different heartbeats - synchronized (this.queryInfo) { - - // terminal state will let client to retrieve a query result - // So, we must set the query result before changing query state - if (isFinishState(queryInfo.getQueryState())) { - if (queryInfo.hasResultdesc()) { - this.queryInfo.setResultDesc(queryInfo.getResultDesc()); - } - } - - this.queryInfo.setQueryState(queryInfo.getQueryState()); - this.queryInfo.setProgress(queryInfo.getProgress()); - this.queryInfo.setFinishTime(queryInfo.getFinishTime()); - - // Update diagnosis message - if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { - this.queryInfo.setLastMessage(queryInfo.getLastMessage()); - LOG.info(queryId + queryInfo.getLastMessage()); - } - - // if any error occurs, print outs the error message - if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { - LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); - } - - - if (isFinishState(this.queryInfo.getQueryState())) { - masterContext.getQueryJobManager().getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo)); - } - } - } - - private boolean isFinishState(TajoProtos.QueryState state) { - return state == TajoProtos.QueryState.QUERY_FAILED || - state == TajoProtos.QueryState.QUERY_KILLED || - state == TajoProtos.QueryState.QUERY_SUCCEEDED; - } -}
