Repository: reef
Updated Branches:
  refs/heads/master 55cde7d62 -> 7adaa97d3


[REEF-1687] Throw exception if task dispose throws exception after Call()

Currently in TaskRuntime, when the task's Dispose throws an exception in the 
Close method, we log it and throw an exception.
But when the task's Dispose throws an exception in the main flow (after the 
Call method), we log it but don't re-throw.
This behavior is inconsistent. It also causes intermittent failures of 
TestFailMapperTasksOnDispose,
because the test outcome depends on which code path runs first, i.e. on where 
the task's Dispose ends up being called.

When the task's Dispose throws an exception, it means some resources were not 
released properly.
For fault tolerance work, we may want to reuse the evaluator to resubmit a task 
on it after the task
returns from Call(), whether it really completed or was closed by the driver. 
We can't afford to do that
with unreleased resources. So the correct approach is to throw an exception to 
make the evaluator fail,
the same as what we do in the Close method.

This change:
 * updates TaskRuntime to throw exception in both code paths.
 * updates TestFailMapperTasksOnDispose to account for the time-sensitive 
nature of the test

JIRA:
  [REEF-1687](https://issues.apache.org/jira/browse/REEF-1687)

Pull request:
  This closes #1203


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7adaa97d
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7adaa97d
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7adaa97d

Branch: refs/heads/master
Commit: 7adaa97d3ddf4e192ae42fea88f6911a7f747343
Parents: 55cde7d
Author: Julia Wang <[email protected]>
Authored: Tue Dec 13 12:05:02 2016 -0800
Committer: Mariia Mykhailova <[email protected]>
Committed: Tue Dec 13 14:39:09 2016 -0800

----------------------------------------------------------------------
 .../Runtime/Evaluator/Task/TaskRuntime.cs       | 13 ++++----
 .../IMRU/TestFailMapperTasksOnDispose.cs        | 31 ++++++++++++++++----
 2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/7adaa97d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 1d83516..39c51f6 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -136,8 +136,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                     }
                     catch (Exception e)
                     {
-                        Utilities.Diagnostics.Exceptions.Caught(
-                            e, Level.Error, "Exception in disposing Task but 
ignoring as Task has already completed.", Logger);
+                        var msg = "Exception during Task Dispose in task 
Call()";
+                        Logger.Log(Level.Error, msg);
+                        throw new InvalidOperationException(msg, e);
                     }
                 }
             });
@@ -201,11 +202,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 }
                 catch (Exception e)
                 {
-                    Utilities.Diagnostics.Exceptions.CaughtAndThrow(
-                        new InvalidOperationException("Cannot dispose task 
properly", e),
-                        Level.Error,
-                        "Exception during task dispose.",
-                        Logger);
+                    var msg = "Exception during Task Dispose in task Close()";
+                    Logger.Log(Level.Error, msg);
+                    throw new InvalidOperationException(msg, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/7adaa97d/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
index d027b8f..b54b9e6 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
@@ -59,11 +59,31 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
             var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], 
int[], int[]>.DoneActionPrefix);
 
-            // No failed evaluators or tasks.
-            Assert.Equal(0, failedEvaluatorCount);
-            Assert.Equal(0, failedTaskCount);
-            Assert.True(numTasks >= completedTaskCount);
-            Assert.True(completedTaskCount >= 1);
+            // In this test one of evaluators fails at task dispose stage. 
Depending on the timing of the failure,
+            // if it happens after all tasks completed, the job succeeds 
immediately,
+            // but if it happens before that, this counts as failure and job 
restarts.
+            // Number of tries done can be detected as number of recoveries 
done + 1
+            var triesDone = GetMessageCount(lines, "Start recovery") + 1;
+
+            // When Task.Dispose throws exception, it will result in evaluator 
failure. 
+            // But if master task has completed earlier and this evaluator is 
closed by the driver, 
+            // this evaluator may not be able to send IFailedEvaluator event 
back to driver
+            // "WARNING: Evaluator trying to schedule a heartbeat after a 
completed heartbeat has already been scheduled or sent."
+            Assert.True(triesDone >= failedEvaluatorCount);
+
+            // All the retries can only be triggered by failed evaluator
+            // But not all failed evaluators trigger retry (if tasks completed 
before failure, the job will succeed)
+            Assert.True(failedEvaluatorCount >= triesDone - 1);
+
+            // Scenario1: Driver receives FailedEvaluator caused by dispose of 
a completed task after all the tasks have been completed. 
+            //            FailedEvaluator event will be ignored.
+            // Scenario2: Driver receives FailedEvaluator caused by dispose of 
a completed task before all the tasks have been completed.
+            //            Driver will send close event to the rest of the 
running tasks and enter shutdown then recovery. 
+            //            During this process, some tasks can still complete 
and some may fail due to communication error
+            //            As evaluator failure happens in finally block, 
therefore either ICompletedTask or IFailedTask event should be sent before it.
+            //            Considering once master is done, rest of the contexts 
will be disposed, we have 
+            //            completedTask# + FailedTask# <= numTasks
+            Assert.True(triesDone * numTasks >= completedTaskCount + 
failedTaskCount);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);
@@ -79,7 +99,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 
             return TangFactory.GetTang().NewConfigurationBuilder(c)
                 
.BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, 
string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-2-")
-                
.BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, 
string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-3-")
                 
.BindIntNamedParam<PipelinedBroadcastAndReduceWithFaultTolerant.FailureType>(PipelinedBroadcastAndReduceWithFaultTolerant.FailureType.TaskFailureDuringTaskDispose.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
NumberOfRetry.ToString())
                 
.BindNamedParameter(typeof(PipelinedBroadcastAndReduceWithFaultTolerant.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())

Reply via email to