jeongyooneo commented on a change in pull request #21: [NEMO-46] Make the operations on ExecutorRegistry atomic
URL: https://github.com/apache/incubator-nemo/pull/21#discussion_r189775205
##########
File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
##########
@@ -144,53 +140,61 @@ public void onTaskStateChanged(final String executorId,
final String taskId,
final TaskState.State newState, final int attemptIdx,
@Nullable final String taskPutOnHold,
final TaskState.RecoverableFailureCause failureCause) {
- switch (newState) {
- case COMPLETE:
- jobStateManager.onTaskStateChanged(taskId, newState);
- onTaskExecutionComplete(executorId, taskId);
- break;
- case FAILED_RECOVERABLE:
- onTaskExecutionFailedRecoverable(executorId, taskId, attemptIdx, newState, failureCause);
- break;
- case ON_HOLD:
- jobStateManager.onTaskStateChanged(taskId, newState);
- onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
- break;
- case FAILED_UNRECOVERABLE:
- throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
- .append(taskId).append(" in Executor ").append(executorId).toString()));
- case READY:
- case EXECUTING:
- throw new IllegalStateTransitionException(
- new Exception("The states READY/EXECUTING cannot occur at this point"));
- default:
- throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+ final int currentTaskAttemptIndex = jobStateManager.getCurrentAttemptIndexForTask(taskId);
+ if (attemptIdx == currentTaskAttemptIndex) {
+ switch (newState) {
+ case COMPLETE:
+ jobStateManager.onTaskStateChanged(taskId, newState);
+ onTaskExecutionComplete(executorId, taskId);
+ break;
+ case FAILED_RECOVERABLE:
+ onTaskExecutionFailedRecoverable(executorId, taskId, newState, failureCause);
+ break;
+ case ON_HOLD:
+ jobStateManager.onTaskStateChanged(taskId, newState);
+ onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+ break;
+ case FAILED_UNRECOVERABLE:
+ throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
+ .append(taskId).append(" in Executor ").append(executorId).toString()));
+ case READY:
+ case EXECUTING:
+ throw new IllegalStateTransitionException(
+ new Exception("The states READY/EXECUTING cannot occur at this point"));
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+ }
+ } else if (attemptIdx < currentTaskAttemptIndex) {
Review comment:
Could you elaborate on the usage of `attemptIdx` and `currentTaskAttemptIndex` in the `onTaskStateChanged` function comment?
Here, we only change the `Task` state when `attemptIdx == currentTaskAttemptIndex`, and if `attemptIdx < currentTaskAttemptIndex`, the Task state change message is 'late arriving', i.e. it was sent by an earlier attempt of the Task that has since been superseded. This is a bit hard to grasp because the relationship between those two variables isn't explained explicitly.
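For illustration, the relationship could be documented roughly as follows. This is a minimal, self-contained sketch, not Nemo code: `AttemptIndexGuard`, `Arrival`, `startNewAttempt`, and `classify` are hypothetical names, the map stands in for `jobStateManager.getCurrentAttemptIndexForTask(taskId)`, and attempt indices are assumed to start at 0 and grow by one per retry. The `attemptIdx > currentTaskAttemptIndex` case is not visible in this part of the diff, so the sketch only labels it `UNEXPECTED`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the attempt-index check performed in onTaskStateChanged. */
final class AttemptIndexGuard {

  /** How an incoming task state change relates to the task's current attempt. */
  enum Arrival {
    /** attemptIdx == current attempt index: the message concerns the live attempt and is applied. */
    CURRENT,
    /** attemptIdx < current attempt index: sent by an older, already-retried attempt, so it is stale. */
    LATE_ARRIVING,
    /** attemptIdx > current attempt index: not expected; real handling is outside this sketch. */
    UNEXPECTED
  }

  // Hypothetical stand-in for jobStateManager.getCurrentAttemptIndexForTask(taskId).
  private final Map<String, Integer> currentAttemptIndex = new HashMap<>();

  /** Records a new attempt for the task (first attempt is 0, assumed) and returns its index. */
  int startNewAttempt(final String taskId) {
    // Absent key: store 0. Present key: increment the stored index.
    return currentAttemptIndex.merge(taskId, 0, (old, ignored) -> old + 1);
  }

  /**
   * Classifies a task state change message.
   *
   * @param taskId     the task the message refers to
   * @param attemptIdx the attempt index the message carries, fixed when that attempt was launched
   * @return CURRENT only when attemptIdx equals the task's current attempt index
   */
  Arrival classify(final String taskId, final int attemptIdx) {
    // An unknown task gets -1, so any message for it is classified as UNEXPECTED.
    final int current = currentAttemptIndex.getOrDefault(taskId, -1);
    if (attemptIdx == current) {
      return Arrival.CURRENT;
    } else if (attemptIdx < current) {
      return Arrival.LATE_ARRIVING;
    } else {
      return Arrival.UNEXPECTED;
    }
  }

  public static void main(final String[] args) {
    final AttemptIndexGuard guard = new AttemptIndexGuard();
    final int first = guard.startNewAttempt("Task-1"); // attempt 0 launched
    final int retry = guard.startNewAttempt("Task-1"); // attempt 0 failed, attempt 1 launched
    // A message from attempt 0 that arrives after the retry is stale and must not change the state.
    System.out.println(guard.classify("Task-1", first)); // LATE_ARRIVING
    System.out.println(guard.classify("Task-1", retry)); // CURRENT
  }
}
```

The point of the guard is that the attempt index in the message is fixed at launch time, while the scheduler's index moves forward on every retry, so only the newest attempt can drive state transitions.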
----------------------------------------------------------------
This is an automated message from the Apache Git Service.