[ 
http://issues.apache.org/jira/browse/HADOOP-362?page=comments#action_12422822 ] 
            
Devaraj Das commented on HADOOP-362:
------------------------------------

Will timestamps help for the generic case? So the thread invoking the RPC 
timestamps the message and when the server handler pulls a job out of the queue 
for execution, it makes a note of the sender's timestamp. It then looks up a 
map from client address to client-timestamp and if the current call's timestamp 
happens to be older than the one found in the map, the server simply ignores 
that. This means that we don't guarantee  that all calls will be invoked at the 
server.
Additionally, we can have a flag that the client sets forcing the server to 
execute the call even if it violates the timeliness of the call. This may be 
required in the DFS operations where a client wants to, lets say, create a file 
(not absolutely sure whether this is a sensible use-case) but in general this 
may be helpful.
Both of the above can be implemented in the same lines as call-id is handled 
today (it is a part of each RPC call). In fact, the call-id itself can serve as 
the timestamp. Makes sense?

> tasks can get lost when reporting task completion to the JobTracker has an 
> error
> --------------------------------------------------------------------------------
>
>                 Key: HADOOP-362
>                 URL: http://issues.apache.org/jira/browse/HADOOP-362
>             Project: Hadoop
>          Issue Type: Bug
>          Components: mapred
>            Reporter: Devaraj Das
>         Assigned To: Owen O'Malley
>         Attachments: lost-status-updates.patch
>
>
> Basically, the JobTracker used to lose some updates about successful map 
> tasks and it would assume that the tasks are still running (the old progress 
> report is what it used to display in the web page). Now this would cause the 
> reduces to also wait for the map output and they would never receive the 
> output. This would cause the job to appear as if it was hung.
>  
> The following piece of code sends the status of tasks to the JobTracker:
>  
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     TaskStatus status = tip.createStatus();
>                     taskReports.add(status);
>                     if (status.getRunState() != TaskStatus.RUNNING) {
>                         if (tip.getTask().isMapTask()) {
>                             mapTotal--;
>                         } else {
>                             reduceTotal--;
>                         }
>                         it.remove();
>                     }
>                 }
>             }
>  
>             //
>             // Xmit the heartbeat
>             //
>            
>             TaskTrackerStatus status =
>               new TaskTrackerStatus(taskTrackerName, localHostname,
>                                     httpPort, taskReports,
>                                     failures);
>             int resultCode = jobClient.emitHeartbeat(status, justStarted);
>  
>  
> Notice that the completed TIPs are removed from runningTasks data structure. 
> Now, if the emitHeartBeat threw an exception (if it could not communicate 
> with the JobTracker till the IPC timeout expires) then this update is lost. 
> And the next time it sends the hearbeat this completed task's status is 
> missing and hence the JobTracker doesn't know about this completed task. So, 
> one solution to this is to remove the completed TIPs from runningTasks after 
> emitHeartbeat returns. Here is how the new code would look like:
>  
>  
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     TaskStatus status = tip.createStatus();
>                     taskReports.add(status);
>                 }
>             }
>  
>             //
>             // Xmit the heartbeat
>             //
>  
>             TaskTrackerStatus status =
>               new TaskTrackerStatus(taskTrackerName, localHostname,
>                                     httpPort, taskReports,
>                                     failures);
>             int resultCode = jobClient.emitHeartbeat(status, justStarted);
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     if (tip.runstate != TaskStatus.RUNNING) {
>                         if (tip.getTask().isMapTask()) {
>                             mapTotal--;
>                         } else {
>                             reduceTotal--;
>                         }
>                         it.remove();
>                     }
>                 }
>             }
>  

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to