Hi Min,

Here are my comments inline.
If you have any questions, please let me know.

Thanks.
> Hi Jihoon,
>
> Thank you for your answer. However, it seems you didn't explain how Tajo uses
> disk information to balance the I/O overhead.
>
> I still can't understand the details, which are quite complex to me, especially
> the class TaskBlockLocation.
>
>
> public static class TaskBlockLocation {
> // This is a mapping from diskId to a list of pending tasks, right?
> private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
>     new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
Yes, this is the list of pending tasks of a host, grouped by disk. It is built at initialization time.
hostA (TaskBlockLocation) --+-- Disk1 ---- Task1 (HDFS replica 1), Task2
                            |
                            +-- Disk2 ---- Task3, Task4
hostB (TaskBlockLocation) --+-- Disk1 ---- Task1 (HDFS replica 2), Task3
                            |
                            +-- Disk2 ---- Task2, Task4
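To make the initial-time structure concrete, here is a minimal stand-alone Java sketch that rebuilds the two hosts from the diagram. It is not Tajo code: task IDs are plain Strings instead of QueryUnitAttemptId, and the class name PendingTasksByDisk is hypothetical. Note that a task is listed under every host that holds a replica of its block (Task1 appears on both hostA and hostB).

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of TaskBlockLocation's initial state.
// Strings stand in for QueryUnitAttemptId (assumption for illustration).
class PendingTasksByDisk {
  final String host;
  // diskId -> tasks whose HDFS block replica lives on that disk of this host
  final Map<Integer, LinkedList<String>> unassigned = new HashMap<>();
  // diskId -> number of containers currently reading from that disk
  final TreeMap<Integer, Integer> volumeUsage = new TreeMap<>();

  PendingTasksByDisk(String host) { this.host = host; }

  void add(int diskId, String task) {
    unassigned.computeIfAbsent(diskId, k -> new LinkedList<>()).add(task);
    volumeUsage.putIfAbsent(diskId, 0); // no container uses this disk yet
  }

  public static void main(String[] args) {
    PendingTasksByDisk hostA = new PendingTasksByDisk("hostA");
    hostA.add(1, "Task1"); hostA.add(1, "Task2");
    hostA.add(2, "Task3"); hostA.add(2, "Task4");
    PendingTasksByDisk hostB = new PendingTasksByDisk("hostB");
    hostB.add(1, "Task1"); hostB.add(1, "Task3"); // Task1: second replica
    hostB.add(2, "Task2"); hostB.add(2, "Task4");
    System.out.println(hostA.unassigned); // {1=[Task1, Task2], 2=[Task3, Task4]}
    System.out.println(hostB.unassigned); // {1=[Task1, Task3], 2=[Task2, Task4]}
  }
}
```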
> // How can I return a Task to the container according to the diskId?
> private HashMap<ContainerId, Integer> assignedContainerMap =
>     new HashMap<ContainerId, Integer>();
When an available container (unique per host) requests a task, the container is assigned to the disk with the lowest concurrency. This happens at runtime.
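A minimal, hypothetical Java sketch of this runtime step: ContainerId becomes a String, and LeastLoadedDiskAssigner and diskFor() are made-up names. The selection loop mirrors assignVolumeId() as quoted in this thread, including its `>=` comparison, so ties go to the last disk in TreeMap order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: pin each container of one host to its least-used disk.
class LeastLoadedDiskAssigner {
  // diskId -> number of containers pinned to it (TreeMap gives a stable scan order)
  final TreeMap<Integer, Integer> volumeUsage = new TreeMap<>();
  // containerId -> diskId the container is currently pinned to
  final Map<String, Integer> assignedContainer = new HashMap<>();

  LeastLoadedDiskAssigner(int... diskIds) {
    for (int d : diskIds) volumeUsage.put(d, 0);
  }

  // Pick the minimum-usage disk (ties go to the last one scanned, as with
  // ">="), bump its count, or return -1 when no disk is left.
  int assignVolumeId() {
    Map.Entry<Integer, Integer> best = null;
    for (Map.Entry<Integer, Integer> e : volumeUsage.entrySet()) {
      if (best == null || best.getValue() >= e.getValue()) best = e;
    }
    if (best == null) return -1;
    volumeUsage.put(best.getKey(), best.getValue() + 1);
    return best.getKey();
  }

  // A new container gets the least-used disk; a known container keeps its disk.
  int diskFor(String containerId) {
    Integer disk = assignedContainer.get(containerId);
    if (disk == null) {
      disk = assignVolumeId();
      assignedContainer.put(containerId, disk);
    }
    return disk;
  }

  public static void main(String[] args) {
    LeastLoadedDiskAssigner hostA = new LeastLoadedDiskAssigner(1, 2);
    System.out.println(hostA.diskFor("container1")); // 2 (tie: last disk scanned wins)
    System.out.println(hostA.diskFor("container2")); // 1 (disk 1 has fewer readers)
    System.out.println(hostA.diskFor("container1")); // 2 (existing mapping reused)
  }
}
```

With two disks, the first two containers on a host end up on different disks, so the disks are read in parallel instead of two containers contending for the same spindle.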
> private TreeMap<Integer, Integer> volumeUsageMap =
>     new TreeMap<Integer, Integer>();
> private String host;
>
> public TaskBlockLocation(String host){
> this.host = host;
> }
>
> public void addQueryUnitAttemptId(Integer volumeId,
>     QueryUnitAttemptId attemptId){
> LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
> if (list == null) {
> list = new LinkedList<QueryUnitAttemptId>();
> unAssignedTaskMap.put(volumeId, list);
> }
> list.add(attemptId);
>
> if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
> }
>
> public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(
>     ContainerId containerId){
> Integer volumeId;
>
> if (!assignedContainerMap.containsKey(containerId)) {
> // assign a new container to a volume with the lowest concurrency, right?
> volumeId = assignVolumeId();
Yes, it is.
> assignedContainerMap.put(containerId, volumeId);
> } else {
> volumeId = assignedContainerMap.get(containerId);
> }
>
> LinkedList<QueryUnitAttemptId> list = null;
> if (unAssignedTaskMap.size() > 0) {
> int retry = unAssignedTaskMap.size();
> do {
> list = unAssignedTaskMap.get(volumeId);
> if (list == null || list.size() == 0) {
> //clean and reassign remaining volume
> unAssignedTaskMap.remove(volumeId);
> volumeUsageMap.remove(volumeId);
> if (volumeId < 0) break; // processed all blocks on disk
>
> // WHY DOES THIS LINE ASSIGN A VOLUMEID AGAIN?
> volumeId = assignVolumeId();
> // WHY DOES THIS LINE PUT IT AGAIN?
> // If the container is a new container, doesn't it get put twice?
> assignedContainerMap.put(containerId, volumeId);
Case 1: there are more disks than containers.
When container1 has drained the tasks on disk1, it is reassigned to disk2, which still has pending tasks.
ex) host ---- container1 --+-- disk1 ---- tasks
                           |
                           +-- disk2 ---- tasks

Case 2: -1 is the unknown disk, used for remote tasks, because all local blocks on the host's disks have already been processed.
If the container were not assigned a remote task, the last remaining tasks would never be scheduled and the query would deadlock.
ex)
hostA --+-- container1 ---- disk1 (newly assigned -1) ---- remote pending tasks (zero local pending tasks)
        |
        +-- container2 ---- disk2 ---- tasks
hostB ----- container1 ---- disk1 ---- tasks (pending tasks will decrease)
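Both cases can be traced with a small, hypothetical re-creation of getQueryUnitAttemptIdList(). Strings stand in for ContainerId and QueryUnitAttemptId, and DiskReassignmentDemo/taskList() are made-up names; the loop follows the quoted code. It also answers the "put twice" question: calling put() again for the same containerId simply replaces the old volume ID, because a HashMap holds at most one value per key.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical re-creation of TaskBlockLocation.getQueryUnitAttemptIdList().
class DiskReassignmentDemo {
  // diskId -> pending local tasks on that disk
  final Map<Integer, LinkedList<String>> unassigned = new HashMap<>();
  // diskId -> number of containers pinned to that disk
  final TreeMap<Integer, Integer> volumeUsage = new TreeMap<>();
  // containerId -> diskId (or -1 once no local disk is left)
  final Map<String, Integer> assignedContainer = new HashMap<>();

  void add(int diskId, String task) {
    unassigned.computeIfAbsent(diskId, k -> new LinkedList<>()).add(task);
    volumeUsage.putIfAbsent(diskId, 0);
  }

  // Lowest-concurrency disk, ties to the last one scanned; -1 if none is left.
  int assignVolumeId() {
    Map.Entry<Integer, Integer> best = null;
    for (Map.Entry<Integer, Integer> e : volumeUsage.entrySet()) {
      if (best == null || best.getValue() >= e.getValue()) best = e;
    }
    if (best == null) return -1;
    volumeUsage.put(best.getKey(), best.getValue() + 1);
    return best.getKey();
  }

  // Pending list for the container's disk; empty or null when nothing local is left.
  LinkedList<String> taskList(String containerId) {
    Integer volumeId = assignedContainer.get(containerId);
    if (volumeId == null) {                 // first request from this container
      volumeId = assignVolumeId();
      assignedContainer.put(containerId, volumeId);
    }
    LinkedList<String> list = null;
    int retry = unassigned.size();          // at most one move per known disk
    while (retry > 0) {
      list = unassigned.get(volumeId);
      if (list != null && !list.isEmpty()) break;
      unassigned.remove(volumeId);          // disk drained: forget it
      volumeUsage.remove(volumeId);
      if (volumeId < 0) break;
      volumeId = assignVolumeId();          // Case 1: move to another disk
      assignedContainer.put(containerId, volumeId); // overwrites; -1 means Case 2
      retry--;
    }
    return list;
  }

  public static void main(String[] args) {
    DiskReassignmentDemo host = new DiskReassignmentDemo();
    host.add(1, "T1"); host.add(2, "T2"); host.add(2, "T3");
    System.out.println(host.taskList("c1").poll()); // T2: c1 pinned to disk 2
    System.out.println(host.taskList("c2").poll()); // T1: c2 pinned to disk 1
    System.out.println(host.taskList("c2").poll()); // T3: disk 1 drained, c2 moved (Case 1)
    System.out.println(host.taskList("c1"));        // []: no local disk left
    System.out.println(host.assignedContainer.get("c1")); // -1 (Case 2)
  }
}
```

In this sketch, a container mapped to -1 gets no local list back; handing it a remote task is assumed to be the caller's job, outside this class.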
--Jinho
On Feb 13, 2014, at 5:29 PM, Min Zhou wrote:
> Hi Jihoon,
>
> Thank you for your answer. However, it seems you didn't explain how Tajo uses
> disk information to balance the I/O overhead.
>
> I still can't understand the details, which are quite complex to me, especially
> the class TaskBlockLocation.
>
>
> public static class TaskBlockLocation {
> // This is a mapping from diskId to a list of pending tasks, right?
> private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
>     new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
> // How can I return a Task to the container according to the diskId?
> private HashMap<ContainerId, Integer> assignedContainerMap =
>     new HashMap<ContainerId, Integer>();
> private TreeMap<Integer, Integer> volumeUsageMap =
>     new TreeMap<Integer, Integer>();
> private String host;
>
> public TaskBlockLocation(String host){
> this.host = host;
> }
>
> public void addQueryUnitAttemptId(Integer volumeId,
>     QueryUnitAttemptId attemptId){
> LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
> if (list == null) {
> list = new LinkedList<QueryUnitAttemptId>();
> unAssignedTaskMap.put(volumeId, list);
> }
> list.add(attemptId);
>
> if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
> }
>
> public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(
>     ContainerId containerId){
> Integer volumeId;
>
> if (!assignedContainerMap.containsKey(containerId)) {
> // assign a new container to a volume with the lowest concurrency, right?
> volumeId = assignVolumeId();
> assignedContainerMap.put(containerId, volumeId);
> } else {
> volumeId = assignedContainerMap.get(containerId);
> }
>
> LinkedList<QueryUnitAttemptId> list = null;
> if (unAssignedTaskMap.size() > 0) {
> int retry = unAssignedTaskMap.size();
> do {
> list = unAssignedTaskMap.get(volumeId);
> if (list == null || list.size() == 0) {
> //clean and reassign remaining volume
> unAssignedTaskMap.remove(volumeId);
> volumeUsageMap.remove(volumeId);
> if (volumeId < 0) break; // processed all blocks on disk
>
> // WHY DOES THIS LINE ASSIGN A VOLUMEID AGAIN?
> volumeId = assignVolumeId();
> // WHY DOES THIS LINE PUT IT AGAIN?
> // If the container is a new container, doesn't it get put twice?
> assignedContainerMap.put(containerId, volumeId);
> retry--;
> } else {
> break;
> }
> } while (retry > 0);
> }
> return list;
> }
>
> public Integer assignVolumeId(){
> Map.Entry<Integer, Integer> volumeEntry = null;
>
> // choose a volume with the lowest concurrency, right?
> for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
> if(volumeEntry == null) volumeEntry = entry;
>
> if (volumeEntry.getValue() >= entry.getValue()) {
> volumeEntry = entry;
> }
> }
>
> if(volumeEntry != null){
> volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
> LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey()
>     + ", Concurrency : " + volumeUsageMap.get(volumeEntry.getKey()));
> return volumeEntry.getKey();
> } else {
> return -1; // processed all blocks on disk
> }
> }
>
> public String getHost() {
> return host;
> }
> }
>
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId. How can I retrieve a task for the container based on
> that diskId?
>
>
> Thanks,
> Min
>
>
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <[email protected]> wrote:
>
>> Hi, Min.
>>
>> In DefaultTaskScheduler, each container is mapped to one of the disks on the
>> nodes in the cluster. When a container requests a task, DefaultTaskScheduler
>> selects the closest task and assigns it to the container. This process
>> applies only to local reads; disk volume information is not considered for
>> remote reads.
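The local-then-remote behavior described here can be pictured with a hypothetical Java sketch (LocalThenRemote, next() and pollFrom() are made-up names, not DefaultTaskScheduler's API). A container first drains its own host's per-disk queues; only when the host has no local work left does it take any remaining task from another host, and no disk information is used for that remote read. For brevity the local path just takes the first non-empty disk; the lowest-concurrency disk choice is the one made in TaskBlockLocation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: local tasks first (per host, per disk), then any remote task.
class LocalThenRemote {
  // host -> diskId -> pending tasks with a block replica on that disk
  final Map<String, TreeMap<Integer, Deque<String>>> local = new HashMap<>();
  // tasks already handed out, so other replicas of the same block are skipped
  final Set<String> done = new HashSet<>();

  void add(String host, int diskId, String task) {
    local.computeIfAbsent(host, h -> new TreeMap<>())
         .computeIfAbsent(diskId, d -> new ArrayDeque<>())
         .add(task);
  }

  // Next task for a container running on `host`, or null when everything is done.
  String next(String host) {
    String task = pollFrom(local.get(host));   // local read: this host's disk queues
    if (task != null) return task;
    for (TreeMap<Integer, Deque<String>> disks : local.values()) {
      task = pollFrom(disks);                   // remote read: disk ignored
      if (task != null) return task;
    }
    return null;
  }

  // Takes the first not-yet-run task from the first non-empty disk queue.
  private String pollFrom(TreeMap<Integer, Deque<String>> disks) {
    if (disks == null) return null;
    for (Deque<String> queue : disks.values()) {
      while (!queue.isEmpty()) {
        String task = queue.poll();
        if (done.add(task)) return task;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    LocalThenRemote s = new LocalThenRemote();
    s.add("hostA", 1, "T1"); s.add("hostB", 1, "T1"); // two replicas of T1's block
    s.add("hostB", 2, "T2");
    System.out.println(s.next("hostA")); // T1 (local)
    System.out.println(s.next("hostC")); // T2 (remote: hostC holds no replicas)
    System.out.println(s.next("hostB")); // null (T1 already ran on hostA)
  }
}
```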
>>
>> In my opinion, this is enough for us because there are few remote tasks in
>> each subquery. In a test on an in-house cluster of 32 nodes, remote tasks
>> made up only about 0.17% of all tasks (the query was 'select l_orderkey
>> from lineitem', and the lineitem table was about 1TB). Since the number of
>> remote tasks was very small, there was little disk contention.
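To put the 0.17% figure in perspective, here is a back-of-the-envelope estimate. Only the ~1TB table size, the 32 nodes and the 0.17% ratio come from the message; the 128 MB HDFS block size, one leaf task per block, and reading "1TB" as 2^40 bytes are assumptions, not figures from the test.

```java
// Back-of-the-envelope estimate; block size and task-per-block are assumptions.
class RemoteTaskEstimate {
  // One leaf task per HDFS block (assumption), rounding up for a partial block.
  static long leafTasks(long tableBytes, long blockBytes) {
    return (tableBytes + blockBytes - 1) / blockBytes;
  }

  public static void main(String[] args) {
    long tableBytes = 1L << 40;   // "about 1TB", read as 2^40 bytes (assumption)
    long blockBytes = 128L << 20; // 128 MB HDFS block size (assumption)
    long tasks = leafTasks(tableBytes, blockBytes);
    long remote = Math.round(tasks * 0.0017); // 0.17% from the message
    System.out.println(tasks + " leaf tasks, about " + remote + " remote"); // 8192 leaf tasks, about 14 remote
  }
}
```

Under these assumptions, roughly 14 remote tasks spread over 32 nodes is fewer than one remote read per node, which fits the observation that remote reads caused little disk contention.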
>>
>> Hope that answers your questions.
>> Thanks,
>> Jihoon
>>
>> 2014-02-13 11:00 GMT+09:00 Min Zhou <[email protected]>:
>>
>>> Hi all,
>>>
>>> Tajo leverages the feature added by HDFS-3672, which exposes the disk
>>> volume ID of each HDFS data block. I found the related code in
>>> DefaultTaskScheduler.assignToLeafTasks; can anyone explain its logic to
>>> me? What does the scheduler do when the HDFS read is a remote read from
>>> another machine's disk?
>>>
>>>
>>> Thanks,
>>> Min
>>> --
>>> My research interests are distributed systems, parallel computing and
>>> bytecode-based virtual machines.
>>>
>>> My profile:
>>> http://www.linkedin.com/in/coderplay
>>> My blog:
>>> http://coderplay.javaeye.com
>>>
>>
>