jeongyooneo commented on a change in pull request #21: [NEMO-46] Make the operations on ExecutorRegistry atomic
URL: https://github.com/apache/incubator-nemo/pull/21#discussion_r189775205
 
 

 ##########
 File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 ##########
 @@ -144,53 +140,61 @@ public void onTaskStateChanged(final String executorId, final String taskId,
                                   final TaskState.State newState, final int attemptIdx,
                                  @Nullable final String taskPutOnHold,
                                   final TaskState.RecoverableFailureCause failureCause) {
-    switch (newState) {
-      case COMPLETE:
-        jobStateManager.onTaskStateChanged(taskId, newState);
-        onTaskExecutionComplete(executorId, taskId);
-        break;
-      case FAILED_RECOVERABLE:
-        onTaskExecutionFailedRecoverable(executorId, taskId, attemptIdx, newState, failureCause);
-        break;
-      case ON_HOLD:
-        jobStateManager.onTaskStateChanged(taskId, newState);
-        onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
-        break;
-      case FAILED_UNRECOVERABLE:
-        throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
-            .append(taskId).append(" in Executor ").append(executorId).toString()));
-      case READY:
-      case EXECUTING:
-        throw new IllegalStateTransitionException(
-            new Exception("The states READY/EXECUTING cannot occur at this point"));
-      default:
-        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+    final int currentTaskAttemptIndex = jobStateManager.getCurrentAttemptIndexForTask(taskId);
+    if (attemptIdx == currentTaskAttemptIndex) {
+      switch (newState) {
+        case COMPLETE:
+          jobStateManager.onTaskStateChanged(taskId, newState);
+          onTaskExecutionComplete(executorId, taskId);
+          break;
+        case FAILED_RECOVERABLE:
+          onTaskExecutionFailedRecoverable(executorId, taskId, newState, failureCause);
+          break;
+        case ON_HOLD:
+          jobStateManager.onTaskStateChanged(taskId, newState);
+          onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+          break;
+        case FAILED_UNRECOVERABLE:
+          throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
+              .append(taskId).append(" in Executor ").append(executorId).toString()));
+        case READY:
+        case EXECUTING:
+          throw new IllegalStateTransitionException(
+              new Exception("The states READY/EXECUTING cannot occur at this point"));
+        default:
+          throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+      }
+    } else if (attemptIdx < currentTaskAttemptIndex) {
 
 Review comment:
   Could you elaborate on how `attemptIdx` and `currentTaskAttemptIndex` are used in the `onTaskStateChanged` function comment?
   
   Here, we only change the `Task` state when `attemptIdx == currentTaskAttemptIndex`. If `attemptIdx < currentTaskAttemptIndex`, the Task state change message is 'late arriving'. This is a bit hard to grasp because the relationship between the two variables isn't explained explicitly.
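   
   For illustration, here is a rough, standalone sketch of the kind of comment I have in mind, written against a hypothetical `AttemptIndexGuard` class rather than the real scheduler. The class, the `Decision` enum, and the `setCurrentAttemptIndex` helper are made up for this example, and the `attemptIdx > currentTaskAttemptIndex` branch is my assumption, since this hunk does not show how the PR handles that case.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the attempt-index check in onTaskStateChanged; not Nemo code. */
final class AttemptIndexGuard {
  /** Outcome of comparing a message's attempt index with the current one. */
  enum Decision { APPLY, IGNORE_LATE, REJECT_FUTURE }

  // Stand-in for jobStateManager.getCurrentAttemptIndexForTask(taskId).
  private final Map<String, Integer> currentAttemptIndexByTask = new HashMap<>();

  /** Records the attempt index the master currently tracks for the task. */
  void setCurrentAttemptIndex(final String taskId, final int attemptIndex) {
    currentAttemptIndexByTask.put(taskId, attemptIndex);
  }

  /**
   * Decides what to do with a task state change message.
   *
   * @param taskId     the task whose state changed.
   * @param attemptIdx the attempt index carried by the message, i.e. the attempt
   *                   that produced this state change. It is compared against
   *                   currentTaskAttemptIndex, the attempt the master currently tracks:
   *                   equal means the message is current and the state change is applied;
   *                   smaller means the message is 'late arriving' from an older attempt;
   *                   larger is treated as an error here (an assumption of this sketch).
   * @return the decision for this message.
   */
  Decision classify(final String taskId, final int attemptIdx) {
    // Assumption: a task with no recorded attempt is at attempt index 0.
    final int currentTaskAttemptIndex = currentAttemptIndexByTask.getOrDefault(taskId, 0);
    if (attemptIdx == currentTaskAttemptIndex) {
      return Decision.APPLY;
    } else if (attemptIdx < currentTaskAttemptIndex) {
      return Decision.IGNORE_LATE;
    } else {
      return Decision.REJECT_FUTURE;
    }
  }
}
```

   Spelling out the three cases in the `@param attemptIdx` description would make the `==` and `<` branches in this method much easier to follow.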
