Hi all,
I spent more than half a day trying to understand the logic
of Repartitioner.scheduleFragmentsForJoinQuery(), and I am still confused.
Can anyone help me? My questions are inline as ALL-CAPS comments.
public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
    throws IOException {
  MasterPlan masterPlan = subQuery.getMasterPlan();
  ExecutionBlock execBlock = subQuery.getBlock();
  QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
  AbstractStorageManager storageManager = subQuery.getStorageManager();

  ScanNode[] scans = execBlock.getScanNodes();
  Path tablePath;

  // DOES TAJO ONLY SUPPORT 2-WAY JOINS?
  // WHY IS THIS ARRAY SIZE ONLY 2?
  FileFragment[] fragments = new FileFragment[2];
  long[] stats = new long[2];
  // initialize variables from the child operators
  for (int i = 0; i < 2; i++) {
    TableDesc tableDesc =
        masterContext.getTableDescMap().get(scans[i].getCanonicalName());
    if (tableDesc == null) { // if it is a real table stored on storage
      // WHAT DOES THIS COMMENT MEAN?
      // WHICH KIND OF SQL CAN RUN INTO THIS BRANCH?
      // TODO - to be fixed (wrong directory)
      ExecutionBlock[] childBlocks = new ExecutionBlock[2];
      childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
      childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);

      tablePath = storageManager.getTablePath(scans[i].getTableName());
      stats[i] = masterContext.getSubQuery(childBlocks[i].getId())
          .getTableStat().getNumBytes();
      fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath,
          0, 0, new String[]{UNKNOWN_HOST});
    } else {
      tablePath = tableDesc.getPath();
      try {
        stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
      } catch (PlanningException e) {
        throw new IOException(e);
      }
      // WHY RETURN JUST THE FIRST MEMBER OF THE SPLITS LIST?
      fragments[i] = storageManager.getSplits(scans[i].getCanonicalName(),
          tableDesc.getMeta(), tableDesc.getSchema(), tablePath).get(0);
    }
  }
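  // (My note, not in the source: if I read the loop right, after it runs
  //   stats[i]     holds the estimated input volume of scan i in bytes, and
  //   fragments[i] holds a single representative fragment for scan i.)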
LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0],
stats[1]));
// Assigning either fragments or fetch urls to query units
boolean leftSmall =
execBlock.isBroadcastTable(scans[0].getCanonicalName());
boolean rightSmall =
execBlock.isBroadcastTable(scans[1].getCanonicalName());
if (leftSmall && rightSmall) {
LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on
Single Machine");
SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
} else if (leftSmall ^ rightSmall) {
int broadcastIdx = leftSmall ? 0 : 1;
int baseScanIdx = leftSmall ? 1 : 0;
LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx,
fragments[broadcastIdx]);
  } else {
    LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
    // The hash map is modeled as follows:
    // <Part Id, <Table Name, Intermediate Data>>
    Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
        new HashMap<Integer, Map<String, List<IntermediateEntry>>>();

    // Grouping IntermediateData by a partition key and a table name
    for (ScanNode scan : scans) {
      SubQuery childSubQuery = masterContext.getSubQuery(
          TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
      for (QueryUnit task : childSubQuery.getQueryUnits()) {
        if (task.getIntermediateData() != null) {
          for (IntermediateEntry intermEntry : task.getIntermediateData()) {
            if (hashEntries.containsKey(intermEntry.getPartId())) {
              Map<String, List<IntermediateEntry>> tbNameToInterm =
                  hashEntries.get(intermEntry.getPartId());
              if (tbNameToInterm.containsKey(scan.getCanonicalName())) {
                tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
              } else {
                tbNameToInterm.put(scan.getCanonicalName(),
                    TUtil.newList(intermEntry));
              }
            } else {
              Map<String, List<IntermediateEntry>> tbNameToInterm =
                  new HashMap<String, List<IntermediateEntry>>();
              tbNameToInterm.put(scan.getCanonicalName(),
                  TUtil.newList(intermEntry));
              hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
            }
          }
        }
      }
    }
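    // (My illustration, not in the source: after this grouping loop,
    // hashEntries for a join of t1 and t2 with 3 shuffle partitions could
    // look like
    //   0 -> { "t1" -> [e1, e4], "t2" -> [e7] }
    //   1 -> { "t1" -> [e2],     "t2" -> [e8, e9] }
    //   2 -> { "t1" -> [e3],     "t2" -> [e5, e6] }
    // i.e. one entry per distinct partition id.)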
    // hashEntries can be empty if there is no input data; in that case
    // dividing by its size would throw an ArithmeticException, so the
    // guards below avoid the division by zero.
    int[] avgSize = new int[2];
    avgSize[0] = hashEntries.size() == 0 ? 0
        : (int) (stats[0] / hashEntries.size());
    avgSize[1] = hashEntries.size() == 0 ? 0
        : (int) (stats[1] / hashEntries.size());
    int bothFetchSize = avgSize[0] + avgSize[1];
    // Getting the desired number of join tasks according to the volume
    // of the larger table
    int largerIdx = stats[0] >= stats[1] ? 0 : 1;
    int desireJoinTaskVolumn = subQuery.getContext().getConf()
        .getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);

    // calculate the number of tasks according to the data size
    int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
    LOG.info("Larger intermediate data is approximately " + mb + " MB");
    // determine the number of tasks, one per 64MB
    int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
    LOG.info("The calculated number of tasks is " + maxTaskNum);
    LOG.info("The number of total shuffle keys is " + hashEntries.size());
    // the number of join tasks cannot be larger than the number of
    // distinct partition ids.
    int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
    LOG.info("The determined number of join tasks is " + joinTaskNum);
    // WHY ONLY 2 FRAGMENTS FOR THIS JOIN? JUST 2 HDFS BLOCKS?
    SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
    // Assign partitions to tasks in a round-robin manner.
    for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry :
        hashEntries.entrySet()) {
      addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
    }
  }
}
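For reference, here is how I currently read the three-way dispatch above,
condensed into a sketch of my own; the schedule*Join() helper names are
mine for illustration, not Tajo's:

// my condensed (possibly wrong) reading of the branches, for discussion
if (leftSmall && rightSmall) {
  // both inputs marked broadcast: run the whole join as a single task
  scheduleSingleMachineJoin(subQuery, fragments);
} else if (leftSmall ^ rightSmall) {
  // exactly one small input: ship it to every task scanning the larger one
  scheduleBroadcastJoin(subQuery, baseScanIdx, fragments[broadcastIdx]);
} else {
  // neither input is small: hash-shuffle both sides and create one join
  // task input per distinct partition id (capped by joinTaskNum)
  scheduleRepartitionJoin(subQuery, hashEntries);
}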
Thanks,
Min
--
My research interests are distributed systems, parallel computing and
bytecode-based virtual machines.
My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com