http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
deleted file mode 100644
index 367897d..0000000
---
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ /dev/null
@@ -1,1251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
-import org.apache.tajo.engine.planner.UniformRangePartition;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.exception.InternalException;
-import
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
-import org.apache.tajo.master.TaskSchedulerContext;
-import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.plan.logical.SortNode.SortPurpose;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.TupleRange;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
-
-/**
- * Repartitioner creates non-leaf tasks and shuffles intermediate data.
- * It supports two repartition methods: hash and range repartition.
- */
-public class Repartitioner {
- private static final Log LOG = LogFactory.getLog(Repartitioner.class);
-
- private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
- private final static String UNKNOWN_HOST = "unknown";
-
- public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext
schedulerContext, Stage stage)
- throws IOException {
- MasterPlan masterPlan = stage.getMasterPlan();
- ExecutionBlock execBlock = stage.getBlock();
- QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
-
- ScanNode[] scans = execBlock.getScanNodes();
-
- Path tablePath;
- Fragment[] fragments = new Fragment[scans.length];
- long[] stats = new long[scans.length];
-
- // initialize variables from the child operators
- for (int i = 0; i < scans.length; i++) {
- TableDesc tableDesc =
masterContext.getTableDescMap().get(scans[i].getCanonicalName());
- if (tableDesc == null) { // if it is a real table stored on storage
- FileStorageManager storageManager =
-
(FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
-
- tablePath = storageManager.getTablePath(scans[i].getTableName());
- if (execBlock.getUnionScanMap() != null &&
!execBlock.getUnionScanMap().isEmpty()) {
- for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry:
execBlock.getUnionScanMap().entrySet()) {
- ExecutionBlockId originScanEbId = unionScanEntry.getKey();
- stats[i] +=
masterContext.getStage(originScanEbId).getResultStats().getNumBytes();
- }
- } else {
- ExecutionBlockId scanEBId =
TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
- stats[i] =
masterContext.getStage(scanEBId).getResultStats().getNumBytes();
- }
- fragments[i] = new FileFragment(scans[i].getCanonicalName(),
tablePath, 0, 0, new String[]{UNKNOWN_HOST});
- } else {
- try {
- stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
- } catch (PlanningException e) {
- throw new IOException(e);
- }
-
- StorageManager storageManager =
- StorageManager.getStorageManager(stage.getContext().getConf(),
tableDesc.getMeta().getStoreType());
-
- // If the table has no data, storageManager returns an empty fragment list.
- // So we must check the list size before indexing into it;
- // otherwise an IndexOutOfBoundsException can occur.
- List<Fragment> fileFragments =
storageManager.getSplits(scans[i].getCanonicalName(), tableDesc);
- if (fileFragments.size() > 0) {
- fragments[i] = fileFragments.get(0);
- } else {
- fragments[i] = new FileFragment(scans[i].getCanonicalName(), new
Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST});
- }
- }
- }
-
- // If any relation of an inner join has no input data, this execution block produces no result rows.
- JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(),
NodeType.JOIN);
- if (joinNode != null) {
- if ( (joinNode.getJoinType() == JoinType.INNER)) {
- LogicalNode leftNode = joinNode.getLeftChild();
- LogicalNode rightNode = joinNode.getRightChild();
- for (int i = 0; i < stats.length; i++) {
- if (scans[i].getPID() == leftNode.getPID() || scans[i].getPID() ==
rightNode.getPID()) {
- if (stats[i] == 0) {
- LOG.info(scans[i] + " 's input data is zero. Inner join's result
is empty.");
- return;
- }
- }
- }
- }
- }
-
- // If the node is an outer join and a preserved relation is empty, the result has zero rows.
- joinNode = PlannerUtil.findTopNode(execBlock.getPlan(), NodeType.JOIN);
- if (joinNode != null) {
- // If all stats are zero, return
- boolean isEmptyAllJoinTables = true;
- for (int i = 0; i < stats.length; i++) {
- if (stats[i] > 0) {
- isEmptyAllJoinTables = false;
- break;
- }
- }
- if (isEmptyAllJoinTables) {
- LOG.info("All input join tables are empty.");
- return;
- }
-
- // find left top scan node
- ScanNode leftScanNode = PlannerUtil.findTopNode(joinNode.getLeftChild(),
NodeType.SCAN);
- ScanNode rightScanNode =
PlannerUtil.findTopNode(joinNode.getRightChild(), NodeType.SCAN);
-
- long leftStats = -1;
- long rightStats = -1;
- if (stats.length == 2) {
- for (int i = 0; i < stats.length; i++) {
- if (scans[i].equals(leftScanNode)) {
- leftStats = stats[i];
- } else if (scans[i].equals(rightScanNode)) {
- rightStats = stats[i];
- }
- }
- if (joinNode.getJoinType() == JoinType.LEFT_OUTER) {
- if (leftStats == 0) {
- return;
- }
- }
- if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
- if (rightStats == 0) {
- return;
- }
- }
- }
- }
-
- // Assigning either fragments or fetch urls to query units
- boolean isAllBroadcastTable = true;
- for (int i = 0; i < scans.length; i++) {
- if (!execBlock.isBroadcastTable(scans[i].getCanonicalName())) {
- isAllBroadcastTable = false;
- break;
- }
- }
-
-
- if (isAllBroadcastTable) { // if all relations of this EB are broadcasted
- // set largest table to normal mode
- long maxStats = Long.MIN_VALUE;
- int maxStatsScanIdx = -1;
- for (int i = 0; i < scans.length; i++) {
- // finding largest table.
- // If stats == 0, can't be base table.
- if (stats[i] > 0 && stats[i] > maxStats) {
- maxStats = stats[i];
- maxStatsScanIdx = i;
- }
- }
- if (maxStatsScanIdx == -1) {
- maxStatsScanIdx = 0;
- }
- int baseScanIdx = maxStatsScanIdx;
- scans[baseScanIdx].setBroadcastTable(false);
- execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName());
- LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join
with all tables, base_table=%s, base_volume=%d",
- scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
- scheduleLeafTasksWithBroadcastTable(schedulerContext, stage,
baseScanIdx, fragments);
- } else if (!execBlock.getBroadcastTables().isEmpty()) { // If some
relations of this EB are broadcasted
- boolean hasNonLeafNode = false;
- List<Integer> largeScanIndexList = new ArrayList<Integer>();
- List<Integer> broadcastIndexList = new ArrayList<Integer>();
- String nonLeafScanNames = "";
- String namePrefix = "";
- long maxStats = Long.MIN_VALUE;
- int maxStatsScanIdx = -1;
- for (int i = 0; i < scans.length; i++) {
- if (scans[i].getTableDesc().getMeta().getStoreType() == StoreType.RAW)
{
- // Intermediate data scan
- hasNonLeafNode = true;
- largeScanIndexList.add(i);
- nonLeafScanNames += namePrefix + scans[i].getCanonicalName();
- namePrefix = ",";
- }
- if (execBlock.isBroadcastTable(scans[i].getCanonicalName())) {
- broadcastIndexList.add(i);
- } else {
- // finding largest table.
- if (stats[i] > 0 && stats[i] > maxStats) {
- maxStats = stats[i];
- maxStatsScanIdx = i;
- }
- }
- }
- if (maxStatsScanIdx == -1) {
- maxStatsScanIdx = 0;
- }
-
- if (!hasNonLeafNode) {
- if (largeScanIndexList.size() > 1) {
- String largeTableNames = "";
- for (Integer eachId : largeScanIndexList) {
- largeTableNames += scans[eachId].getTableName() + ",";
- }
- throw new IOException("Broadcast join with leaf node should have
only one large table, " +
- "but " + largeScanIndexList.size() + ", tables=" +
largeTableNames);
- }
- int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx :
largeScanIndexList.get(0);
- LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join,
base_table=%s, base_volume=%d",
- scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
- scheduleLeafTasksWithBroadcastTable(schedulerContext, stage,
baseScanIdx, fragments);
- } else {
- if (largeScanIndexList.size() > 2) {
- throw new IOException("Symmetric Repartition Join should have two
scan node, but " + nonLeafScanNames);
- }
-
- //select intermediate scan and stats
- ScanNode[] intermediateScans = new ScanNode[largeScanIndexList.size()];
- long[] intermediateScanStats = new long[largeScanIndexList.size()];
- Fragment[] intermediateFragments = new
Fragment[largeScanIndexList.size()];
- int index = 0;
- for (Integer eachIdx : largeScanIndexList) {
- intermediateScans[index] = scans[eachIdx];
- intermediateScanStats[index] = stats[eachIdx];
- intermediateFragments[index++] = fragments[eachIdx];
- }
- Fragment[] broadcastFragments = new
Fragment[broadcastIndexList.size()];
- ScanNode[] broadcastScans = new ScanNode[broadcastIndexList.size()];
- index = 0;
- for (Integer eachIdx : broadcastIndexList) {
- scans[eachIdx].setBroadcastTable(true);
- broadcastScans[index] = scans[eachIdx];
- broadcastFragments[index] = fragments[eachIdx];
- index++;
- }
- LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join,
join_node=%s", nonLeafScanNames));
- scheduleSymmetricRepartitionJoin(masterContext, schedulerContext,
stage,
- intermediateScans, intermediateScanStats, intermediateFragments,
broadcastScans, broadcastFragments);
- }
- } else {
- LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
- scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage,
scans, stats, fragments, null, null);
- }
- }
-
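// A minimal standalone sketch of the base-relation selection used above, assuming only a plain
// array of relation volumes: when every relation of the execution block is broadcastable, the
// largest relation with a non-zero volume is demoted to a normal (non-broadcast) scan, and index
// 0 is the fallback when all volumes are zero. Names below are illustrative, not Tajo APIs.
final class BaseScanSelectionSketch {
  static int pickBaseScan(long[] volumes) {
    int baseIdx = -1;
    long max = Long.MIN_VALUE;
    for (int i = 0; i < volumes.length; i++) {
      // a relation with zero bytes cannot serve as the base table
      if (volumes[i] > 0 && volumes[i] > max) {
        max = volumes[i];
        baseIdx = i;
      }
    }
    return baseIdx == -1 ? 0 : baseIdx;
  }

  public static void main(String[] args) {
    // volumes of three broadcast candidates in bytes -> index 2 becomes the base scan
    System.out.println(pickBaseScan(new long[]{1024, 0, 4096}));
  }
}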
- /**
- * Scheduling in the case of a Symmetric Repartition Join
- * @param masterContext
- * @param schedulerContext
- * @param stage
- * @param scans
- * @param stats
- * @param fragments
- * @throws IOException
- */
- private static void
scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext
masterContext,
- TaskSchedulerContext
schedulerContext,
- Stage stage,
- ScanNode[] scans,
- long[] stats,
- Fragment[] fragments,
- ScanNode[]
broadcastScans,
- Fragment[]
broadcastFragments) throws IOException {
- MasterPlan masterPlan = stage.getMasterPlan();
- ExecutionBlock execBlock = stage.getBlock();
- // The hash map is modeled as follows:
- // <Part Id, <EbId, List<Intermediate Data>>>
- Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
- new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
-
- // Grouping IntermediateData by a partition key and a table name
- List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
-
- // In the case of join with union, there is one ScanNode for union.
- Map<ExecutionBlockId, ExecutionBlockId> unionScanMap =
execBlock.getUnionScanMap();
- for (ExecutionBlock childBlock : childBlocks) {
- ExecutionBlockId scanEbId = unionScanMap.get(childBlock.getId());
- if (scanEbId == null) {
- scanEbId = childBlock.getId();
- }
- Stage childExecSM = stage.getContext().getStage(childBlock.getId());
-
- if (childExecSM.getHashShuffleIntermediateEntries() != null &&
- !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) {
- for (IntermediateEntry intermediateEntry:
childExecSM.getHashShuffleIntermediateEntries()) {
- intermediateEntry.setEbId(childBlock.getId());
- if (hashEntries.containsKey(intermediateEntry.getPartId())) {
- Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
- hashEntries.get(intermediateEntry.getPartId());
-
- if (tbNameToInterm.containsKey(scanEbId)) {
- tbNameToInterm.get(scanEbId).add(intermediateEntry);
- } else {
- tbNameToInterm.put(scanEbId, TUtil.newList(intermediateEntry));
- }
- } else {
- Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
- new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
- tbNameToInterm.put(scanEbId, TUtil.newList(intermediateEntry));
- hashEntries.put(intermediateEntry.getPartId(), tbNameToInterm);
- }
- }
- } else {
- // if there is no intermediate data (empty table), make an empty entry
- int emptyPartitionId = 0;
- if (hashEntries.containsKey(emptyPartitionId)) {
- Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
hashEntries.get(emptyPartitionId);
- if (tbNameToInterm.containsKey(scanEbId))
- tbNameToInterm.get(scanEbId).addAll(new
ArrayList<IntermediateEntry>());
- else
- tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
- } else {
- Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
- new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
- tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
- hashEntries.put(emptyPartitionId, tbNameToInterm);
- }
- }
- }
-
- // hashEntries can be empty if there is no input data.
- // In that case a divide-by-zero would occur below,
- // so the size check here avoids it.
- int[] avgSize = new int[2];
- avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] /
hashEntries.size());
- avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] /
hashEntries.size());
- int bothFetchSize = avgSize[0] + avgSize[1];
-
- // Get the desired number of join tasks according to the volume
- // of the larger table
- int largerIdx = stats[0] >= stats[1] ? 0 : 1;
- int desireJoinTaskVolumn =
stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
-
- // calculate the number of tasks according to the data size
- int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
- LOG.info("Larger intermediate data is approximately " + mb + " MB");
- // determine the number of tasks from the desired per-task input volume
- int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
- LOG.info("The calculated number of tasks is " + maxTaskNum);
- LOG.info("The number of total shuffle keys is " + hashEntries.size());
- // the number of join tasks cannot be larger than the number of
- // distinct partition ids.
- int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
- LOG.info("The determined number of join tasks is " + joinTaskNum);
-
- List<Fragment> rightFragments = new ArrayList<Fragment>();
- rightFragments.add(fragments[1]);
-
- if (broadcastFragments != null) {
- // In this phase a ScanNode has a single fragment.
- // If there is more than one data file, those files should be added to the fragments or the partition path.
- for (ScanNode eachScan: broadcastScans) {
- Path[] partitionScanPaths = null;
- TableDesc tableDesc =
masterContext.getTableDescMap().get(eachScan.getCanonicalName());
- if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
- FileStorageManager storageManager =
-
(FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
-
- PartitionedTableScanNode partitionScan =
(PartitionedTableScanNode)eachScan;
- partitionScanPaths = partitionScan.getInputPaths();
- // set null to inputPaths in getFragmentsFromPartitionedTable()
- getFragmentsFromPartitionedTable(storageManager, eachScan,
tableDesc);
- partitionScan.setInputPaths(partitionScanPaths);
- } else {
- StorageManager storageManager =
StorageManager.getStorageManager(stage.getContext().getConf(),
- tableDesc.getMeta().getStoreType());
- Collection<Fragment> scanFragments =
storageManager.getSplits(eachScan.getCanonicalName(),
- tableDesc, eachScan);
- if (scanFragments != null) {
- rightFragments.addAll(scanFragments);
- }
- }
- }
- }
- Stage.scheduleFragment(stage, fragments[0], rightFragments);
-
- // Assign partitions to tasks in a round robin manner.
- for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry
- : hashEntries.entrySet()) {
- addJoinShuffle(stage, entry.getKey(), entry.getValue());
- }
-
- schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize /
joinTaskNum));
- schedulerContext.setEstimatedTaskNum(joinTaskNum);
- }
-
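// A minimal standalone sketch of the join-task-count calculation above, assuming the desired
// per-task input volume (SessionVars.JOIN_TASK_INPUT_SIZE) is given in megabytes; the names
// below are illustrative, not Tajo APIs.
final class JoinTaskCountSketch {
  static int determineJoinTaskNum(long largerTableBytes, int desiredTaskVolumeMb, int distinctPartIds) {
    // volume of the larger relation in MB, rounded up
    int mb = (int) Math.ceil((double) largerTableBytes / 1048576);
    // one task per desiredTaskVolumeMb of input
    int maxTaskNum = (int) Math.ceil((double) mb / desiredTaskVolumeMb);
    // the number of join tasks cannot exceed the number of distinct partition ids
    return Math.min(maxTaskNum, distinctPartIds);
  }

  public static void main(String[] args) {
    // e.g. a 1 GB larger relation, 64 MB per task, 10 shuffle partitions -> 10 tasks
    System.out.println(determineJoinTaskNum(1L << 30, 64, 10));
  }
}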
- /**
- * Merges intermediate entries by execution block id and pull host.
- * @param hashEntries
- * @return the merged entries, still keyed by partition id and execution block id
- */
- public static Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>
mergeIntermediateByPullHost(
- Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>
hashEntries) {
- Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>
mergedHashEntries =
- new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
-
- for(Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry:
hashEntries.entrySet()) {
- Integer partId = entry.getKey();
- for (Entry<ExecutionBlockId, List<IntermediateEntry>> partEntry:
entry.getValue().entrySet()) {
- List<IntermediateEntry> intermediateList = partEntry.getValue();
- if (intermediateList == null || intermediateList.isEmpty()) {
- continue;
- }
- ExecutionBlockId ebId = partEntry.getKey();
- // EBID + PullHost -> IntermediateEntry
- // In the case of union, partEntry.getKey() returns the delegated EBID.
- // Intermediate entries are merged by real EBID.
- Map<String, IntermediateEntry> ebMerged = new HashMap<String,
IntermediateEntry>();
-
- for (IntermediateEntry eachIntermediate: intermediateList) {
- String ebMergedKey = eachIntermediate.getEbId().toString() +
eachIntermediate.getPullHost().getPullAddress();
- IntermediateEntry intermediateEntryPerPullHost =
ebMerged.get(ebMergedKey);
- if (intermediateEntryPerPullHost == null) {
- intermediateEntryPerPullHost = new IntermediateEntry(-1, -1,
partId, eachIntermediate.getPullHost());
- intermediateEntryPerPullHost.setEbId(eachIntermediate.getEbId());
- ebMerged.put(ebMergedKey, intermediateEntryPerPullHost);
- }
-
intermediateEntryPerPullHost.setVolume(intermediateEntryPerPullHost.getVolume()
+ eachIntermediate.getVolume());
- }
-
- List<IntermediateEntry> ebIntermediateEntries = new
ArrayList<IntermediateEntry>(ebMerged.values());
-
- Map<ExecutionBlockId, List<IntermediateEntry>> mergedPartEntries =
mergedHashEntries.get(partId);
- if (mergedPartEntries == null) {
- mergedPartEntries = new HashMap<ExecutionBlockId,
List<IntermediateEntry>>();
- mergedHashEntries.put(partId, mergedPartEntries);
- }
- mergedPartEntries.put(ebId, ebIntermediateEntries);
- }
- }
- return mergedHashEntries;
- }
-
- /**
- * It creates a number of fragments for all partitions.
- */
- public static List<Fragment>
getFragmentsFromPartitionedTable(FileStorageManager sm,
-
ScanNode scan,
-
TableDesc table) throws IOException {
- List<Fragment> fragments = Lists.newArrayList();
- PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
- fragments.addAll(sm.getSplits(
- scan.getCanonicalName(), table.getMeta(), table.getSchema(),
partitionsScan.getInputPaths()));
- partitionsScan.setInputPaths(null);
- return fragments;
- }
-
- private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext
schedulerContext, Stage stage,
- int baseScanId,
Fragment[] fragments) throws IOException {
- ExecutionBlock execBlock = stage.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
-
- for (int i = 0; i < scans.length; i++) {
- if (i != baseScanId) {
- scans[i].setBroadcastTable(true);
- }
- }
-
- // Large table (baseScan)
- //   -> add all fragments to baseFragments
- //   -> each fragment is assigned to a Task by DefaultTaskScheduler.handle()
- // Broadcast table
- //   all fragments or paths are assigned to every scan task of the large table.
- //   -> PARTITIONS_SCAN
- //      add all partition paths to the node's inputPaths variable
- //   -> SCAN
- //      add all fragments to broadcastFragments
- Collection<Fragment> baseFragments = null;
- List<Fragment> broadcastFragments = new ArrayList<Fragment>();
- for (int i = 0; i < scans.length; i++) {
- ScanNode scan = scans[i];
- TableDesc desc =
stage.getContext().getTableDescMap().get(scan.getCanonicalName());
- TableMeta meta = desc.getMeta();
-
- Collection<Fragment> scanFragments;
- Path[] partitionScanPaths = null;
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode partitionScan =
(PartitionedTableScanNode)scan;
- partitionScanPaths = partitionScan.getInputPaths();
- // set null to inputPaths in getFragmentsFromPartitionedTable()
- FileStorageManager storageManager =
-
(FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
- scanFragments = getFragmentsFromPartitionedTable(storageManager, scan,
desc);
- } else {
- StorageManager storageManager =
- StorageManager.getStorageManager(stage.getContext().getConf(),
desc.getMeta().getStoreType());
-
- scanFragments = storageManager.getSplits(scan.getCanonicalName(),
desc, scan);
- }
-
- if (scanFragments != null) {
- if (i == baseScanId) {
- baseFragments = scanFragments;
- } else {
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode partitionScan =
(PartitionedTableScanNode)scan;
- // PhysicalPlanner creates a PartitionMergeScanExec when the table is a broadcast table and inputPaths is not empty
- partitionScan.setInputPaths(partitionScanPaths);
- } else {
- broadcastFragments.addAll(scanFragments);
- }
- }
- }
- }
-
- if (baseFragments == null) {
- throw new IOException("No fragments for " +
scans[baseScanId].getTableName());
- }
-
- Stage.scheduleFragments(stage, baseFragments, broadcastFragments);
- schedulerContext.setEstimatedTaskNum(baseFragments.size());
- }
-
- private static void addJoinShuffle(Stage stage, int partitionId,
- Map<ExecutionBlockId,
List<IntermediateEntry>> grouppedPartitions) {
- Map<String, List<FetchImpl>> fetches = new HashMap<String,
List<FetchImpl>>();
- for (ExecutionBlock execBlock :
stage.getMasterPlan().getChilds(stage.getId())) {
- if (grouppedPartitions.containsKey(execBlock.getId())) {
- Collection<FetchImpl> requests = mergeShuffleRequest(partitionId,
HASH_SHUFFLE,
- grouppedPartitions.get(execBlock.getId()));
- fetches.put(execBlock.getId().toString(),
Lists.newArrayList(requests));
- }
- }
-
- if (fetches.isEmpty()) {
- LOG.info(stage.getId() + "'s " + partitionId + " partition has empty
result.");
- return;
- }
- Stage.scheduleFetches(stage, fetches);
- }
-
- /**
- * This method merges the partition requests associated with the same pull server address.
- * It reduces the number of TCP connections.
- *
- * @return key: pullserver's address, value: a list of requests
- */
- private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
- ShuffleType type,
-
List<IntermediateEntry> partitions) {
- // ebId + pullhost -> FetchImpl
- Map<String, FetchImpl> mergedPartitions = new HashMap<String, FetchImpl>();
-
- for (IntermediateEntry partition : partitions) {
- String mergedKey = partition.getEbId().toString() + "," +
partition.getPullHost();
-
- if (mergedPartitions.containsKey(mergedKey)) {
- FetchImpl fetch = mergedPartitions.get(mergedKey);
- fetch.addPart(partition.getTaskId(), partition.getAttemptId());
- } else {
- // In some cases, such as union, each IntermediateEntry has a different EBID.
- FetchImpl fetch = new FetchImpl(partition.getPullHost(), type,
partition.getEbId(), partitionId);
- fetch.addPart(partition.getTaskId(), partition.getAttemptId());
- mergedPartitions.put(mergedKey, fetch);
- }
- }
- return mergedPartitions.values();
- }
-
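// A minimal standalone sketch of the merge performed by mergeShuffleRequest() above: partition
// requests that share the same (execution block id, pull host) pair are folded into one request
// carrying several task/attempt parts, so only one TCP connection per pull server is needed.
// The types below are illustrative stand-ins, not Tajo classes.
final class MergeShuffleRequestSketch {
  static final class Request {
    final String ebId; final String pullHost;
    final java.util.List<String> parts = new java.util.ArrayList<>();
    Request(String ebId, String pullHost) { this.ebId = ebId; this.pullHost = pullHost; }
  }

  // each entry is {ebId, pullHost, taskId, attemptId} for one piece of intermediate data
  static java.util.Collection<Request> merge(java.util.List<String[]> entries) {
    java.util.Map<String, Request> merged = new java.util.HashMap<>();
    for (String[] e : entries) {
      String key = e[0] + "," + e[1];
      Request req = merged.computeIfAbsent(key, k -> new Request(e[0], e[1]));
      req.parts.add(e[2] + "_" + e[3]);   // add one part per intermediate entry
    }
    return merged.values();
  }
}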
- public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext
schedulerContext,
- MasterPlan masterPlan,
Stage stage, int maxNum)
- throws IOException {
- DataChannel channel =
masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
- if (channel.getShuffleType() == HASH_SHUFFLE
- || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
- scheduleHashShuffledFetches(schedulerContext, masterPlan, stage,
channel, maxNum);
- } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
- scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage,
channel, maxNum);
- } else {
- throw new InternalException("Cannot support partition type");
- }
- }
-
- private static TableStats
computeChildBlocksStats(QueryMasterTask.QueryMasterTaskContext context,
MasterPlan masterPlan,
- ExecutionBlockId
parentBlockId) {
- List<TableStats> tableStatses = new ArrayList<TableStats>();
- List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
- for (ExecutionBlock childBlock : childBlocks) {
- Stage childStage = context.getStage(childBlock.getId());
- tableStatses.add(childStage.getResultStats());
- }
- return StatisticsUtil.aggregateTableStat(tableStatses);
- }
-
- public static void scheduleRangeShuffledFetches(TaskSchedulerContext
schedulerContext, MasterPlan masterPlan,
- Stage stage, DataChannel
channel, int maxNum)
- throws IOException {
- ExecutionBlock execBlock = stage.getBlock();
- ScanNode scan = execBlock.getScanNodes()[0];
- Path tablePath;
- tablePath =
((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
- .getTablePath(scan.getTableName());
-
- ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
- SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(),
NodeType.SORT);
- SortSpec [] sortSpecs = sortNode.getSortKeys();
- Schema sortSchema = new Schema(channel.getShuffleKeys());
-
- TupleRange[] ranges;
- int determinedTaskNum;
-
- // calculate the maximum number of query ranges
- TableStats totalStat = computeChildBlocksStats(stage.getContext(),
masterPlan, stage.getId());
-
- // If there is an empty table in inner join, it should return zero rows.
- if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() ==
0) {
- return;
- }
- TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs,
sortSchema, totalStat.getColumnStats(), false);
-
- if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
- StoreType storeType =
PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
- CatalogService catalog =
stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
- LogicalRootNode rootNode =
masterPlan.getLogicalPlan().getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(catalog,
rootNode.getChild());
- if (tableDesc == null) {
- throw new IOException("Can't get table meta data from catalog: " +
- PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
- }
- ranges = StorageManager.getStorageManager(stage.getContext().getConf(),
storeType)
- .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc,
- sortNode.getInSchema(), sortSpecs,
- mergedRange);
- determinedTaskNum = ranges.length;
- } else {
- RangePartitionAlgorithm partitioner = new
UniformRangePartition(mergedRange, sortSpecs);
- BigInteger card = partitioner.getTotalCardinality();
-
- // if the range cardinality is less than the desired number of tasks,
- // we set the number of tasks to the range cardinality.
- if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
- LOG.info(stage.getId() + ", The range cardinality (" + card
- + ") is less then the desired number of tasks (" + maxNum + ")");
- determinedTaskNum = card.intValue();
- } else {
- determinedTaskNum = maxNum;
- }
-
- LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " +
determinedTaskNum +
- " sub ranges (total units: " + determinedTaskNum + ")");
- ranges = partitioner.partition(determinedTaskNum);
- if (ranges == null || ranges.length == 0) {
- LOG.warn(stage.getId() + " no range infos.");
- }
- TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema,
totalStat.getColumnStats(), ranges);
- if (LOG.isDebugEnabled()) {
- if (ranges != null) {
- for (TupleRange eachRange : ranges) {
- LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~
" + eachRange.getEnd());
- }
- }
- }
- }
-
- FileFragment dummyFragment = new FileFragment(scan.getTableName(),
tablePath, 0, 0, new String[]{UNKNOWN_HOST});
- Stage.scheduleFragment(stage, dummyFragment);
-
- List<FetchImpl> fetches = new ArrayList<FetchImpl>();
- List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
- for (ExecutionBlock childBlock : childBlocks) {
- Stage childExecSM = stage.getContext().getStage(childBlock.getId());
- for (Task qu : childExecSM.getTasks()) {
- for (IntermediateEntry p : qu.getIntermediateData()) {
- FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE,
childBlock.getId(), 0);
- fetch.addPart(p.getTaskId(), p.getAttemptId());
- fetches.add(fetch);
- }
- }
- }
-
- boolean ascendingFirstKey = sortSpecs[0].isAscending();
- SortedMap<TupleRange, Collection<FetchImpl>> map;
- if (ascendingFirstKey) {
- map = new TreeMap<TupleRange, Collection<FetchImpl>>();
- } else {
- map = new TreeMap<TupleRange, Collection<FetchImpl>>(new
TupleRange.DescendingTupleRangeComparator());
- }
-
- Set<FetchImpl> fetchSet;
- try {
- RowStoreUtil.RowStoreEncoder encoder =
RowStoreUtil.createEncoder(sortSchema);
- for (int i = 0; i < ranges.length; i++) {
- fetchSet = new HashSet<FetchImpl>();
- for (FetchImpl fetch: fetches) {
- String rangeParam =
- TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i ==
(ranges.length - 1) : i == 0, encoder);
- FetchImpl copy = null;
- try {
- copy = fetch.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- copy.setRangeParams(rangeParam);
- fetchSet.add(copy);
- }
- map.put(ranges[i], fetchSet);
- }
-
- } catch (UnsupportedEncodingException e) {
- LOG.error(e);
- }
-
- scheduleFetchesByRoundRobin(stage, map, scan.getTableName(),
determinedTaskNum);
-
- schedulerContext.setEstimatedTaskNum(determinedTaskNum);
- }
-
- public static void scheduleFetchesByRoundRobin(Stage stage, Map<?,
Collection<FetchImpl>> partitions,
- String tableName, int num) {
- int i;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
- for (i = 0; i < num; i++) {
- fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
- }
- i = 0;
- for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
- Collection<FetchImpl> value = entry.getValue();
- TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
- if (i == num) i = 0;
- }
- for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
- Stage.scheduleFetches(stage, eachFetches);
- }
- }
-
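// A minimal standalone sketch of the round-robin distribution used by
// scheduleFetchesByRoundRobin() above, reduced to generic collections; names are illustrative.
final class RoundRobinSketch {
  static <T> java.util.List<java.util.List<T>> distribute(java.util.Collection<T> partitions, int taskNum) {
    java.util.List<java.util.List<T>> buckets = new java.util.ArrayList<>();
    for (int i = 0; i < taskNum; i++) {
      buckets.add(new java.util.ArrayList<T>());
    }
    int i = 0;
    for (T partition : partitions) {
      buckets.get(i++).add(partition);   // assign the next partition to the next task
      if (i == taskNum) i = 0;           // wrap around, exactly as the loop above does
    }
    return buckets;
  }
}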
- @VisibleForTesting
- public static class FetchGroupMeta {
- long totalVolume;
- List<FetchImpl> fetchUrls;
-
- public FetchGroupMeta(long volume, FetchImpl fetchUrls) {
- this.totalVolume = volume;
- this.fetchUrls = Lists.newArrayList(fetchUrls);
- }
-
- public FetchGroupMeta addFetche(FetchImpl fetches) {
- this.fetchUrls.add(fetches);
- return this;
- }
-
- public void increaseVolume(long volume) {
- this.totalVolume += volume;
- }
-
- public long getVolume() {
- return totalVolume;
- }
-
- }
-
- public static void scheduleHashShuffledFetches(TaskSchedulerContext
schedulerContext, MasterPlan masterPlan,
- Stage stage, DataChannel
channel,
- int maxNum) throws
IOException {
- ExecutionBlock execBlock = stage.getBlock();
- ScanNode scan = execBlock.getScanNodes()[0];
- Path tablePath;
- tablePath =
((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
- .getTablePath(scan.getTableName());
-
- Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0,
new String[]{UNKNOWN_HOST});
- List<Fragment> fragments = new ArrayList<Fragment>();
- fragments.add(frag);
- Stage.scheduleFragments(stage, fragments);
-
- Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer,
FetchGroupMeta>();
- Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new
HashMap<ExecutionBlockId,
- List<IntermediateEntry>>();
-
- for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
- List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-
partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries());
-
- // In scattered hash shuffle, Collecting each IntermediateEntry
- if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
- if (intermediates.containsKey(block.getId())) {
- intermediates.get(block.getId()).addAll(partitions);
- } else {
- intermediates.put(block.getId(), partitions);
- }
- }
-
- // make FetchImpl per PullServer, PartId
- Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
- for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet())
{
- Map<Task.PullHost, List<IntermediateEntry>> hashedByHost =
hashByHost(interm.getValue());
- for (Entry<Task.PullHost, List<IntermediateEntry>> e :
hashedByHost.entrySet()) {
-
- FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
- block.getId(), interm.getKey(), e.getValue());
-
- long volumeSum = 0;
- for (IntermediateEntry ie : e.getValue()) {
- volumeSum += ie.getVolume();
- }
-
- if (finalFetches.containsKey(interm.getKey())) {
-
finalFetches.get(interm.getKey()).addFetche(fetch).increaseVolume(volumeSum);
- } else {
- finalFetches.put(interm.getKey(), new FetchGroupMeta(volumeSum,
fetch));
- }
- }
- }
- }
-
- int groupingColumns = 0;
- LogicalNode[] groupbyNodes =
PlannerUtil.findAllNodes(stage.getBlock().getPlan(),
- new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
- if (groupbyNodes != null && groupbyNodes.length > 0) {
- LogicalNode bottomNode = groupbyNodes[0];
- if (bottomNode.getType() == NodeType.GROUP_BY) {
- groupingColumns =
((GroupbyNode)bottomNode).getGroupingColumns().length;
- } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- DistinctGroupbyNode distinctNode =
PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(),
NodeType.DISTINCT_GROUP_BY);
- if (distinctNode == null) {
- LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
- distinctNode = (DistinctGroupbyNode)bottomNode;
- }
- groupingColumns = distinctNode.getGroupingColumns().length;
-
- Enforcer enforcer = execBlock.getEnforcer();
- EnforceProperty property =
PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
- if (property != null) {
- if (property.getDistinct().getIsMultipleAggregation()) {
- MultipleAggregationStage mulAggStage =
property.getDistinct().getMultipleAggregationStage();
- if (mulAggStage != MultipleAggregationStage.THRID_STAGE) {
- groupingColumns = distinctNode.getOutSchema().size();
- }
- }
- }
- }
- }
- // get a proper number of tasks
- int determinedTaskNum = Math.min(maxNum, finalFetches.size());
- LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" +
maxNum + ", finalFetchURI=" + finalFetches.size());
-
- if (groupingColumns == 0) {
- determinedTaskNum = 1;
- LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is
set to 1");
- } else {
- TableStats totalStat = computeChildBlocksStats(stage.getContext(),
masterPlan, stage.getId());
- if (totalStat.getNumRows() == 0) {
- determinedTaskNum = 1;
- }
- }
-
- // set the proper number of tasks to the estimated task num
- if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
- scheduleScatteredHashShuffleFetches(schedulerContext, stage,
intermediates,
- scan.getTableName());
- } else {
- schedulerContext.setEstimatedTaskNum(determinedTaskNum);
- // divide fetch URIs into the proper number of tasks according to their volumes
- scheduleFetchesByEvenDistributedVolumes(stage, finalFetches,
scan.getTableName(), determinedTaskNum);
- LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
- }
- }
-
- public static Pair<Long [], Map<String, List<FetchImpl>>[]>
makeEvenDistributedFetchImpl(
- Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
-
- // Sort fetchGroupMeta in a descending order of data volumes.
- List<FetchGroupMeta> fetchGroupMetaList =
Lists.newArrayList(partitions.values());
- Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
- @Override
- public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
- return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() >
o2.getVolume() ? -1 : 0);
- }
- });
-
- // Initialize containers
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
- Long [] assignedVolumes = new Long[num];
- // initialization
- for (int i = 0; i < num; i++) {
- fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
- assignedVolumes[i] = 0l;
- }
-
- // This algorithm assigns the biggest groups first by using a sorted iterator; it is a greedy approach.
- // Its complexity is O(n). Since there can be tens of thousands of FetchGroups, complexity matters.
- // In that respect it shows reasonable performance and results, even though it is not an optimal algorithm.
- Iterator<FetchGroupMeta> iterator = fetchGroupMetaList.iterator();
-
- int p = 0;
- while(iterator.hasNext()) {
- while (p < num && iterator.hasNext()) {
- FetchGroupMeta fetchGroupMeta = iterator.next();
- assignedVolumes[p] += fetchGroupMeta.getVolume();
-
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName,
fetchGroupMeta.fetchUrls);
- p++;
- }
-
- p = num - 1;
- while (p > 0 && iterator.hasNext()) {
- FetchGroupMeta fetchGroupMeta = iterator.next();
- assignedVolumes[p] += fetchGroupMeta.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName,
fetchGroupMeta.fetchUrls);
-
- // While the current bucket is smaller than the previous one, add additional fetches to the current bucket.
- while(iterator.hasNext() && assignedVolumes[p - 1] >
assignedVolumes[p]) {
- FetchGroupMeta additionalFetchGroup = iterator.next();
- assignedVolumes[p] += additionalFetchGroup.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName,
additionalFetchGroup.fetchUrls);
- }
-
- p--;
- }
- }
-
- return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes,
fetchesArray);
- }
-
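// A minimal standalone sketch of the greedy, bigger-first distribution implemented by
// makeEvenDistributedFetchImpl() above, applied to plain volumes: groups are sorted in
// descending order, dealt out in a forward pass and then a backward pass, and a bucket keeps
// receiving groups while its left neighbour is still larger. Names are illustrative.
final class EvenDistributionSketch {
  static long[] distribute(long[] groupVolumes, int taskNum) {
    Long[] sorted = new Long[groupVolumes.length];
    for (int i = 0; i < groupVolumes.length; i++) sorted[i] = groupVolumes[i];
    java.util.Arrays.sort(sorted, java.util.Collections.reverseOrder());   // descending volumes

    long[] assigned = new long[taskNum];
    int idx = 0;
    while (idx < sorted.length) {
      for (int p = 0; p < taskNum && idx < sorted.length; p++) {           // forward pass
        assigned[p] += sorted[idx++];
      }
      for (int p = taskNum - 1; p > 0 && idx < sorted.length; p--) {       // backward pass
        assigned[p] += sorted[idx++];
        // keep topping up this bucket while the bucket to its left is still larger
        while (idx < sorted.length && assigned[p - 1] > assigned[p]) {
          assigned[p] += sorted[idx++];
        }
      }
    }
    return assigned;   // total volume assigned to each task
  }
}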
- public static void scheduleFetchesByEvenDistributedVolumes(Stage stage,
Map<Integer, FetchGroupMeta> partitions,
- String tableName,
int num) {
- Map<String, List<FetchImpl>>[] fetchsArray =
makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
- // Schedule FetchImpls
- for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
- Stage.scheduleFetches(stage, eachFetches);
- }
- }
-
- // Scattered hash shuffle hashes the key columns and groups the entries that share
- // the same hash key. Then, if the volume of a group is larger
- // than $DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into two or more sub-groups
- // according to $DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB).
- // As a result, each group's size is always less than or equal
- // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each sub-group is assigned to a query unit.
- // It is usually used for writing partitioned tables.
- public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext
schedulerContext,
- Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>>
intermediates,
- String tableName) {
- long splitVolume = StorageUnit.MB *
-
stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
- long pageSize = StorageUnit.MB *
-
stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME);
// in bytes
- if (pageSize >= splitVolume) {
- throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than " +
- "tajo.shuffle.hash.appender.page.volumn-mb");
- }
- List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
-
- long totalIntermediateSize = 0L;
- for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry :
intermediates.entrySet()) {
- // merge by PartitionId
- Map<Integer, List<IntermediateEntry>> partitionIntermMap = new
HashMap<Integer, List<IntermediateEntry>>();
- for (IntermediateEntry eachInterm: listEntry.getValue()) {
- totalIntermediateSize += eachInterm.getVolume();
- int partId = eachInterm.getPartId();
- List<IntermediateEntry> partitionInterms =
partitionIntermMap.get(partId);
- if (partitionInterms == null) {
- partitionInterms = TUtil.newList(eachInterm);
- partitionIntermMap.put(partId, partitionInterms);
- } else {
- partitionInterms.add(eachInterm);
- }
- }
-
- // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
- for (List<IntermediateEntry> partitionEntries :
partitionIntermMap.values()) {
- List<List<FetchImpl>> eachFetches =
splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
- splitVolume, pageSize);
- if (eachFetches != null && !eachFetches.isEmpty()) {
- fetches.addAll(eachFetches);
- }
- }
- }
-
- schedulerContext.setEstimatedTaskNum(fetches.size());
-
- int i = 0;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
- for(List<FetchImpl> entry : fetches) {
- fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
- fetchesArray[i].put(tableName, entry);
-
- Stage.scheduleFetches(stage, fetchesArray[i]);
- i++;
- }
-
- LOG.info(stage.getId()
- + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
- + ", Intermediate Size: " + totalIntermediateSize
- + ", splitSize: " + splitVolume
- + ", DeterminedTaskNum: " + fetches.size());
- }
-
- /**
- * If an IntermediateEntry is larger than splitVolume, the List<FetchImpl> has a single element.
- * @param ebId
- * @param entries
- * @param splitVolume
- * @param pageSize
- * @return a list of fetch lists, one per task
- */
- public static List<List<FetchImpl>> splitOrMergeIntermediates(
- ExecutionBlockId ebId, List<IntermediateEntry> entries, long
splitVolume, long pageSize) {
- // Each List<FetchImpl> holds at most splitVolume bytes.
- List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
-
- Iterator<IntermediateEntry> iter = entries.iterator();
- if (!iter.hasNext()) {
- return null;
- }
- List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
- long fetchListVolume = 0;
-
- while (iter.hasNext()) {
- IntermediateEntry currentInterm = iter.next();
-
- long firstSplitVolume = splitVolume - fetchListVolume;
- if (firstSplitVolume < pageSize) {
- firstSplitVolume = splitVolume;
- }
-
- // Each Pair object in the splits variable is assigned to a task of the next ExecutionBlock.
- // The first long value is an offset into the intermediate file and the second long value is the length.
- List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume,
splitVolume);
- if (splits == null || splits.isEmpty()) {
- break;
- }
-
- for (Pair<Long, Long> eachSplit: splits) {
- if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >=
splitVolume) {
- if (!fetchListForSingleTask.isEmpty()) {
- fetches.add(fetchListForSingleTask);
- }
- fetchListForSingleTask = new ArrayList<FetchImpl>();
- fetchListVolume = 0;
- }
- FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(),
SCATTERED_HASH_SHUFFLE,
- ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
- fetch.setOffset(eachSplit.getFirst());
- fetch.setLength(eachSplit.getSecond());
- fetchListForSingleTask.add(fetch);
- fetchListVolume += eachSplit.getSecond();
- }
- }
- if (!fetchListForSingleTask.isEmpty()) {
- fetches.add(fetchListForSingleTask);
- }
- return fetches;
- }
-
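// A minimal standalone sketch of the packing performed by splitOrMergeIntermediates() above:
// intermediate pieces are accumulated into groups of at most splitVolume bytes, so a piece
// larger than splitVolume ends up alone in its group and small pieces are merged together.
// Names are illustrative; the real method additionally splits entries at page boundaries.
final class SplitOrMergeSketch {
  static java.util.List<java.util.List<Long>> pack(java.util.List<Long> pieceSizes, long splitVolume) {
    java.util.List<java.util.List<Long>> groups = new java.util.ArrayList<>();
    java.util.List<Long> current = new java.util.ArrayList<>();
    long currentVolume = 0;
    for (long size : pieceSizes) {
      // start a new group when adding this piece would exceed the split volume
      if (currentVolume > 0 && currentVolume + size >= splitVolume) {
        groups.add(current);
        current = new java.util.ArrayList<>();
        currentVolume = 0;
      }
      current.add(size);
      currentVolume += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;   // each group is handed to one task
  }
}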
- public static List<URI> createFetchURL(FetchImpl fetch, boolean
includeParts) {
- String scheme = "http://";
-
- StringBuilder urlPrefix = new StringBuilder(scheme);
-
urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
-
.append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
- .append("&sid=").append(fetch.getExecutionBlockId().getId())
- .append("&p=").append(fetch.getPartitionId())
- .append("&type=");
- if (fetch.getType() == HASH_SHUFFLE) {
- urlPrefix.append("h");
- } else if (fetch.getType() == RANGE_SHUFFLE) {
- urlPrefix.append("r").append("&").append(fetch.getRangeParams());
- } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
- urlPrefix.append("s");
- }
-
- if (fetch.getLength() >= 0) {
-
urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
- }
-
- List<URI> fetchURLs = new ArrayList<URI>();
- if(includeParts) {
- if (fetch.getType() == HASH_SHUFFLE || fetch.getType() ==
SCATTERED_HASH_SHUFFLE) {
- fetchURLs.add(URI.create(urlPrefix.toString()));
- } else {
- // If the GET request is longer than 2000 characters,
- // the long request URI may cause HTTP status code 414 (Request-URI Too Long).
- // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
- // The code below splits a long request into multiple requests.
- List<String> taskIdsParams = new ArrayList<String>();
- StringBuilder taskIdListBuilder = new StringBuilder();
- List<Integer> taskIds = fetch.getTaskIds();
- List<Integer> attemptIds = fetch.getAttemptIds();
- boolean first = true;
-
- for (int i = 0; i < taskIds.size(); i++) {
- StringBuilder taskAttemptId = new StringBuilder();
-
- if (!first) { // a comma separates consecutive task ids
- taskAttemptId.append(",");
- } else {
- first = false;
- }
-
- int taskId = taskIds.get(i);
- if (taskId < 0) {
- // In the case of hash shuffle, each partition has a single shuffle file per worker.
- // TODO: if the file is large, consider fetching it in multiple pieces (the shuffle file can be split)
- continue;
- }
- int attemptId = attemptIds.get(i);
- taskAttemptId.append(taskId).append("_").append(attemptId);
-
- if (taskIdListBuilder.length() + taskAttemptId.length()
- > HTTP_REQUEST_MAXIMUM_LENGTH) {
- taskIdsParams.add(taskIdListBuilder.toString());
- taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
- } else {
- taskIdListBuilder.append(taskAttemptId);
- }
- }
- // if the url params remain
- if (taskIdListBuilder.length() > 0) {
- taskIdsParams.add(taskIdListBuilder.toString());
- }
- urlPrefix.append("&ta=");
- for (String param : taskIdsParams) {
- fetchURLs.add(URI.create(urlPrefix + param));
- }
- }
- } else {
- fetchURLs.add(URI.create(urlPrefix.toString()));
- }
-
- return fetchURLs;
- }
-
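// A minimal standalone sketch of the request splitting done by createFetchURL() above for range
// shuffle: the comma-separated "taskId_attemptId" list is cut into chunks whose length stays
// under a maximum, and one URI is built per chunk so no single GET request grows too long.
// Names and the length limit are illustrative.
final class FetchUrlSplitSketch {
  static java.util.List<String> buildUrls(String urlPrefix, java.util.List<String> taskAttemptIds, int maxParamLength) {
    java.util.List<String> params = new java.util.ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (String id : taskAttemptIds) {
      String piece = current.length() == 0 ? id : "," + id;
      if (current.length() + piece.length() > maxParamLength) {
        params.add(current.toString());          // flush the full chunk
        current = new StringBuilder(id);         // start the next chunk with this id
      } else {
        current.append(piece);
      }
    }
    if (current.length() > 0) {
      params.add(current.toString());
    }
    java.util.List<String> urls = new java.util.ArrayList<>();
    for (String param : params) {
      urls.add(urlPrefix + "&ta=" + param);      // one request per chunk of task ids
    }
    return urls;
  }
}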
- public static Map<Integer, List<IntermediateEntry>>
hashByKey(List<IntermediateEntry> entries) {
- Map<Integer, List<IntermediateEntry>> hashed = new HashMap<Integer,
List<IntermediateEntry>>();
- for (IntermediateEntry entry : entries) {
- if (hashed.containsKey(entry.getPartId())) {
- hashed.get(entry.getPartId()).add(entry);
- } else {
- hashed.put(entry.getPartId(), TUtil.newList(entry));
- }
- }
-
- return hashed;
- }
-
- public static Map<Task.PullHost, List<IntermediateEntry>>
hashByHost(List<IntermediateEntry> entries) {
- Map<Task.PullHost, List<IntermediateEntry>> hashed = new
HashMap<Task.PullHost, List<IntermediateEntry>>();
-
- Task.PullHost host;
- for (IntermediateEntry entry : entries) {
- host = entry.getPullHost();
- if (hashed.containsKey(host)) {
- hashed.get(host).add(entry);
- } else {
- hashed.put(host, TUtil.newList(entry));
- }
- }
-
- return hashed;
- }
-
- public static Stage setShuffleOutputNumForTwoPhase(Stage stage, final int
desiredNum, DataChannel channel) {
- ExecutionBlock execBlock = stage.getBlock();
- Column[] keys;
- // if the next query is join,
- // set the partition number for the current logicalUnit
- // TODO: the union handling is required when a join has unions as its child
- MasterPlan masterPlan = stage.getMasterPlan();
- keys = channel.getShuffleKeys();
- if (!masterPlan.isRoot(stage.getBlock()) ) {
- ExecutionBlock parentBlock = masterPlan.getParent(stage.getBlock());
- if (parentBlock.getPlan().getType() == NodeType.JOIN) {
- channel.setShuffleOutputNum(desiredNum);
- }
- }
-
- // set the partition number for group by and sort
- if (channel.getShuffleType() == HASH_SHUFFLE) {
- if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
- execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
- keys = channel.getShuffleKeys();
- }
- } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
- if (execBlock.getPlan().getType() == NodeType.SORT) {
- SortNode sort = (SortNode) execBlock.getPlan();
- keys = new Column[sort.getSortKeys().length];
- for (int i = 0; i < keys.length; i++) {
- keys[i] = sort.getSortKeys()[i].getSortKey();
- }
- }
- }
- if (keys != null) {
- if (keys.length == 0) {
- channel.setShuffleKeys(new Column[]{});
- channel.setShuffleOutputNum(1);
- } else {
- channel.setShuffleKeys(keys);
- channel.setShuffleOutputNum(desiredNum);
- }
- }
- return stage;
- }
-}