Repository: reef Updated Branches: refs/heads/master 55cde7d62 -> 7adaa97d3
[REEF-1687] Throw exception if task dispose throws exception after Call() Currently in TaskRuntime, when the task's Dispose in the Close method throws an exception, we rethrow it. But when the task's Dispose in the main flow (after the Call method) throws an exception, we log it but don't rethrow. This behavior is inconsistent. It also results in random failures of TestFailMapperTasksOnDispose, because the test's outcome depends on which code path is quicker, i.e. on where the task's Dispose is called. When task Dispose throws an exception, some resources have not been released properly. For fault-tolerance work, even if the task has returned from Call() — either because it really completed or because it was closed by the driver — we cannot afford leaked resources if we want to reuse the evaluator to resubmit a task on it. So the correct behavior is to throw an exception that makes the evaluator fail, the same as what we do in the Close method. This change: * updates TaskRuntime to throw an exception in both code paths. * updates TestFailMapperTasksOnDispose to account for the time-sensitive nature of the test. JIRA: [REEF-1687](https://issues.apache.org/jira/browse/REEF-1687) Pull request: This closes #1203 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7adaa97d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7adaa97d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7adaa97d Branch: refs/heads/master Commit: 7adaa97d3ddf4e192ae42fea88f6911a7f747343 Parents: 55cde7d Author: Julia Wang <[email protected]> Authored: Tue Dec 13 12:05:02 2016 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Tue Dec 13 14:39:09 2016 -0800 ---------------------------------------------------------------------- .../Runtime/Evaluator/Task/TaskRuntime.cs | 13 ++++---- .../IMRU/TestFailMapperTasksOnDispose.cs | 31 ++++++++++++++++---- 2 files changed, 31 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/7adaa97d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 1d83516..39c51f6 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -136,8 +136,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (Exception e) { - Utilities.Diagnostics.Exceptions.Caught( - e, Level.Error, "Exception in disposing Task but ignoring as Task has already completed.", Logger); + var msg = "Exception during Task Dispose in task Call()"; + Logger.Log(Level.Error, msg); + throw new InvalidOperationException(msg, e); } } }); @@ -201,11 +202,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (Exception e) { - Utilities.Diagnostics.Exceptions.CaughtAndThrow( - new InvalidOperationException("Cannot dispose task properly", e), - Level.Error, - "Exception during task dispose.", - Logger); + var msg = "Exception during Task Dispose in task Close()"; + Logger.Log(Level.Error, msg); + throw new InvalidOperationException(msg, e); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/7adaa97d/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs index d027b8f..b54b9e6 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs @@ -59,11 +59,31 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU var 
failedTaskCount = GetMessageCount(lines, FailedTaskMessage); var jobSuccess = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.DoneActionPrefix); - // No failed evaluators or tasks. - Assert.Equal(0, failedEvaluatorCount); - Assert.Equal(0, failedTaskCount); - Assert.True(numTasks >= completedTaskCount); - Assert.True(completedTaskCount >= 1); + // In this test one of the evaluators fails at the task dispose stage. Depending on the timing of the failure, + // if it happens after all tasks completed, the job succeeds immediately, + // but if it happens before that, it counts as a failure and the job restarts. + // The number of tries done can be detected as the number of recoveries done + 1. + var triesDone = GetMessageCount(lines, "Start recovery") + 1; + + // When Task.Dispose throws an exception, it results in an evaluator failure. + // But if the master task has completed earlier and this evaluator is closed by the driver, + // this evaluator may not be able to send the IFailedEvaluator event back to the driver: + // "WARNING: Evaluator trying to schedule a heartbeat after a completed heartbeat has already been scheduled or sent." + Assert.True(triesDone >= failedEvaluatorCount); + + // All the retries can only be triggered by failed evaluators, + // but not all failed evaluators trigger a retry (if tasks completed before the failure, the job will succeed). + Assert.True(failedEvaluatorCount >= triesDone - 1); + + // Scenario 1: the driver receives a FailedEvaluator caused by the dispose of a completed task after all the tasks have completed. + // The FailedEvaluator event will be ignored. + // Scenario 2: the driver receives a FailedEvaluator caused by the dispose of a completed task before all the tasks have completed. + // The driver will send a close event to the rest of the running tasks and enter shutdown, then recovery. 
+ // During this process, some tasks can still complete and some may fail due to communication errors. + // Because the evaluator failure happens in a finally block, either an ICompletedTask or an IFailedTask event should be sent before it. + // Considering that once the master is done the rest of the contexts will be disposed, each try contributes at most numTasks events: + // completedTask# + FailedTask# <= numTasks per try, i.e. <= triesDone * numTasks in total + Assert.True(triesDone * numTasks >= completedTaskCount + failedTaskCount); // eventually job succeeds Assert.Equal(1, jobSuccess); @@ -79,7 +99,6 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU return TangFactory.GetTang().NewConfigurationBuilder(c) .BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") - .BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") .BindIntNamedParam<PipelinedBroadcastAndReduceWithFaultTolerant.FailureType>(PipelinedBroadcastAndReduceWithFaultTolerant.FailureType.TaskFailureDuringTaskDispose.ToString()) .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) .BindNamedParameter(typeof(PipelinedBroadcastAndReduceWithFaultTolerant.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
