Hyunsik, I'm +1 on this. It is worthy of a wiki page or a separate page explaining the technical flow; tracing the flow in the code is confusing.
Maybe something similar to the YARN doc page [1] (the more details the better ^_^).

- Henry

[1] http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html

On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <[email protected]> wrote:
> Hi Min,
>
> Above all, I'm very sorry for the lack of documentation in the source
> code. So far, we have developed Tajo with insufficient documentation,
> pursuing only a quick-and-dirty manner. We should add more documentation
> to the source code.
>
> I'm going to explain how Tajo uses disk volume locality. Before this
> explanation, I would like to describe the node locality that you may
> already know. Similar to MR, Tajo uses three levels of locality for each
> task: for each task, the scheduler tries a local node, the closest rack,
> and a random node, in that order. In addition, Tajo's scheduler tries the
> local volume before the local node.
>
> The important thing is that we don't need to be aware of the actual disk
> volume IDs on each local node; we just assign disk volumes to the
> TaskRunners on a node in a round-robin manner. This is sufficient to
> improve load balancing across disk volumes.
>
> Initially, TaskRunners are not mapped to disk volumes on each worker. The
> mapping occurs dynamically in the scheduler. For example, suppose there
> are 6 local tasks for some node N1 which has 3 disk volumes (v1, v2, and
> v3), and three TaskRunners (T1, T2, and T3) will be running on N1.
>
> When tasks are added to the scheduler, the scheduler gets the disk volume
> id from each task. As you know, each volume id is just an integer, a
> logical identifier that distinguishes different disk volumes. Then, the
> scheduler builds a map from disk volume ids (obtained from
> BlockStorageLocation) on each node to a list of tasks
> (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry
> in the map consists of one disk volume id and the list of tasks that
> belong to that volume.
>
> When the first task is requested by a TaskRunner T1 on node N1, the
> scheduler assigns the first disk volume v1 to T1, and then schedules one
> task which belongs to v1. Later, when a task is requested by a different
> TaskRunner T2 on node N1, the scheduler assigns the second disk volume v2
> to T2, and then schedules a task which belongs to v2. When a task request
> comes from T1 again, the scheduler schedules another task from v1 to T1,
> because T1 is already mapped to v1.
>
> Like MR, Tajo uses dynamic scheduling, and it works very well in
> environments where nodes have disks with different performance. If you
> have additional questions, please feel free to ask.
>
> Also, I'll create a Jira issue to add this explanation to
> DefaultTaskScheduler.
>
> - hyunsik
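To make the flow Hyunsik describes concrete, here is a minimal, self-contained sketch of the lazy round-robin binding of TaskRunners to disk volumes. All names in it (VolumeAwareScheduler, addTask, assign) are hypothetical illustrations, not Tajo's actual API:

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Queue;
    import java.util.TreeMap;

    // Hypothetical sketch of the lazy, round-robin binding of TaskRunners
    // to disk volumes described above. Not Tajo's actual classes.
    public class VolumeAwareScheduler {
      // volumeId -> pending local tasks on that volume (one node's view)
      private final TreeMap<Integer, Queue<String>> pendingByVolume = new TreeMap<>();
      // taskRunnerId -> volumeId, filled in lazily on the first request
      private final HashMap<String, Integer> runnerToVolume = new HashMap<>();
      private int cursor = 0; // round-robin cursor over the known volumes

      public void addTask(int volumeId, String taskId) {
        pendingByVolume.computeIfAbsent(volumeId, v -> new ArrayDeque<>()).add(taskId);
      }

      // Called when a TaskRunner asks for work: bind it to a volume on the
      // first request, then keep serving tasks from that same volume.
      public String assign(String runnerId) {
        Integer volumeId = runnerToVolume.get(runnerId);
        if (volumeId == null) {
          if (pendingByVolume.isEmpty()) return null;
          Integer[] volumes = pendingByVolume.keySet().toArray(new Integer[0]);
          volumeId = volumes[cursor++ % volumes.length]; // round robin
          runnerToVolume.put(runnerId, volumeId);
        }
        Queue<String> q = pendingByVolume.get(volumeId);
        return (q == null || q.isEmpty()) ? null : q.poll();
      }

      public static void main(String[] args) {
        // Hyunsik's example: node N1 with volumes v1..v3 and runners T1..T3.
        VolumeAwareScheduler s = new VolumeAwareScheduler();
        for (int v = 1; v <= 3; v++) {
          s.addTask(v, "task-" + v + "a");
          s.addTask(v, "task-" + v + "b"); // 6 local tasks in total
        }
        System.out.println(s.assign("T1")); // T1 is bound to v1 -> task-1a
        System.out.println(s.assign("T2")); // T2 is bound to v2 -> task-2a
        System.out.println(s.assign("T1")); // T1 stays on v1    -> task-1b
      }
    }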
> On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <[email protected]> wrote:
>> Hi Jihoon,
>>
>> Thank you for your answer. However, it seems you didn't explain how Tajo
>> uses the disk information to balance the I/O overhead.
>>
>> I still can't understand the details; they are quite complex to me,
>> especially the class TaskBlockLocation:
>>
>> public static class TaskBlockLocation {
>>   // This is a mapping from diskId to a list of pending tasks, right?
>>   private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
>>       new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>>   // How can I return a Task to the container according to the diskId?
>>   private HashMap<ContainerId, Integer> assignedContainerMap =
>>       new HashMap<ContainerId, Integer>();
>>   private TreeMap<Integer, Integer> volumeUsageMap =
>>       new TreeMap<Integer, Integer>();
>>   private String host;
>>
>>   public TaskBlockLocation(String host) {
>>     this.host = host;
>>   }
>>
>>   public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId) {
>>     LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>>     if (list == null) {
>>       list = new LinkedList<QueryUnitAttemptId>();
>>       unAssignedTaskMap.put(volumeId, list);
>>     }
>>     list.add(attemptId);
>>
>>     if (!volumeUsageMap.containsKey(volumeId))
>>       volumeUsageMap.put(volumeId, 0);
>>   }
>>
>>   public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId) {
>>     Integer volumeId;
>>
>>     if (!assignedContainerMap.containsKey(containerId)) {
>>       // assign a new container to the volume with the lowest concurrency, right?
>>       volumeId = assignVolumeId();
>>       assignedContainerMap.put(containerId, volumeId);
>>     } else {
>>       volumeId = assignedContainerMap.get(containerId);
>>     }
>>
>>     LinkedList<QueryUnitAttemptId> list = null;
>>     if (unAssignedTaskMap.size() > 0) {
>>       int retry = unAssignedTaskMap.size();
>>       do {
>>         list = unAssignedTaskMap.get(volumeId);
>>         if (list == null || list.size() == 0) {
>>           // clean up and reassign a remaining volume
>>           unAssignedTaskMap.remove(volumeId);
>>           volumeUsageMap.remove(volumeId);
>>           if (volumeId < 0) break; // processed all blocks on disk
>>
>>           // WHY DOES THIS LINE ASSIGN A VOLUME ID AGAIN?
>>           volumeId = assignVolumeId();
>>           // WHY DOES THIS LINE PUT AGAIN?
>>           // If the container is a new container, does it put twice??
>>           assignedContainerMap.put(containerId, volumeId);
>>           retry--;
>>         } else {
>>           break;
>>         }
>>       } while (retry > 0);
>>     }
>>     return list;
>>   }
>>
>>   public Integer assignVolumeId() {
>>     Map.Entry<Integer, Integer> volumeEntry = null;
>>
>>     // choose the volume with the lowest concurrency, right?
>>     for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>>       if (volumeEntry == null) volumeEntry = entry;
>>
>>       if (volumeEntry.getValue() >= entry.getValue()) {
>>         volumeEntry = entry;
>>       }
>>     }
>>
>>     if (volumeEntry != null) {
>>       volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
>>       LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey()
>>           + ", Concurrency : " + volumeUsageMap.get(volumeEntry.getKey()));
>>       return volumeEntry.getKey();
>>     } else {
>>       return -1; // processed all blocks on disk
>>     }
>>   }
>>
>>   public String getHost() {
>>     return host;
>>   }
>> }
>>
>> This class maintains a mapping (assignedContainerMap) from containerId to
>> the assigned diskId. How can I retrieve a task, based on the diskId, for
>> the container?
>>
>> Thanks,
>> Min
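One reading of the quoted code: the second assignVolumeId()/put pair runs only when the container's currently bound volume has run out of pending tasks; the drained volume is removed from both maps and the container is re-bound to the least-loaded remaining volume. Since assignedContainerMap is a HashMap, the second put overwrites the earlier binding rather than adding a duplicate entry. A tiny, hypothetical snippet to demonstrate that point:

    import java.util.HashMap;

    // Demonstrates that HashMap.put is an overwrite, not an append, so
    // re-binding a container to a new volume cannot create two entries.
    public class PutOverwrite {
      public static void main(String[] args) {
        HashMap<String, Integer> assignedContainerMap = new HashMap<>();
        assignedContainerMap.put("container_01", 0); // first binding: volume 0
        assignedContainerMap.put("container_01", 2); // volume 0 drained: re-bind to volume 2
        System.out.println(assignedContainerMap);    // {container_01=2} -- a single entry
      }
    }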
>> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <[email protected]> wrote:
>> > Hi, Min.
>> >
>> > In DefaultTaskScheduler, each container is mapped to a disk of a node
>> > in the cluster. When a container requests a task, DefaultTaskScheduler
>> > selects the closest task and assigns it to the container. This process
>> > applies only to local reads; the disk volume information is not
>> > considered for remote reads.
>> >
>> > In my opinion, this is enough for us because there are few remote
>> > tasks in each sub query. In a test on an in-house cluster composed of
>> > 32 nodes, the ratio of remote tasks to all tasks was only about 0.17%
>> > (the query was 'select l_orderkey from lineitem', and the volume of
>> > the lineitem table was about 1 TB). Since the number of remote tasks
>> > was very small, there was little disk contention.
>> >
>> > Hope that answers your questions.
>> > Thanks,
>> > Jihoon
>> >
>> > 2014-02-13 11:00 GMT+09:00 Min Zhou <[email protected]>:
>> >
>> > > Hi all,
>> > >
>> > > Tajo leverages the feature introduced by HDFS-3672, which exposes
>> > > the disk volume id of each HDFS data block. I already found the
>> > > related code in DefaultTaskScheduler.assignToLeafTasks; can anyone
>> > > explain the logic for me? What does the scheduler do when the HDFS
>> > > read is a remote read from another machine's disk?
>> > >
>> > > Thanks,
>> > > Min
>> > > --
>> > > My research interests are distributed systems, parallel computing
>> > > and bytecode based virtual machine.
>> > >
>> > > My profile: http://www.linkedin.com/in/coderplay
>> > > My blog: http://coderplay.javaeye.com
>>
>> --
>> My research interests are distributed systems, parallel computing and
>> bytecode based virtual machine.
>>
>> My profile: http://www.linkedin.com/in/coderplay
>> My blog: http://coderplay.javaeye.com
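Taking the thread as a whole, the scheduling order appears to degrade from volume-local to node-local to rack-local to remote, with the volume ids from HDFS-3672 used only in the first step; per Jihoon's reply, remote reads skip the volume step entirely. A short illustrative sketch of that fallback, with invented names, not Tajo's actual scheduler:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative sketch of the four-level locality fallback described in
    // the thread: local volume -> local node -> closest rack -> any node.
    public class LocalityFallback {
      private final Queue<String> volumeLocal = new ArrayDeque<>();
      private final Queue<String> hostLocal = new ArrayDeque<>();
      private final Queue<String> rackLocal = new ArrayDeque<>();
      private final Queue<String> remote = new ArrayDeque<>();

      // Pick the best-locality pending task for the requesting worker.
      public String pick() {
        if (!volumeLocal.isEmpty()) return volumeLocal.poll(); // same disk volume
        if (!hostLocal.isEmpty())   return hostLocal.poll();   // same node, other volume
        if (!rackLocal.isEmpty())   return rackLocal.poll();   // same rack
        return remote.poll();   // remote read: disk volume not considered
      }

      public static void main(String[] args) {
        LocalityFallback f = new LocalityFallback();
        f.rackLocal.add("t-rack");
        f.remote.add("t-remote");
        System.out.println(f.pick()); // t-rack: rack-local beats remote
      }
    }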
