http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index cf6b917..4cf6ce2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -81,11 +81,11 @@ public class Repartitioner { private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900; private final static String UNKNOWN_HOST = "unknown"; - public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery) + public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage) throws IOException { - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock execBlock = subQuery.getBlock(); - QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext(); + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); + QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); ScanNode[] scans = execBlock.getScanNodes(); @@ -98,17 +98,17 @@ public class Repartitioner { TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); if (tableDesc == null) { // if it is a real table stored on storage FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); tablePath = storageManager.getTablePath(scans[i].getTableName()); if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { for (Map.Entry<ExecutionBlockId, ExecutionBlockId> 
unionScanEntry: execBlock.getUnionScanMap().entrySet()) { ExecutionBlockId originScanEbId = unionScanEntry.getKey(); - stats[i] += masterContext.getSubQuery(originScanEbId).getResultStats().getNumBytes(); + stats[i] += masterContext.getStage(originScanEbId).getResultStats().getNumBytes(); } } else { ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName()); - stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes(); + stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes(); } fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); } else { @@ -119,7 +119,7 @@ public class Repartitioner { } StorageManager storageManager = - StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType()); + StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType()); // if table has no data, storageManager will return empty FileFragment. // So, we need to handle FileFragment by its size. @@ -223,7 +223,7 @@ public class Repartitioner { execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName()); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); - scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments); + scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); } else if (!execBlock.getBroadcastTables().isEmpty()) { // If some relations of this EB are broadcasted boolean hasNonLeafNode = false; List<Integer> largeScanIndexList = new ArrayList<Integer>(); @@ -266,7 +266,7 @@ public class Repartitioner { int baseScanIdx = largeScanIndexList.isEmpty() ? 
maxStatsScanIdx : largeScanIndexList.get(0); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); - scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments); + scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); } else { if (largeScanIndexList.size() > 2) { throw new IOException("Symmetric Repartition Join should have two scan node, but " + nonLeafScanNames); @@ -292,12 +292,12 @@ public class Repartitioner { index++; } LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", nonLeafScanNames)); - scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, + scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, intermediateScans, intermediateScanStats, intermediateFragments, broadcastScans, broadcastFragments); } } else { LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join"); - scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, scans, stats, fragments, null, null); + scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, scans, stats, fragments, null, null); } } @@ -305,7 +305,7 @@ public class Repartitioner { * Scheduling in tech case of Symmetric Repartition Join * @param masterContext * @param schedulerContext - * @param subQuery + * @param stage * @param scans * @param stats * @param fragments @@ -313,21 +313,21 @@ public class Repartitioner { */ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext masterContext, TaskSchedulerContext schedulerContext, - SubQuery subQuery, + Stage stage, ScanNode[] scans, long[] stats, Fragment[] fragments, ScanNode[] broadcastScans, Fragment[] broadcastFragments) throws IOException { - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock execBlock = subQuery.getBlock(); + MasterPlan 
masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); // The hash map is modeling as follows: // <Part Id, <EbId, List<Intermediate Data>>> Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>(); // Grouping IntermediateData by a partition key and a table name - List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId()); + List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId()); // In the case of join with union, there is one ScanNode for union. Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap(); @@ -336,7 +336,7 @@ public class Repartitioner { if (scanEbId == null) { scanEbId = childBlock.getId(); } - SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); + Stage childExecSM = stage.getContext().getStage(childBlock.getId()); if (childExecSM.getHashShuffleIntermediateEntries() != null && !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) { @@ -387,7 +387,7 @@ public class Repartitioner { // Getting the desire number of join tasks according to the volumn // of a larger table int largerIdx = stats[0] >= stats[1] ? 
0 : 1; - int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE); + int desireJoinTaskVolumn = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE); // calculate the number of tasks according to the data size int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576); @@ -412,7 +412,7 @@ public class Repartitioner { TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; partitionScanPaths = partitionScan.getInputPaths(); @@ -420,7 +420,7 @@ public class Repartitioner { getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc); partitionScan.setInputPaths(partitionScanPaths); } else { - StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), + StorageManager storageManager = StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType()); Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan); @@ -430,12 +430,12 @@ public class Repartitioner { } } } - SubQuery.scheduleFragment(subQuery, fragments[0], rightFragments); + Stage.scheduleFragment(stage, fragments[0], rightFragments); // Assign partitions to tasks in a round robin manner. 
for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry : hashEntries.entrySet()) { - addJoinShuffle(subQuery, entry.getKey(), entry.getValue()); + addJoinShuffle(stage, entry.getKey(), entry.getValue()); } schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum)); @@ -503,9 +503,9 @@ public class Repartitioner { return fragments; } - private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery, + private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage, int baseScanId, Fragment[] fragments) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); for (int i = 0; i < scans.length; i++) { @@ -527,7 +527,7 @@ public class Repartitioner { List<Fragment> broadcastFragments = new ArrayList<Fragment>(); for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; - TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName()); + TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName()); TableMeta meta = desc.getMeta(); Collection<Fragment> scanFragments; @@ -537,11 +537,11 @@ public class Repartitioner { partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc); } else { StorageManager storageManager = - StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType()); + StorageManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType()); scanFragments = 
storageManager.getSplits(scan.getCanonicalName(), desc, scan); } @@ -565,14 +565,14 @@ public class Repartitioner { throw new IOException("No fragments for " + scans[baseScanId].getTableName()); } - SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments); + Stage.scheduleFragments(stage, baseFragments, broadcastFragments); schedulerContext.setEstimatedTaskNum(baseFragments.size()); } - private static void addJoinShuffle(SubQuery subQuery, int partitionId, + private static void addJoinShuffle(Stage stage, int partitionId, Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) { Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>(); - for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) { + for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) { if (grouppedPartitions.containsKey(execBlock.getId())) { Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE, grouppedPartitions.get(execBlock.getId())); @@ -581,10 +581,10 @@ public class Repartitioner { } if (fetches.isEmpty()) { - LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result."); + LOG.info(stage.getId() + "'s " + partitionId + " partition has empty result."); return; } - SubQuery.scheduleFetches(subQuery, fetches); + Stage.scheduleFetches(stage, fetches); } /** @@ -616,14 +616,14 @@ public class Repartitioner { } public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext, - MasterPlan masterPlan, SubQuery subQuery, int maxNum) + MasterPlan masterPlan, Stage stage, int maxNum) throws IOException { - DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0); + DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0); if (channel.getShuffleType() == HASH_SHUFFLE || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { - scheduleHashShuffledFetches(schedulerContext, 
masterPlan, subQuery, channel, maxNum); + scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else if (channel.getShuffleType() == RANGE_SHUFFLE) { - scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum); + scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else { throw new InternalException("Cannot support partition type"); } @@ -634,22 +634,22 @@ public class Repartitioner { List<TableStats> tableStatses = new ArrayList<TableStats>(); List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId); for (ExecutionBlock childBlock : childBlocks) { - SubQuery childExecSM = context.getSubQuery(childBlock.getId()); - tableStatses.add(childExecSM.getResultStats()); + Stage childStage = context.getStage(childBlock.getId()); + tableStatses.add(childStage.getResultStats()); } return StatisticsUtil.aggregateTableStat(tableStatses); } public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, - SubQuery subQuery, DataChannel channel, int maxNum) + Stage stage, DataChannel channel, int maxNum) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf())) .getTablePath(scan.getTableName()); - ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0); + ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0); SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT); SortSpec [] sortSpecs = sortNode.getSortKeys(); Schema sortSchema = new Schema(channel.getShuffleKeys()); @@ -658,7 +658,7 @@ public class Repartitioner { int determinedTaskNum; 
// calculate the number of maximum query ranges - TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); + TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); // If there is an empty table in inner join, it should return zero rows. if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) { @@ -668,15 +668,15 @@ public class Repartitioner { if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) { StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); - CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot(); TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); if (tableDesc == null) { throw new IOException("Can't get table meta data from catalog: " + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); } - ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType) - .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc, + ranges = StorageManager.getStorageManager(stage.getContext().getConf(), storeType) + .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc, sortNode.getInSchema(), sortSpecs, mergedRange); determinedTaskNum = ranges.length; @@ -687,36 +687,36 @@ public class Repartitioner { // if the number of the range cardinality is less than the desired number of tasks, // we set the the number of tasks to the number of range cardinality. 
if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { - LOG.info(subQuery.getId() + ", The range cardinality (" + card + LOG.info(stage.getId() + ", The range cardinality (" + card + ") is less then the desired number of tasks (" + maxNum + ")"); determinedTaskNum = card.intValue(); } else { determinedTaskNum = maxNum; } - LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + + LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); ranges = partitioner.partition(determinedTaskNum); if (ranges == null || ranges.length == 0) { - LOG.warn(subQuery.getId() + " no range infos."); + LOG.warn(stage.getId() + " no range infos."); } TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); if (LOG.isDebugEnabled()) { if (ranges != null) { for (TupleRange eachRange : ranges) { - LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); } } } } FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); - SubQuery.scheduleFragment(subQuery, dummyFragment); + Stage.scheduleFragment(stage, dummyFragment); List<FetchImpl> fetches = new ArrayList<FetchImpl>(); - List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId()); + List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId()); for (ExecutionBlock childBlock : childBlocks) { - SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); + Stage childExecSM = stage.getContext().getStage(childBlock.getId()); for (Task qu : childExecSM.getTasks()) { for (IntermediateEntry p : qu.getIntermediateData()) { FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); @@ -758,12 +758,12 @@ public class Repartitioner { 
LOG.error(e); } - scheduleFetchesByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum); + scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum); schedulerContext.setEstimatedTaskNum(determinedTaskNum); } - public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions, + public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions, String tableName, int num) { int i; Map<String, List<FetchImpl>>[] fetchesArray = new Map[num]; @@ -777,7 +777,7 @@ public class Repartitioner { if (i == num) i = 0; } for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) { - SubQuery.scheduleFetches(subQuery, eachFetches); + Stage.scheduleFetches(stage, eachFetches); } } @@ -807,18 +807,18 @@ public class Repartitioner { } public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, - SubQuery subQuery, DataChannel channel, + Stage stage, DataChannel channel, int maxNum) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf())) .getTablePath(scan.getTableName()); Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); List<Fragment> fragments = new ArrayList<Fragment>(); fragments.add(frag); - SubQuery.scheduleFragments(subQuery, fragments); + Stage.scheduleFragments(stage, fragments); Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>(); Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId, @@ -826,7 +826,7 @@ public class Repartitioner { for (ExecutionBlock block : 
masterPlan.getChilds(execBlock)) { List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); - partitions.addAll(subQuery.getContext().getSubQuery(block.getId()).getHashShuffleIntermediateEntries()); + partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries()); // In scattered hash shuffle, Collecting each IntermediateEntry if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { @@ -861,16 +861,16 @@ public class Repartitioner { } int groupingColumns = 0; - LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(), + LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(), new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY}); if (groupbyNodes != null && groupbyNodes.length > 0) { LogicalNode bottomNode = groupbyNodes[0]; if (bottomNode.getType() == NodeType.GROUP_BY) { groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length; } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) { - DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); if (distinctNode == null) { - LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode"); + LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); distinctNode = (DistinctGroupbyNode)bottomNode; } groupingColumns = distinctNode.getGroupingColumns().length; @@ -879,8 +879,8 @@ public class Repartitioner { EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { if (property.getDistinct().getIsMultipleAggregation()) { - MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); - if (stage != MultipleAggregationStage.THRID_STAGE) { + MultipleAggregationStage mulAggStage = 
property.getDistinct().getMultipleAggregationStage(); + if (mulAggStage != MultipleAggregationStage.THRID_STAGE) { groupingColumns = distinctNode.getOutSchema().size(); } } @@ -889,13 +889,13 @@ public class Repartitioner { } // get a proper number of tasks int determinedTaskNum = Math.min(maxNum, finalFetches.size()); - LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); + LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); if (groupingColumns == 0) { determinedTaskNum = 1; - LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); } else { - TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); + TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); if (totalStat.getNumRows() == 0) { determinedTaskNum = 1; } @@ -903,13 +903,13 @@ public class Repartitioner { // set the proper number of tasks to the estimated task num if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { - scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates, + scheduleScatteredHashShuffleFetches(schedulerContext, stage, intermediates, scan.getTableName()); } else { schedulerContext.setEstimatedTaskNum(determinedTaskNum); // divide fetch uris into the the proper number of tasks according to volumes - scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); - LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); + scheduleFetchesByEvenDistributedVolumes(stage, finalFetches, scan.getTableName(), determinedTaskNum); + LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum); } } @@ -970,12 +970,12 @@ public class Repartitioner { return new Pair<Long[], 
Map<String, List<FetchImpl>>[]>(assignedVolumes, fetchesArray); } - public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer, FetchGroupMeta> partitions, + public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions, String tableName, int num) { Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); // Schedule FetchImpls for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) { - SubQuery.scheduleFetches(subQuery, eachFetches); + Stage.scheduleFetches(stage, eachFetches); } } @@ -987,12 +987,12 @@ public class Repartitioner { // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit. // It is usually used for writing partitioned tables. public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, - SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, + Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, String tableName) { long splitVolume = StorageUnit.MB * - subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); + stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); long pageSize = StorageUnit.MB * - subQuery.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes + stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes if (pageSize >= splitVolume) { throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " + "tajo.shuffle.hash.appender.page.volumn-mb"); @@ -1033,11 +1033,11 @@ public class Repartitioner { fetchesArray[i] = new HashMap<String, List<FetchImpl>>(); fetchesArray[i].put(tableName, entry); - SubQuery.scheduleFetches(subQuery, fetchesArray[i]); + Stage.scheduleFetches(stage, fetchesArray[i]); i++; } - 
LOG.info(subQuery.getId() + LOG.info(stage.getId() + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name() + ", Intermediate Size: " + totalIntermediateSize + ", splitSize: " + splitVolume @@ -1207,16 +1207,16 @@ public class Repartitioner { return hashed; } - public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) { - ExecutionBlock execBlock = subQuery.getBlock(); + public static Stage setShuffleOutputNumForTwoPhase(Stage stage, final int desiredNum, DataChannel channel) { + ExecutionBlock execBlock = stage.getBlock(); Column[] keys; // if the next query is join, // set the partition number for the current logicalUnit // TODO: the union handling is required when a join has unions as its child - MasterPlan masterPlan = subQuery.getMasterPlan(); + MasterPlan masterPlan = stage.getMasterPlan(); keys = channel.getShuffleKeys(); - if (!masterPlan.isRoot(subQuery.getBlock()) ) { - ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock()); + if (!masterPlan.isRoot(stage.getBlock()) ) { + ExecutionBlock parentBlock = masterPlan.getParent(stage.getBlock()); if (parentBlock.getPlan().getType() == NodeType.JOIN) { channel.setShuffleOutputNum(desiredNum); } @@ -1246,6 +1246,6 @@ public class Repartitioner { channel.setShuffleOutputNum(desiredNum); } } - return subQuery; + return stage; } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java new file mode 100644 index 0000000..e421417 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java @@ -0,0 +1,1342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.querymaster; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.hadoop.yarn.util.Records; +import org.apache.tajo.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; +import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; +import org.apache.tajo.master.*; +import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import 
org.apache.tajo.master.querymaster.Task.IntermediateEntry; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.storage.FileStorageManager; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.history.TaskHistory; +import org.apache.tajo.util.history.StageHistory; +import org.apache.tajo.worker.FetchImpl; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.tajo.conf.TajoConf.ConfVars; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; + + +/** + * Stage plays a role in controlling an ExecutionBlock and is a finite state machine. 
+ */ +public class Stage implements EventHandler<StageEvent> { + + private static final Log LOG = LogFactory.getLog(Stage.class); + + private MasterPlan masterPlan; + private ExecutionBlock block; + private int priority; + private Schema schema; + private TableMeta meta; + private TableStats resultStatistics; + private TableStats inputStatistics; + private EventHandler<Event> eventHandler; + private AbstractTaskScheduler taskScheduler; + private QueryMasterTask.QueryMasterTaskContext context; + private final List<String> diagnostics = new ArrayList<String>(); + private StageState stageState; + + private long startTime; + private long finishTime; + + volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>(); + volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId, + TajoContainer>(); + + private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); + private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); + private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition(); + private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition(); + private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION = + new AllocatedContainersCancelTransition(); + private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition(); + private StateMachine<StageState, StageEventType, StageEvent> stateMachine; + + protected static final StateMachineFactory<Stage, StageState, + StageEventType, StageEvent> stateMachineFactory = + new StateMachineFactory <Stage, StageState, + StageEventType, StageEvent> (StageState.NEW) + + // Transitions from NEW state + .addTransition(StageState.NEW, + EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED), + StageEventType.SQ_INIT, + new 
InitAndRequestContainer()) + .addTransition(StageState.NEW, StageState.NEW, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.NEW, StageState.KILLED, + StageEventType.SQ_KILL) + .addTransition(StageState.NEW, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from INITED state + .addTransition(StageState.INITED, StageState.RUNNING, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINER_LAUNCH_TRANSITION) + .addTransition(StageState.INITED, StageState.INITED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.INITED, StageState.KILL_WAIT, + StageEventType.SQ_KILL, new KillTasksTransition()) + .addTransition(StageState.INITED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from RUNNING state + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINER_LAUNCH_TRANSITION) + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_TASK_COMPLETED, + TASK_COMPLETED_TRANSITION) + .addTransition(StageState.RUNNING, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_FAILED, + TASK_COMPLETED_TRANSITION) + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.RUNNING, StageState.KILL_WAIT, + StageEventType.SQ_KILL, + new KillTasksTransition()) + .addTransition(StageState.RUNNING, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able Transition + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_START) + + // Transitions from KILL_WAIT state + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, 
+ StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINERS_CANCEL_TRANSITION) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition()) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_TASK_COMPLETED, + TASK_COMPLETED_TRANSITION) + .addTransition(StageState.KILL_WAIT, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_FAILED, + TASK_COMPLETED_TRANSITION) + .addTransition(StageState.KILL_WAIT, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from SUCCEEDED state + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINERS_CANCEL_TRANSITION) + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.SUCCEEDED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able events + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, + EnumSet.of( + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED)) + + // Transitions from KILLED state + .addTransition(StageState.KILLED, StageState.KILLED, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINERS_CANCEL_TRANSITION) + .addTransition(StageState.KILLED, StageState.KILLED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.KILLED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able transitions + .addTransition(StageState.KILLED, StageState.KILLED, + 
EnumSet.of( + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_FAILED)) + + // Transitions from FAILED state + .addTransition(StageState.FAILED, StageState.FAILED, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINERS_CANCEL_TRANSITION) + .addTransition(StageState.FAILED, StageState.FAILED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.FAILED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able transitions + .addTransition(StageState.FAILED, StageState.FAILED, + EnumSet.of( + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_FAILED)) + + // Transitions from ERROR state + .addTransition(StageState.ERROR, StageState.ERROR, + StageEventType.SQ_CONTAINER_ALLOCATED, + CONTAINERS_CANCEL_TRANSITION) + .addTransition(StageState.ERROR, StageState.ERROR, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + // Ignore-able transitions + .addTransition(StageState.ERROR, StageState.ERROR, + EnumSet.of( + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_FAILED, + StageEventType.SQ_INTERNAL_ERROR, + StageEventType.SQ_STAGE_COMPLETED)) + + .installTopology(); + + private final Lock readLock; + private final Lock writeLock; + + private int totalScheduledObjectsCount; + private int succeededObjectCount = 0; + private int completedTaskCount = 0; + private int succeededTaskCount = 0; + private int killedObjectCount = 0; + private int failedObjectCount = 0; + private TaskSchedulerContext schedulerContext; + private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>(); + private AtomicInteger completeReportReceived = new AtomicInteger(0); + private StageHistory finalStageHistory; + + public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock 
block) { + this.context = context; + this.masterPlan = masterPlan; + this.block = block; + this.eventHandler = context.getEventHandler(); + + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + stateMachine = stateMachineFactory.make(this); + stageState = stateMachine.getCurrentState(); + } + + public static boolean isRunningState(StageState state) { + return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING; + } + + public QueryMasterTask.QueryMasterTaskContext getContext() { + return context; + } + + public MasterPlan getMasterPlan() { + return masterPlan; + } + + public DataChannel getDataChannel() { + return masterPlan.getOutgoingChannels(getId()).iterator().next(); + } + + public EventHandler<Event> getEventHandler() { + return eventHandler; + } + + public AbstractTaskScheduler getTaskScheduler() { + return taskScheduler; + } + + public void setStartTime() { + startTime = context.getClock().getTime(); + } + + @SuppressWarnings("UnusedDeclaration") + public long getStartTime() { + return this.startTime; + } + + public void setFinishTime() { + finishTime = context.getClock().getTime(); + } + + @SuppressWarnings("UnusedDeclaration") + public long getFinishTime() { + return this.finishTime; + } + + public float getTaskProgress() { + readLock.lock(); + try { + if (getState() == StageState.NEW) { + return 0; + } else { + return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount; + } + } finally { + readLock.unlock(); + } + } + + public float getProgress() { + List<Task> tempTasks = null; + readLock.lock(); + try { + if (getState() == StageState.NEW) { + return 0.0f; + } else { + tempTasks = new ArrayList<Task>(tasks.values()); + } + } finally { + readLock.unlock(); + } + + float totalProgress = 0.0f; + for (Task eachTask : tempTasks) { + if (eachTask.getLastAttempt() != null) { + totalProgress += 
eachTask.getLastAttempt().getProgress(); + } + } + + if (totalProgress > 0.0f) { + return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f; + } else { + return 0.0f; + } + } + + public int getSucceededObjectCount() { + return succeededObjectCount; + } + + public int getTotalScheduledObjectsCount() { + return totalScheduledObjectsCount; + } + + public ExecutionBlock getBlock() { + return block; + } + + public void addTask(Task task) { + tasks.put(task.getId(), task); + } + + public StageHistory getStageHistory() { + if (finalStageHistory != null) { + if (finalStageHistory.getFinishTime() == 0) { + finalStageHistory = makeStageHistory(); + finalStageHistory.setTasks(makeTaskHistories()); + } + return finalStageHistory; + } else { + return makeStageHistory(); + } + } + + private List<TaskHistory> makeTaskHistories() { + List<TaskHistory> taskHistories = new ArrayList<TaskHistory>(); + + for(Task eachTask : getTasks()) { + taskHistories.add(eachTask.getTaskHistory()); + } + + return taskHistories; + } + + private StageHistory makeStageHistory() { + StageHistory stageHistory = new StageHistory(); + + stageHistory.setExecutionBlockId(getId().toString()); + stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan())); + stageHistory.setState(getState().toString()); + stageHistory.setStartTime(startTime); + stageHistory.setFinishTime(finishTime); + stageHistory.setSucceededObjectCount(succeededObjectCount); + stageHistory.setKilledObjectCount(killedObjectCount); + stageHistory.setFailedObjectCount(failedObjectCount); + stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount); + stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned()); + stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned()); + + long totalInputBytes = 0; + long totalReadBytes = 0; + long totalReadRows = 0; + long totalWriteBytes = 0; + long totalWriteRows = 0; + int numShuffles = 0; + for(Task 
eachTask : getTasks()) { + numShuffles = eachTask.getShuffleOutpuNum(); + if (eachTask.getLastAttempt() != null) { + TableStats inputStats = eachTask.getLastAttempt().getInputStats(); + if (inputStats != null) { + totalInputBytes += inputStats.getNumBytes(); + totalReadBytes += inputStats.getReadBytes(); + totalReadRows += inputStats.getNumRows(); + } + TableStats outputStats = eachTask.getLastAttempt().getResultStats(); + if (outputStats != null) { + totalWriteBytes += outputStats.getNumBytes(); + totalWriteRows += outputStats.getNumRows(); + } + } + } + + stageHistory.setTotalInputBytes(totalInputBytes); + stageHistory.setTotalReadBytes(totalReadBytes); + stageHistory.setTotalReadRows(totalReadRows); + stageHistory.setTotalWriteBytes(totalWriteBytes); + stageHistory.setTotalWriteRows(totalWriteRows); + stageHistory.setNumShuffles(numShuffles); + stageHistory.setProgress(getProgress()); + return stageHistory; + } + + /** + * It finalizes this stage. It is only invoked when the stage is succeeded. + */ + public void complete() { + cleanup(); + finalizeStats(); + setFinishTime(); + eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); + } + + /** + * It finalizes this stage. Unlike {@link Stage#complete()}, + * it is invoked when a stage is abnormally finished. + * + * @param finalState The final stage state + */ + public void abort(StageState finalState) { + // TODO - + // - committer.abortStage(...) 
+ // - record Stage Finish Time + // - CleanUp Tasks + // - Record History + cleanup(); + setFinishTime(); + eventHandler.handle(new StageCompletedEvent(getId(), finalState)); + } + + public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() { + return this.stateMachine; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + + public int getPriority() { + return this.priority; + } + + public ExecutionBlockId getId() { + return block.getId(); + } + + public Task[] getTasks() { + return tasks.values().toArray(new Task[tasks.size()]); + } + + public Task getTask(TaskId qid) { + return tasks.get(qid); + } + + public Schema getSchema() { + return schema; + } + + public TableMeta getTableMeta() { + return meta; + } + + public TableStats getResultStats() { + return resultStatistics; + } + + public TableStats getInputStats() { + return inputStatistics; + } + + public List<String> getDiagnostics() { + readLock.lock(); + try { + return diagnostics; + } finally { + readLock.unlock(); + } + } + + protected void addDiagnostic(String diag) { + diagnostics.add(diag); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getId()); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof Stage) { + Stage other = (Stage)o; + return getId().equals(other.getId()); + } + return false; + } + + @Override + public int hashCode() { + return getId().hashCode(); + } + + public int compareTo(Stage other) { + return getId().compareTo(other.getId()); + } + + public StageState getSynchronizedState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + /* non-blocking call for client API */ + public StageState getState() { + return stageState; + } + + public static TableStats[] computeStatFromUnionBlock(Stage stage) { + TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()}; + long[] 
avgRows = new long[]{0, 0}; + long[] numBytes = new long[]{0, 0}; + long[] readBytes = new long[]{0, 0}; + long[] numRows = new long[]{0, 0}; + int[] numBlocks = new int[]{0, 0}; + int[] numOutputs = new int[]{0, 0}; + + List<ColumnStats> columnStatses = Lists.newArrayList(); + + MasterPlan masterPlan = stage.getMasterPlan(); + Iterator<ExecutionBlock> it = masterPlan.getChilds(stage.getBlock()).iterator(); + while (it.hasNext()) { + ExecutionBlock block = it.next(); + Stage childStage = stage.context.getStage(block.getId()); + TableStats[] childStatArray = new TableStats[]{ + childStage.getInputStats(), childStage.getResultStats() + }; + for (int i = 0; i < 2; i++) { + if (childStatArray[i] == null) { + continue; + } + avgRows[i] += childStatArray[i].getAvgRows(); + numBlocks[i] += childStatArray[i].getNumBlocks(); + numBytes[i] += childStatArray[i].getNumBytes(); + readBytes[i] += childStatArray[i].getReadBytes(); + numOutputs[i] += childStatArray[i].getNumShuffleOutputs(); + numRows[i] += childStatArray[i].getNumRows(); + } + columnStatses.addAll(childStatArray[1].getColumnStats()); + } + + for (int i = 0; i < 2; i++) { + stat[i].setNumBlocks(numBlocks[i]); + stat[i].setNumBytes(numBytes[i]); + stat[i].setReadBytes(readBytes[i]); + stat[i].setNumShuffleOutputs(numOutputs[i]); + stat[i].setNumRows(numRows[i]); + stat[i].setAvgRows(avgRows[i]); + } + stat[1].setColumnStats(columnStatses); + + return stat; + } + + private TableStats[] computeStatFromTasks() { + List<TableStats> inputStatsList = Lists.newArrayList(); + List<TableStats> resultStatsList = Lists.newArrayList(); + for (Task unit : getTasks()) { + resultStatsList.add(unit.getStats()); + if (unit.getLastAttempt().getInputStats() != null) { + inputStatsList.add(unit.getLastAttempt().getInputStats()); + } + } + TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList); + TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList); + return new TableStats[]{inputStats, 
resultStats}; + } + + private void stopScheduler() { + // If there are launched TaskRunners, send the 'shouldDie' message to all r + // via received task requests. + if (taskScheduler != null) { + taskScheduler.stop(); + } + } + + private void releaseContainers() { + // If there are still live TaskRunners, try to kill the containers. + eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values())); + } + + /** + * It computes all stats and sets the intermediate result. + */ + private void finalizeStats() { + TableStats[] statsArray; + if (block.hasUnion()) { + statsArray = computeStatFromUnionBlock(this); + } else { + statsArray = computeStatFromTasks(); + } + + DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0); + + // if store plan (i.e., CREATE or INSERT OVERWRITE) + StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); + if (storeType == null) { + // get default or store type + storeType = StoreType.CSV; + } + + schema = channel.getSchema(); + meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet()); + inputStatistics = statsArray[0]; + resultStatistics = statsArray[1]; + } + + @Override + public void handle(StageEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState=" + + getSynchronizedState()); + } + + try { + writeLock.lock(); + StageState oldState = getSynchronizedState(); + try { + getStateMachine().doTransition(event.getType(), event); + stageState = getSynchronizedState(); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getSynchronizedState().name() + , e); + eventHandler.handle(new StageEvent(getId(), + StageEventType.SQ_INTERNAL_ERROR)); + } + + // notify the eventhandler of state change + if (LOG.isDebugEnabled()) { + if 
(oldState != getSynchronizedState()) { + LOG.debug(getId() + " Stage Transitioned from " + oldState + " to " + + getSynchronizedState()); + } + } + } finally { + writeLock.unlock(); + } + } + + public void handleTaskRequestEvent(TaskRequestEvent event) { + taskScheduler.handleTaskRequestEvent(event); + } + + private static class InitAndRequestContainer implements MultipleArcTransition<Stage, + StageEvent, StageState> { + + @Override + public StageState transition(final Stage stage, StageEvent stageEvent) { + stage.setStartTime(); + ExecutionBlock execBlock = stage.getBlock(); + StageState state; + + try { + // Union operator does not require actual query processing. It is performed logically. + if (execBlock.hasUnion()) { + stage.finalizeStats(); + state = StageState.SUCCEEDED; + } else { + // execute pre-processing asyncronously + stage.getContext().getQueryMasterContext().getEventExecutor() + .submit(new Runnable() { + @Override + public void run() { + try { + ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); + DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); + setShuffleIfNecessary(stage, channel); + initTaskScheduler(stage); + schedule(stage); + stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); + LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); + + if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks + stage.complete(); + } else { + if(stage.getSynchronizedState() == StageState.INITED) { + stage.taskScheduler.start(); + allocateContainers(stage); + } else { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); + } + } + } catch (Throwable e) { + LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); + stage.setFinishTime(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); + stage.eventHandler.handle(new 
StageCompletedEvent(stage.getId(), StageState.ERROR)); + } + } + } + ); + state = StageState.INITED; + } + } catch (Throwable e) { + LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); + stage.setFinishTime(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); + stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); + return StageState.ERROR; + } + + return state; + } + + private void initTaskScheduler(Stage stage) throws IOException { + TajoConf conf = stage.context.getConf(); + stage.schedulerContext = new TaskSchedulerContext(stage.context, + stage.getMasterPlan().isLeaf(stage.getId()), stage.getId()); + stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage); + stage.taskScheduler.init(conf); + LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId()); + } + + /** + * If a parent block requires a repartition operation, the method sets proper repartition + * methods and the number of partitions to a given Stage. + */ + private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { + if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) { + int numTasks = calculateShuffleOutputNum(stage, channel); + Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel); + } + } + + /** + * Getting the total memory of cluster + * + * @param stage + * @return mega bytes + */ + private static int getClusterTotalMemory(Stage stage) { + List<TajoMasterProtocol.WorkerResourceProto> workers = + stage.context.getQueryMasterContext().getQueryMaster().getAllWorker(); + + int totalMem = 0; + for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + totalMem += worker.getMemoryMB(); + } + return totalMem; + } + /** + * Getting the desire number of partitions according to the volume of input data. + * This method is only used to determine the partition key number of hash join or aggregation. 
+ * + * @param stage + * @return + */ + public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { + TajoConf conf = stage.context.getConf(); + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); + + LogicalNode grpNode = null; + if (parent != null) { + grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY); + if (grpNode == null) { + grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY); + } + } + + // We assume this execution block the first stage of join if two or more tables are included in this block, + if (parent != null && parent.getScanNodes().length >= 2) { + List<ExecutionBlock> childs = masterPlan.getChilds(parent); + + // for outer + ExecutionBlock outer = childs.get(0); + long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer); + + // for inner + ExecutionBlock inner = childs.get(1); + long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner); + LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " + + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); + + long bigger = Math.max(outerVolume, innerVolume); + + int mb = (int) Math.ceil((double) bigger / 1048576); + LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); + + int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); + + if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { + taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); + LOG.warn("!!!!! TESTCASE MODE !!!!!"); + } + + // The shuffle output numbers of join may be inconsistent by execution block order. + // Thus, we need to compare the number with DataChannel output numbers. + // If the number is right, the number and DataChannel output numbers will be consistent. 
+ int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0; + for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { + outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum()); + } + for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { + innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); + } + if (outerShuffleOutputNum != innerShuffleOutputNum + && taskNum != outerShuffleOutputNum + && taskNum != innerShuffleOutputNum) { + LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" + + ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + + ", outerShuffleOutptNum=" + outerShuffleOutputNum + + ", innerShuffleOutputNum=" + innerShuffleOutputNum); + taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); + } + + LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum); + + return taskNum; + // Is this stage the first step of group-by? + } else if (grpNode != null) { + boolean hasGroupColumns = true; + if (grpNode.getType() == NodeType.GROUP_BY) { + hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; + } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { + // Find current distinct stage node. 
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + if (distinctNode == null) { + LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); + distinctNode = (DistinctGroupbyNode)grpNode; + } + hasGroupColumns = distinctNode.getGroupingColumns().length > 0; + + Enforcer enforcer = stage.getBlock().getEnforcer(); + if (enforcer == null) { + LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null."); + } + EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); + if (property != null) { + if (property.getDistinct().getIsMultipleAggregation()) { + MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage(); + if (multiAggStage != MultipleAggregationStage.THRID_STAGE) { + hasGroupColumns = true; + } + } + } + } + if (!hasGroupColumns) { + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + return 1; + } else { + long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); + + int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB); + LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); + // determine the number of task + int taskNum = (int) Math.ceil((double) volumeByMB / + masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); + LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum); + return taskNum; + } + } else { + LOG.info("============>>>>> Unexpected Case! 
<<<<<================"); + long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); + + int mb = (int) Math.ceil((double)volume / 1048576); + LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); + // determine the number of task per 128MB + int taskNum = (int) Math.ceil((double)mb / 128); + LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum); + return taskNum; + } + } + + private static void schedule(Stage stage) throws IOException { + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); + if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan + scheduleFragmentsForLeafQuery(stage); + } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join + Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage); + } else { // Case 3: Others (Sort or Aggregation) + int numTasks = getNonLeafTaskNum(stage); + Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks); + } + } + + /** + * Getting the desire number of tasks according to the volume of input data + * + * @param stage + * @return + */ + public static int getNonLeafTaskNum(Stage stage) { + // Getting intermediate data size + long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock()); + + int mb = (int) Math.ceil((double)volume / 1048576); + LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); + // determine the number of task per 64MB + int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64)); + LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); + return maxTaskNum; + } + + public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, + ExecutionBlock execBlock) { + Map<String, TableDesc> tableMap = context.getTableDescMap(); + if 
(masterPlan.isLeaf(execBlock)) { + ScanNode[] outerScans = execBlock.getScanNodes(); + long maxVolume = 0; + for (ScanNode eachScanNode: outerScans) { + TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats(); + if (stat.getNumBytes() > maxVolume) { + maxVolume = stat.getNumBytes(); + } + } + return maxVolume; + } else { + long aggregatedVolume = 0; + for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) { + Stage stage = context.getStage(childBlock.getId()); + if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) { + aggregatedVolume += getInputVolume(masterPlan, context, childBlock); + } else { + aggregatedVolume += stage.getResultStats().getNumBytes(); + } + } + + return aggregatedVolume; + } + } + + public static void allocateContainers(Stage stage) { + ExecutionBlock execBlock = stage.getBlock(); + + //TODO consider disk slot + int requiredMemoryMBPerTask = 512; + + int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers( + stage.getContext().getQueryMasterContext().getWorkerContext(), + stage.schedulerContext.getEstimatedTaskNum(), + requiredMemoryMBPerTask + ); + + final Resource resource = Records.newRecord(Resource.class); + + resource.setMemory(requiredMemoryMBPerTask); + + LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest); + + Priority priority = Records.newRecord(Priority.class); + priority.setPriority(stage.getPriority()); + ContainerAllocationEvent event = + new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, + stage.getId(), priority, resource, numRequest, + stage.masterPlan.isLeaf(execBlock), 0.0f); + stage.eventHandler.handle(event); + } + + private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException { + ExecutionBlock execBlock = stage.getBlock(); + ScanNode[] scans = execBlock.getScanNodes(); + Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); + ScanNode scan = scans[0]; 
+ TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); + + Collection<Fragment> fragments; + TableMeta meta = table.getMeta(); + + // Depending on scanner node's type, it creates fragments. If scan is for + // a partitioned table, It will creates lots fragments for all partitions. + // Otherwise, it creates at least one fragments for a table, which may + // span a number of blocks or possibly consists of a number of files. + if (scan.getType() == NodeType.PARTITIONS_SCAN) { + // After calling this method, partition paths are removed from the physical plan. + FileStorageManager storageManager = + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); + fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table); + } else { + StorageManager storageManager = + StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType()); + fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan); + } + + Stage.scheduleFragments(stage, fragments); + if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) { + //Leaf task of DefaultTaskScheduler should be fragment size + // EstimatedTaskNum determined number of initial container + stage.schedulerContext.setEstimatedTaskNum(fragments.size()); + } else { + TajoConf conf = stage.context.getConf(); + stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024); + int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() / + (double) stage.schedulerContext.getTaskSize()); + stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum); + } + } + } + + public static void scheduleFragment(Stage stage, Fragment fragment) { + stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), fragment)); + } + + + public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) { + for (Fragment 
eachFragment : fragments) { + scheduleFragment(stage, eachFragment); + } + } + + public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments, + Collection<Fragment> broadcastFragments) { + for (Fragment eachLeafFragment : leftFragments) { + scheduleFragment(stage, eachLeafFragment, broadcastFragments); + } + } + + public static void scheduleFragment(Stage stage, + Fragment leftFragment, Collection<Fragment> rightFragments) { + stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), leftFragment, rightFragments)); + } + + public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) { + stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), fetches)); + } + + public static Task newEmptyTask(TaskSchedulerContext schedulerContext, + TaskAttemptScheduleContext taskContext, + Stage stage, int taskId) { + ExecutionBlock execBlock = stage.getBlock(); + Task unit = new Task(schedulerContext.getMasterContext().getConf(), + taskContext, + QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId), + schedulerContext.isLeafQuery(), stage.eventHandler); + unit.setLogicalPlan(execBlock.getPlan()); + stage.addTask(unit); + return unit; + } + + private static class ContainerLaunchTransition + implements SingleArcTransition<Stage, StageEvent> { + + @Override + public void transition(Stage stage, StageEvent event) { + try { + StageContainerAllocationEvent allocationEvent = + (StageContainerAllocationEvent) event; + for (TajoContainer container : allocationEvent.getAllocatedContainer()) { + TajoContainerId cId = container.getId(); + if (stage.containers.containsKey(cId)) { + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), + "Duplicated containers are allocated: " + cId.toString())); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); + } + 
stage.containers.put(cId, container); + } + LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!"); + stage.eventHandler.handle( + new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(), + stage.getContext().getQueryContext(), + CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class)) + ); + + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); + } catch (Throwable t) { + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), + ExceptionUtils.getStackTrace(t))); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); + } + } + } + + /** + * It is used in KILL_WAIT state against Contained Allocated event. + * It just returns allocated containers to resource manager. + */ + private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> { + @Override + public void transition(Stage stage, StageEvent event) { + try { + StageContainerAllocationEvent allocationEvent = + (StageContainerAllocationEvent) event; + stage.eventHandler.handle( + new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, + stage.getId(), allocationEvent.getAllocatedContainer())); + LOG.info(String.format("[%s] %d allocated containers are canceled", + stage.getId().toString(), + allocationEvent.getAllocatedContainer().size())); + } catch (Throwable t) { + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), + ExceptionUtils.getStackTrace(t))); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); + } + } + } + + private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> { + + @Override + public void transition(Stage stage, + StageEvent event) { + StageTaskEvent taskEvent = (StageTaskEvent) event; + Task task = stage.getTask(taskEvent.getTaskId()); + + if (task == null) { // task failed + 
LOG.error(String.format("Task %s is absent", taskEvent.getTaskId())); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); + } else { + stage.completedTaskCount++; + + if (taskEvent.getState() == TaskState.SUCCEEDED) { + stage.succeededObjectCount++; + } else if (task.getState() == TaskState.KILLED) { + stage.killedObjectCount++; + } else if (task.getState() == TaskState.FAILED) { + stage.failedObjectCount++; + // if at least one task is failed, try to kill all tasks. + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); + } + + LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)", + stage.getId(), + stage.getTotalScheduledObjectsCount(), + stage.succeededObjectCount, + stage.killedObjectCount, + stage.failedObjectCount)); + + if (stage.totalScheduledObjectsCount == + stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + } + } + } + } + + private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> { + + @Override + public void transition(Stage stage, StageEvent stageEvent) { + if(stage.getTaskScheduler() != null){ + stage.getTaskScheduler().stop(); + } + + for (Task task : stage.getTasks()) { + stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL)); + } + } + } + + private void cleanup() { + stopScheduler(); + releaseContainers(); + + if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { + List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); + List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); + + for (ExecutionBlock executionBlock : childs) { + ebIds.add(executionBlock.getId().getProto()); + } + + getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); + } + + this.finalStageHistory = 
makeStageHistory(); + this.finalStageHistory.setTasks(makeTaskHistories()); + } + + public List<IntermediateEntry> getHashShuffleIntermediateEntries() { + return hashShuffleIntermediateEntries; + } + + protected void waitingIntermediateReport() { + LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get()); + synchronized(completeReportReceived) { + long startTime = System.currentTimeMillis(); + while (true) { + if (completeReportReceived.get() >= tasks.size()) { + LOG.info(getId() + ", completed waiting IntermediateReport"); + return; + } else { + try { + completeReportReceived.wait(10 * 1000); + } catch (InterruptedException e) { + } + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime >= 120 * 1000) { + LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms"); + abort(StageState.FAILED); + return; + } + } + } + } + } + + public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) { + LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks()); + if (!report.getReportSuccess()) { + LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage()); + abort(StageState.FAILED); + return; + } + if (report.getIntermediateEntriesCount() > 0) { + synchronized (hashShuffleIntermediateEntries) { + for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) { + hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); + } + } + } + synchronized(completeReportReceived) { + completeReportReceived.addAndGet(report.getSucceededTasks()); + completeReportReceived.notifyAll(); + } + } + + private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> { + + @Override + public StageState transition(Stage stage, StageEvent stageEvent) { + // TODO - Commit Stage + // TODO - records succeeded, failed, killed completed task + // 
TODO - records metrics + try { + LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)", + stage.getId().toString(), + stage.getTotalScheduledObjectsCount(), + stage.getSucceededObjectCount(), + stage.killedObjectCount)); + + if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { + if (stage.failedObjectCount > 0) { + stage.abort(StageState.FAILED); + return StageState.FAILED; + } else if (stage.killedObjectCount > 0) { + stage.abort(StageState.KILLED); + return StageState.KILLED; + } else { + LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); + stage.abort(StageState.ERROR); + return StageState.ERROR; + } + } else { + stage.complete(); + return StageState.SUCCEEDED; + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + stage.abort(StageState.ERROR); + return StageState.ERROR; + } + } + } + + private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> { + @Override + public void transition(Stage stage, StageEvent event) { + stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); + } + } + + private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> { + @Override + public void transition(Stage stage, StageEvent stageEvent) { + stage.abort(StageState.ERROR); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java new file mode 100644 index 0000000..82a06fe --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master.querymaster;

/**
 * States of a query stage.
 *
 * <p>The order of the constants is part of the enum's contract (ordinals, {@code values()});
 * do not reorder them.
 */
public enum StageState {
  /** NOTE(review): presumably the state of a freshly created stage - confirm. */
  NEW,

  /** NOTE(review): presumably reached after initialization, before tasks run - confirm. */
  INITED,

  /** NOTE(review): presumably the state while the stage's tasks are executing - confirm. */
  RUNNING,

  /** The stage completed with no failed and no killed task. */
  SUCCEEDED,

  /** The stage was aborted because of a failure, e.g. at least one task failed. */
  FAILED,

  /** A kill is in progress; containers allocated while in this state are handed back. */
  KILL_WAIT,

  /** The stage ended with at least one killed task and no failed task. */
  KILLED,

  /** The stage was aborted because of an internal error. */
  ERROR
}
