[ 
http://issues.apache.org/jira/browse/HADOOP-362?page=comments#action_12423020 ] 
            
Devaraj Das commented on HADOOP-362:
------------------------------------

Yes, that's true. That's why in my proposal (the second part) I allow for this 
also - the sender of the RPC explicitly flags an RPC message that it should be 
executed no matter when it is considered for execution. That is, even if a 
message, m1, is received later than other messages that it sends after sending 
m1, the server should honor that and execute the RPC.
So basically, the client decides whether the server should ignore all 
(not-yet-executed) RPC requests sent before the current RPC request that the 
server is executing  OR  the server should execute all RPC requests.
We need to serialize the execution of RPC requests based on client addresses 
(to avoid the problem of multiple requests from the same client getting 
executed in parallel by different handler threads). This will avoid the race 
condition for cases like job status updates.
Yes, I agree that this we can implement the above in the protocol itself. I 
mean each protocol could have a flag signifying either  "execute all RPCs" or 
"ignore RPC requests with timestamps later than the current RPC's timestamp". 
Since in the RPC implementation in Hadoop, we have a single client for each 
protocol, this policy will work I think.

> tasks can get lost when reporting task completion to the JobTracker has an 
> error
> --------------------------------------------------------------------------------
>
>                 Key: HADOOP-362
>                 URL: http://issues.apache.org/jira/browse/HADOOP-362
>             Project: Hadoop
>          Issue Type: Bug
>          Components: mapred
>            Reporter: Devaraj Das
>         Assigned To: Owen O'Malley
>         Attachments: lost-status-updates.patch
>
>
> Basically, the JobTracker used to lose some updates about successful map 
> tasks and it would assume that the tasks are still running (the old progress 
> report is what it used to display in the web page). Now this would cause the 
> reduces to also wait for the map output and they would never receive the 
> output. This would cause the job to appear as if it was hung.
>  
> The following piece of code sends the status of tasks to the JobTracker:
>  
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     TaskStatus status = tip.createStatus();
>                     taskReports.add(status);
>                     if (status.getRunState() != TaskStatus.RUNNING) {
>                         if (tip.getTask().isMapTask()) {
>                             mapTotal--;
>                         } else {
>                             reduceTotal--;
>                         }
>                         it.remove();
>                     }
>                 }
>             }
>  
>             //
>             // Xmit the heartbeat
>             //
>            
>             TaskTrackerStatus status =
>               new TaskTrackerStatus(taskTrackerName, localHostname,
>                                     httpPort, taskReports,
>                                     failures);
>             int resultCode = jobClient.emitHeartbeat(status, justStarted);
>  
>  
> Notice that the completed TIPs are removed from runningTasks data structure. 
> Now, if the emitHeartBeat threw an exception (if it could not communicate 
> with the JobTracker till the IPC timeout expires) then this update is lost. 
> And the next time it sends the hearbeat this completed task's status is 
> missing and hence the JobTracker doesn't know about this completed task. So, 
> one solution to this is to remove the completed TIPs from runningTasks after 
> emitHeartbeat returns. Here is how the new code would look like:
>  
>  
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     TaskStatus status = tip.createStatus();
>                     taskReports.add(status);
>                 }
>             }
>  
>             //
>             // Xmit the heartbeat
>             //
>  
>             TaskTrackerStatus status =
>               new TaskTrackerStatus(taskTrackerName, localHostname,
>                                     httpPort, taskReports,
>                                     failures);
>             int resultCode = jobClient.emitHeartbeat(status, justStarted);
>             synchronized (this) {
>                 for (Iterator it = runningTasks.values().iterator();
>                      it.hasNext(); ) {
>                     TaskInProgress tip = (TaskInProgress) it.next();
>                     if (tip.runstate != TaskStatus.RUNNING) {
>                         if (tip.getTask().isMapTask()) {
>                             mapTotal--;
>                         } else {
>                             reduceTotal--;
>                         }
>                         it.remove();
>                     }
>                 }
>             }
>  

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to