Repository: reef Updated Branches: refs/heads/master 865cdb516 -> 326eae21a
[REEF-1321] Adding TaskManager for IMRU fault tolerant This change: * adds TaskManager class * adds test cases for it JIRA: [REEF-1321](https://issues.apache.org/jira/browse/REEF-1321) Pull request: This closes #1002 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/326eae21 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/326eae21 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/326eae21 Branch: refs/heads/master Commit: 326eae21a217073cdd6eedba6086f1e7ed93659d Parents: 865cdb5 Author: Julia Wang <[email protected]> Authored: Thu May 12 14:32:40 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Wed May 18 14:50:07 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.IMRU.Tests.csproj | 1 + .../TestTaskManager.cs | 679 +++++++++++++++++++ .../OnREEF/Driver/TaskInfo.cs | 58 ++ .../OnREEF/Driver/TaskManager.cs | 466 +++++++++++++ .../Org.Apache.REEF.IMRU.csproj | 2 + 5 files changed, 1206 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj index 415a6db..43e8dfe 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj @@ -54,6 +54,7 @@ under the License. 
<Compile Include="TestActiveContextManager.cs" /> <Compile Include="TestEvaluatorManager.cs" /> <Compile Include="TestSystemStates.cs" /> + <Compile Include="TestTaskManager.cs" /> <Compile Include="TestTaskStates.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs new file mode 100644 index 0000000..319f541 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs @@ -0,0 +1,679 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using NSubstitute; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities; +using Xunit; + +namespace Org.Apache.REEF.IMRU.Tests +{ + /// <summary> + /// Test cases for TaskManager + /// </summary> + public sealed class TestTaskManager + { + private const string MapperTaskIdPrefix = "MapperTaskIdPrefix"; + private const string MasterTaskId = "MasterTaskId"; + private const string EvaluatorIdPrefix = "EvaluatorId"; + private const string ContextIdPrefix = "ContextId"; + private const int TotalNumberOfTasks = 3; + + /// <summary> + /// Tests that valid AddTask calls leave all tasks in TaskNew state and that Reset clears the tasks and the application error count + /// </summary> + [Fact] + public void TestValidAddAndReset() + { + var taskManager = TaskManagerWithTasksAdded(); + Assert.True(taskManager.AreAllTasksInState(TaskState.TaskNew)); + Assert.Equal(TotalNumberOfTasks, taskManager.NumberOfTasks); + taskManager.Reset(); + Assert.Equal(0, taskManager.NumberOfTasks); + Assert.Equal(0, taskManager.NumberOfAppErrors()); + } + + /// <summary> + /// Tests SubmitTasks after adding all the tasks to the TaskManager + /// </summary> + [Fact] + public void TestSubmitTasks() + { + var taskManager = TaskManagerWithTasksSubmitted(); + Assert.True(taskManager.AreAllTasksInState(TaskState.TaskSubmitted)); + } + + /// <summary> + /// Tests that SubmitTasks throws IMRUSystemException when a mapper task is missing + /// </summary> + [Fact] + public void TestMissingMapperTasksSubmit() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0)); + taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + + Action submit = () => taskManager.SubmitTasks(); + Assert.Throws<IMRUSystemException>(submit); + } + + /// <summary> + /// 
Tests SubmitTask with missing master task + /// </summary> + [Fact] + public void TestMissingMasterTaskSubmit() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2)); + + Action submit = () => taskManager.SubmitTasks(); + Assert.Throws<IMRUSystemException>(submit); + } + + /// <summary> + /// Tests adding all mapper tasks without master task + /// </summary> + [Fact] + public void NoMasterTask() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2)); + Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 3, MockConfig(), CreateMockActiveContext(3)); + Assert.Throws<IMRUSystemException>(add); + } + + /// <summary> + /// Tests adding more than expected tasks + /// </summary> + [Fact] + public void ExceededTotalNumber() + { + var taskManager = TaskManagerWithTasksAdded(); + Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 4, MockConfig(), CreateMockActiveContext(4)); + Assert.Throws<IMRUSystemException>(add); + } + + /// <summary> + /// Tests adding a task with duplicated task id and duplicated master id + /// </summary> + [Fact] + public void DuplicatedTaskIdInAdd() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0)); + taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + Assert.Throws<IMRUSystemException>(add); + add = () => taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(1)); + Assert.Throws<IMRUSystemException>(add); + } + + /// <summary> + /// Tests invalid arguments when adding 
tasks + /// </summary> + [Fact] + public void NullArguments() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0)); + + Action add = () => taskManager.AddTask(MapperTaskIdPrefix + 1, null, CreateMockActiveContext(1)); + Assert.Throws<IMRUSystemException>(add); + + add = () => taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), null); + Assert.Throws<IMRUSystemException>(add); + } + + /// <summary> + /// Tests passing invalid arguments in creating TaskManager + /// </summary> + [Fact] + public void InvalidArgumentsInCreatingTaskManger() + { + Action taskManager = () => CreateTaskManager(0, MasterTaskId); + Assert.Throws<IMRUSystemException>(taskManager); + + taskManager = () => CreateTaskManager(1, null); + Assert.Throws<IMRUSystemException>(taskManager); + } + + /// <summary> + /// Tests whether all tasks rightly reach Running and Completed states + /// </summary> + [Fact] + public void TestCompletingTasks() + { + var taskManager = TaskManagerWithTasksRunning(); + Assert.True(taskManager.AreAllTasksInState(TaskState.TaskRunning)); + + taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1)); + taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2)); + taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId)); + Assert.True(taskManager.AreAllTasksInState(TaskState.TaskCompleted)); + } + + /// <summary> + /// Tests closing running tasks + /// </summary> + [Fact] + public void TestClosingRunningTasks() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId)); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + + var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2); + taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver); + 
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + Assert.True(taskManager.AreAllTasksInState(TaskState.TaskWaitingForClose)); + } + + /// <summary> + /// Tests record failed tasks after all the tasks are running + /// </summary> + [Fact] + public void TestFailedRunningTasks() + { + var taskManager = TaskManagerWithTasksRunning(); + + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError)); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError)); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError)); + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests number of application errors + /// </summary> + [Fact] + public void TestAppError() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError)); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError)); + Assert.Equal(1, taskManager.NumberOfAppErrors()); + } + + /// <summary> + /// Tests failed tasks in various event sequences + /// </summary> + [Fact] + public void TestFailedTasks() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2)); + + // This task failed by evaluator then failed by itself + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, 
taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // no state change should happen in this case + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // This task failed by itself first, then failed by Evaluator failure + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2); + Assert.Equal(TaskState.TaskFailedByGroupCommunication, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // close the running task during shutting down + var masterRuningTask = CreateMockRunningTask(MasterTaskId); + taskManager.RecordRunningTaskDuringSystemFailure(masterRuningTask, TaskManager.CloseTaskByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver)); + Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests after all the tasks are running, a task fails first, then close all running tasks + /// </summary> + [Fact] + public void TestFailedTasksAfterAllTasksAreRunnigScenario() + { + var taskManager = TaskManagerWithTasksRunning(); + + // A task fail first + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1); + Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // system is in shutting down, close all other tasks + 
taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // task 2 is killed by driver + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + + // master task is killed by driver + var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests after all the tasks are running, an evaluator fails first, then a task fails with communication error + /// </summary> + [Fact] + public void TestFailedEvaluatorThenFailedTaskAfterTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksRunning(); + + // Evaluator error + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // system is in shutting down, close all other tasks + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId)); + + // Another task may get failed by communication during the shutting down + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // The task that receives the close from driver now send failed event back to driver + var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + 
taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); + Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests after all the tasks are running, a task fails first, then an evaluator fails + /// </summary> + [Fact] + public void TestFailedTasksThenFailedEvaluatorAfterAllTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksRunning(); + + // A task fails first + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1); + Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // system is in shutting down, close all other tasks + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // An Evaluator fails during shut down, as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2)); + Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // master task gets communication error before it receives close event, as the task is already in waiting for close state, its state will be changed to TaskClosedByDriver + var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); + Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Test evaluator fails before any task is running after all the tasks are submitted + /// </summary> + [Fact] + public void 
TestFailedEvaluatorBeforeAnyTaskIsRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + // Evaluator error + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // there is no running task yet + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // task2 is running, close it + var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2); + taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // master task is running, close it + var masterTask = CreateMockRunningTask(MasterTaskId); + taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId)); + + // received task failure because of the closing + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + + // received task failure because of the closing + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests an evaluator fails for a running task before all the tasks are running + /// </summary> + [Fact] + public void TestFailedEvaluatorOnRunningTaskBeforeAllTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId)); + 
taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + + // Evaluator error + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId1", failedTask1)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // the master task should be closed + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // task 2 is now running, close it + var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2); + taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests an evaluator fails for a non running task before all the tasks are running + /// </summary> + [Fact] + public void TestFailedEvaluatorOnNoRunningTaskBeforeAllTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId)); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + + // Evaluator error + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluator("eId2", failedTask2)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // Send event to close 
master task and task1 + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1); + + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests a task fails before any task is running after all the tasks are submitted. + /// </summary> + [Fact] + public void TestFailedTaskBeforeAnyTaskIsRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + // Task 1 fails with a system error (a task failure, not an evaluator failure) + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1); + Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // there is no running task yet + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // task 2 is running, now close it + var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2); + taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // master task is running, close it + var masterTask = CreateMockRunningTask(MasterTaskId); + taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId)); + + // The task 2 could be failed by communication before receiving close event + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + + // master task 
failed because receiving close event + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests a running task fails before all the tasks are running + /// </summary> + [Fact] + public void TestFailedRunningTaskBeforeAllTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + + // The running task 1 fails with a system error (a task failure, not an evaluator failure) + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskSystemError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask1); + Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); + + // task 1 has failed, so there is no running task at this point + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // task 2 is running, now close it + var runingTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2); + taskManager.RecordRunningTaskDuringSystemFailure(runingTask2, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // master task is running, close it + var masterTask = CreateMockRunningTask(MasterTaskId); + taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId)); + + // The task 2 could be failed by communication before receiving close event + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask2); + + // master task failed because receiving close event + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); + 
taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Tests a non running task fails before all the tasks are running + /// </summary> + [Fact] + public void TestFailedNoRunningTaskBeforeAllTasksAreRunningScenario() + { + var taskManager = TaskManagerWithTasksSubmitted(); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + + // Task 2, which is not running yet, fails with a system error (a task failure, not an evaluator failure) + var failedTask2 = CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskSystemError); + taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask2); + Assert.Equal(TaskState.TaskFailedBySystemError, taskManager.GetTaskState(MapperTaskIdPrefix + 2)); + + // only task 1 has been recorded as running at this point + taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // master task is running, close it + var masterTask = CreateMockRunningTask(MasterTaskId); + taskManager.RecordRunningTaskDuringSystemFailure(masterTask, TaskManager.CloseTaskByDriver); + Assert.Equal(TaskState.TaskWaitingForClose, taskManager.GetTaskState(MasterTaskId)); + + // The task 1 could be failed by communication before receiving close event + var failedTask1 = CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask1); + + // master task could fail with a communication error as well + var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError); + taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); + + Assert.True(taskManager.AllInFinalState()); + } + + /// <summary> + /// Creates a TaskManager with the specified numTasks and masterTaskId + /// </summary> + /// <param name="numTasks">Total number of tasks passed to the TaskManager constructor; defaults to TotalNumberOfTasks</param> + /// <param name="masterTaskId">Id of the master task passed to the TaskManager constructor; defaults to MasterTaskId</param> + /// <returns>The newly constructed TaskManager</returns> + private static TaskManager CreateTaskManager(int numTasks = 
TotalNumberOfTasks, string masterTaskId = MasterTaskId) + { + var taskManager = new TaskManager(numTasks, masterTaskId); + return taskManager; + } + + /// <summary> + /// Creates a TaskManager and adds one master task and two mapper tasks + /// </summary> + /// <returns>A TaskManager with the master task and mapper tasks 1 and 2 added</returns> + private static TaskManager TaskManagerWithTasksAdded() + { + var taskManager = CreateTaskManager(); + taskManager.AddTask(MasterTaskId, MockConfig(), CreateMockActiveContext(0)); + taskManager.AddTask(MapperTaskIdPrefix + 1, MockConfig(), CreateMockActiveContext(1)); + taskManager.AddTask(MapperTaskIdPrefix + 2, MockConfig(), CreateMockActiveContext(2)); + + return taskManager; + } + + /// <summary> + /// Create a TaskManager with all the tasks submitted + /// </summary> + /// <returns>A TaskManager on which SubmitTasks has been called after adding all tasks</returns> + private static TaskManager TaskManagerWithTasksSubmitted() + { + var taskManager = TaskManagerWithTasksAdded(); + taskManager.SubmitTasks(); + + return taskManager; + } + + /// <summary> + /// Create a TaskManager with all the tasks running + /// </summary> + /// <returns>A TaskManager on which RecordRunningTask has been called for the master task and both mapper tasks</returns> + private static TaskManager TaskManagerWithTasksRunning() + { + var taskManager = TaskManagerWithTasksSubmitted(); + taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId)); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1)); + taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 2)); + + return taskManager; + } + + /// <summary> + /// Creates a mock IActiveContext + /// </summary> + /// <param name="id">Number appended to the context id prefix and to the evaluator id of the mock</param> + /// <returns>A substitute IActiveContext whose Id and EvaluatorId are derived from id</returns> + private static IActiveContext CreateMockActiveContext(int id) + { + var mockActiveContext = Substitute.For<IActiveContext>(); + mockActiveContext.Id.Returns(ContextIdPrefix + id); + mockActiveContext.EvaluatorId.Returns(EvaluatorIdPrefix + ContextIdPrefix + id); + return mockActiveContext; + } + + /// <summary> + /// Creates a mock FailedTask with specified taskId and error message + /// </summary> + /// <param 
name="taskId"></param> + /// <param name="errorMsg">Value returned by the mock's Message property</param> + /// <returns>A substitute IFailedTask with the given Id and Message</returns> + private static IFailedTask CreateMockFailedTask(string taskId, string errorMsg) + { + IFailedTask failedtask = Substitute.For<IFailedTask>(); + failedtask.Id.Returns(taskId); + failedtask.Message.Returns(errorMsg); + return failedtask; + } + + /// <summary> + /// Creates a mock running task with the taskId specified + /// </summary> + /// <param name="taskId">Value returned by the mock's Id property</param> + /// <returns>A substitute IRunningTask with the given Id</returns> + private static IRunningTask CreateMockRunningTask(string taskId) + { + var runningTask = Substitute.For<IRunningTask>(); + runningTask.Id.Returns(taskId); + return runningTask; + } + + /// <summary> + /// Creates a mock completed task with the taskId specified + /// </summary> + /// <param name="taskId">Value returned by the mock's Id property</param> + /// <returns>A substitute ICompletedTask with the given Id</returns> + private static ICompletedTask CreateMockCompletedTask(string taskId) + { + var completedTask = Substitute.For<ICompletedTask>(); + completedTask.Id.Returns(taskId); + return completedTask; + } + + /// <summary> + /// Creates a mock IFailedEvaluator with the specified IFailedTask associated + /// </summary> + /// <param name="evaluatorId">Value returned by the mock's Id property</param> + /// <param name="failedTask">Failed task wrapped in an Optional and returned by the mock's FailedTask property</param> + /// <returns>A substitute IFailedEvaluator with the given Id and FailedTask</returns> + private static IFailedEvaluator CreateMockFailedEvaluator(string evaluatorId, IFailedTask failedTask) + { + var failedEvalutor = Substitute.For<IFailedEvaluator>(); + failedEvalutor.Id.Returns(evaluatorId); + failedEvalutor.FailedTask.Returns(Optional<IFailedTask>.Of(failedTask)); + return failedEvalutor; + } + + /// <summary> + /// Creates a mock IConfiguration + /// </summary> + /// <returns>An empty configuration built from a new Tang configuration builder</returns> + private static IConfiguration MockConfig() + { + return TangFactory.GetTang().NewConfigurationBuilder().Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/326eae21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs ---------------------------------------------------------------------- diff 
--git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs new file mode 100644 index 0000000..6ae992d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.Tang.Interface;

namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
    /// <summary>
    /// Bundles the three things the driver tracks for one task: its state machine,
    /// the configuration used to submit it, and the active context it is submitted on.
    /// All three references are assigned once, in the constructor.
    /// </summary>
    internal sealed class TaskInfo
    {
        /// <summary>
        /// Wraps the given task state, task configuration and active context.
        /// No validation is performed here; the values are stored as passed.
        /// </summary>
        /// <param name="taskState">State machine tracking the task.</param>
        /// <param name="config">Configuration used when the task is submitted.</param>
        /// <param name="context">Active context the task is submitted on.</param>
        internal TaskInfo(TaskStateMachine taskState, IConfiguration config, IActiveContext context)
        {
            TaskState = taskState;
            TaskConfiguration = config;
            ActiveContext = context;
        }

        /// <summary>
        /// State machine of the task.
        /// </summary>
        internal TaskStateMachine TaskState { get; private set; }

        /// <summary>
        /// Configuration of the task.
        /// </summary>
        internal IConfiguration TaskConfiguration { get; private set; }

        /// <summary>
        /// Active context associated with the task.
        /// </summary>
        internal IActiveContext ActiveContext { get; private set; }
    }
}
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
    /// <summary>
    /// Manages tasks: keeps the per-task state machines, tracks which tasks are running,
    /// and submits the tasks once all of them have been added.
    /// </summary>
    [NotThreadSafe]
    internal sealed class TaskManager
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskManager));

        /// <summary>
        /// Error messages thrown in IMRU tasks when an exception happens
        /// </summary>
        internal const string TaskAppError = "TaskAppError";
        internal const string TaskSystemError = "TaskSystemError";
        internal const string TaskGroupCommunicationError = "TaskGroupCommunicationError";
        internal const string TaskEvaluatorError = "TaskEvaluatorError";

        /// <summary>
        /// Message sent from driver to evaluator to close a running task
        /// </summary>
        internal const string CloseTaskByDriver = "CloseTaskByDriver";

        /// <summary>
        /// Error message in Task exception to show the task received close event
        /// </summary>
        internal const string TaskKilledByDriver = "TaskKilledByDriver";

        /// <summary>
        /// Task information keyed by task Id. Each TaskInfo holds the task state,
        /// the task configuration, and the active context the task is submitted on.
        /// </summary>
        private readonly IDictionary<string, TaskInfo> _tasks = new Dictionary<string, TaskInfo>();

        /// <summary>
        /// Running tasks keyed by task Id. A task is added when it is recorded as running and
        /// removed when it is requested to close, fails, or completes.
        /// </summary>
        private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>();

        /// <summary>
        /// Total expected tasks
        /// </summary>
        private readonly int _totalExpectedTasks;

        /// <summary>
        /// Master task Id is set in the IGroupCommDriver. It must be the same Id used in the TaskManager.
        /// </summary>
        private readonly string _masterTaskId;

        /// <summary>
        /// Total number of application errors received from tasks
        /// </summary>
        private int _numberOfAppErrors = 0;

        /// <summary>
        /// Creates a TaskManager with the specified total number of tasks and master task id.
        /// Throws IMRUSystemException if numTasks is smaller than or equal to 0, or if
        /// masterTaskId is null, empty or whitespace.
        /// </summary>
        /// <param name="numTasks">Total number of tasks expected to be added.</param>
        /// <param name="masterTaskId">Id of the master task.</param>
        internal TaskManager(int numTasks, string masterTaskId)
        {
            if (numTasks <= 0)
            {
                Exceptions.Throw(new IMRUSystemException("Number of expected tasks must be positive"), Logger);
            }

            if (string.IsNullOrWhiteSpace(masterTaskId))
            {
                Exceptions.Throw(new IMRUSystemException("masterTaskId cannot be null"), Logger);
            }

            _totalExpectedTasks = numTasks;
            _masterTaskId = masterTaskId;
        }

        /// <summary>
        /// Adds a Task to the task collection.
        /// Throws IMRUSystemException in the following cases:
        ///   taskId is null
        ///   taskId is already added
        ///   taskConfiguration is null
        ///   activeContext is null
        ///   the expected number of tasks has already been reached
        ///   the task being added completes the collection and no master task is in it
        /// </summary>
        /// <param name="taskId">Id of the task to add.</param>
        /// <param name="taskConfiguration">Configuration used to submit the task.</param>
        /// <param name="activeContext">Active context the task will be submitted on.</param>
        internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
        {
            if (taskId == null)
            {
                Exceptions.Throw(new IMRUSystemException("The taskId is null."), Logger);
            }

            if (_tasks.ContainsKey(taskId))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already exists.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            if (taskConfiguration == null)
            {
                Exceptions.Throw(new IMRUSystemException("The task configuration is null."), Logger);
            }

            if (activeContext == null)
            {
                Exceptions.Throw(new IMRUSystemException("The context is null."), Logger);
            }

            if (NumberOfTasks >= _totalExpectedTasks)
            {
                var msg = string.Format(
                    CultureInfo.InvariantCulture,
                    "Trying to add an additional Task {0}, but the total expected Task number {1} has been reached.",
                    taskId,
                    _totalExpectedTasks);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            _tasks.Add(taskId, new TaskInfo(new TaskStateMachine(), taskConfiguration, activeContext));

            // The master check runs after the insertion: when it fails, the task added above
            // stays in the collection.
            if (NumberOfTasks == _totalExpectedTasks && !MasterTaskExists())
            {
                Exceptions.Throw(new IMRUSystemException("There is no master task added."), Logger);
            }
        }

        /// <summary>
        /// Returns the number of tasks in the task collection
        /// </summary>
        internal int NumberOfTasks
        {
            get { return _tasks.Count; }
        }

        /// <summary>
        /// This method is called when receiving IRunningTask event during task submitting.
        /// Adds the IRunningTask to the running tasks collection and updates the task state with the RunningTask event.
        /// Throws IMRUSystemException if runningTask is null, if running tasks already contains this task,
        /// or if the tasks collection doesn't contain this task.
        /// </summary>
        /// <param name="runningTask">The running task reported to the driver.</param>
        internal void RecordRunningTask(IRunningTask runningTask)
        {
            ThrowIfNull(runningTask, "RunningTask is null.");

            if (_runningTasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already running.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            if (!_tasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            _runningTasks.Add(runningTask.Id, runningTask);
            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
        }

        /// <summary>
        /// This method is called at the beginning of the recovery.
        /// Clears the task collection and the running task collection, and resets the number of application errors.
        /// </summary>
        internal void Reset()
        {
            _tasks.Clear();
            _runningTasks.Clear();
            _numberOfAppErrors = 0;
        }

        /// <summary>
        /// This method is called when receiving ICompletedTask event during task running or system shutting down.
        /// Removes the task from running tasks and updates the task state with the CompletedTask event.
        /// Throws IMRUSystemException if completedTask is null or the task is not in the running tasks collection.
        /// </summary>
        /// <param name="completedTask">The completed task reported to the driver.</param>
        internal void RecordCompletedTask(ICompletedTask completedTask)
        {
            ThrowIfNull(completedTask, "CompletedTask is null.");

            RemoveRunningTask(completedTask.Id);
            UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
        }

        /// <summary>
        /// This method is called when receiving IFailedTask event during task submitting or running.
        /// Removes the task from running tasks if it is there, then updates the task state
        /// based on the error message in the failed task.
        /// Throws IMRUSystemException if failedTask is null.
        /// </summary>
        /// <param name="failedTask">The failed task reported to the driver.</param>
        internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
        {
            ThrowIfNull(failedTask, "FailedTask is null.");

            // The task may have failed before it was ever recorded as running,
            // so a missing entry is not an error here.
            _runningTasks.Remove(failedTask.Id);
            UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
        }

        /// <summary>
        /// This method is called when receiving IFailedTask event during system shutting down.
        /// If the task is in TaskWaitingForClose state, the state is updated with the ClosedTask event.
        /// Otherwise, as long as the task is not in TaskFailedByEvaluatorFailure state, the state is
        /// updated based on the error message received. A task already in TaskFailedByEvaluatorFailure
        /// state is left unchanged.
        /// Throws IMRUSystemException if failedTask is null.
        /// </summary>
        /// <param name="failedTask">The failed task reported to the driver.</param>
        internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
        {
            ThrowIfNull(failedTask, "FailedTask is null.");

            var taskState = GetTaskState(failedTask.Id);
            if (taskState == StateMachine.TaskState.TaskWaitingForClose)
            {
                UpdateState(failedTask.Id, TaskStateEvent.ClosedTask);
            }
            else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure)
            {
                UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
            }
        }

        /// <summary>
        /// This method is called when receiving an IFailedEvaluator event during TaskSubmitted, TaskRunning or system shutting down.
        /// Does nothing if the failed evaluator carries no failed task. Otherwise removes the task from the
        /// running tasks if its state is TaskRunning, and updates the task state with the FailedTaskEvaluatorError event.
        /// Throws IMRUSystemException if failedEvaluator is null.
        /// </summary>
        /// <param name="failedEvaluator">The failed evaluator reported to the driver.</param>
        internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
        {
            ThrowIfNull(failedEvaluator, "FailedEvaluator is null.");

            if (!failedEvaluator.FailedTask.IsPresent())
            {
                return;
            }

            var taskId = failedEvaluator.FailedTask.Value.Id;
            if (GetTaskState(taskId) == StateMachine.TaskState.TaskRunning)
            {
                RemoveRunningTask(taskId);
            }

            UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
        }

        /// <summary>
        /// Removes a task from the running tasks collection.
        /// Throws IMRUSystemException if the task is not in the running tasks collection.
        /// </summary>
        /// <param name="taskId">Id of the task to remove.</param>
        private void RemoveRunningTask(string taskId)
        {
            // Remove reports whether the key was present, so no separate ContainsKey lookup is needed.
            if (!_runningTasks.Remove(taskId))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }
        }

        /// <summary>
        /// Updates task state for a given taskId based on the task event.
        /// Throws IMRUSystemException if the task is not in the task collection.
        /// </summary>
        /// <param name="taskId">Id of the task whose state is updated.</param>
        /// <param name="taskEvent">Event passed to the task's state machine.</param>
        private void UpdateState(string taskId, TaskStateEvent taskEvent)
        {
            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
        }

        /// <summary>
        /// Checks if all the tasks are running: every task is in TaskRunning state and the number
        /// of running tasks equals the total number of expected tasks.
        /// </summary>
        /// <returns>true if all expected tasks are running.</returns>
        internal bool AreAllTasksRunning()
        {
            return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
                _runningTasks.Count == _totalExpectedTasks;
        }

        /// <summary>
        /// Checks if all the tasks are completed: every task is in TaskCompleted state, all expected
        /// tasks have been added, and no task remains in the running tasks collection.
        /// </summary>
        /// <returns>true if all expected tasks are completed.</returns>
        internal bool AreAllTasksCompleted()
        {
            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) &&
                _tasks.Count == _totalExpectedTasks &&
                _runningTasks.Count == 0;
        }

        /// <summary>
        /// This method is called when receiving either IFailedEvaluator or IFailedTask event.
        /// Sends the close message to every task currently in the running tasks collection, updates
        /// each of those tasks with the WaitingTaskToClose event, and then clears the running tasks collection.
        /// The collection may hold fewer than the total expected tasks, or none, when this is called.
        /// </summary>
        /// <param name="closeMessage">Message sent, UTF-8 encoded, to each running task.</param>
        internal void CloseAllRunningTasks(string closeMessage)
        {
            Logger.Log(Level.Verbose, "Closing [{0}] running tasks.", _runningTasks.Count);
            foreach (var runningTask in _runningTasks.Values)
            {
                runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
                UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
            }
            _runningTasks.Clear();
        }

        /// <summary>
        /// This method is called when receiving an IRunningTask event but system is either in shutting down or fail.
        /// The task is not added to the running tasks collection. Its state is updated with the RunningTask event,
        /// the IRunningTask is closed with the given message, and the state is then updated with the
        /// WaitingTaskToClose event.
        /// Throws IMRUSystemException if runningTask is null or the task is already in the running tasks collection.
        /// </summary>
        /// <param name="runningTask">The running task reported to the driver.</param>
        /// <param name="closeMessage">Message sent, UTF-8 encoded, to the running task.</param>
        internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
        {
            ThrowIfNull(runningTask, "RunningTask is null.");

            if (_runningTasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] is already in running tasks.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
            runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
            UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
        }

        /// <summary>
        /// Gets the error event based on the information in IFailedTask.
        /// Currently the Message in IFailedTask is used to distinguish different types of errors.
        /// Side effect: increments the application error count when the message is TaskAppError.
        /// </summary>
        /// <param name="failedTask">The failed task whose Message is inspected.</param>
        /// <returns>The state event corresponding to the error message.</returns>
        private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask)
        {
            switch (failedTask.Message)
            {
                case TaskAppError:
                    _numberOfAppErrors++;
                    return TaskStateEvent.FailedTaskAppError;
                case TaskSystemError:
                    return TaskStateEvent.FailedTaskSystemError;
                case TaskGroupCommunicationError:
                    return TaskStateEvent.FailedTaskCommunicationError;
                default:
                    // Any other message, including TaskEvaluatorError, is treated as a system error here.
                    return TaskStateEvent.FailedTaskSystemError;
            }
        }

        /// <summary>
        /// Returns the number of application errors caused by FailedTask
        /// </summary>
        /// <returns>Number of application errors counted since construction or the last Reset.</returns>
        internal int NumberOfAppErrors()
        {
            return _numberOfAppErrors;
        }

        /// <summary>
        /// Checks if all the tasks are in final states.
        /// Returns true when the task collection is empty.
        /// </summary>
        /// <returns>true if every task in the collection is in a final state.</returns>
        internal bool AllInFinalState()
        {
            return _tasks.All(t => t.Value.TaskState.IsFinalState());
        }

        /// <summary>
        /// Gets current state of the task.
        /// Throws IMRUSystemException if the task is not in the task collection.
        /// </summary>
        /// <param name="taskId">Id of the task.</param>
        /// <returns>Current state of the task's state machine.</returns>
        internal TaskState GetTaskState(string taskId)
        {
            var taskInfo = GetTaskInfo(taskId);
            return taskInfo.TaskState.CurrentState;
        }

        /// <summary>
        /// Checks if all the tasks are in the state specified.
        /// For example, passing TaskState.TaskRunning checks if all the tasks are in TaskRunning state.
        /// Returns true when the task collection is empty.
        /// </summary>
        /// <param name="taskState">State every task is compared against.</param>
        /// <returns>true if every task in the collection is in the given state.</returns>
        internal bool AreAllTasksInState(TaskState taskState)
        {
            return _tasks.All(t => t.Value.TaskState.CurrentState == taskState);
        }

        /// <summary>
        /// Submits all the tasks.
        /// Each task is submitted on its active context with its configuration and is then updated
        /// with the SubmittedTask event.
        /// IMRUSystemException is thrown if not all the tasks are added or if there is no master task.
        /// </summary>
        internal void SubmitTasks()
        {
            if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
            {
                var msg = string.Format(
                    CultureInfo.InvariantCulture,
                    "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].",
                    NumberOfTasks,
                    _totalExpectedTasks);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            // Enumerate the entries directly; the value is already at hand, so no per-key lookup is needed.
            foreach (var task in _tasks)
            {
                var taskInfo = task.Value;
                taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
                taskInfo.TaskState.MoveNext(TaskStateEvent.SubmittedTask);
            }
        }

        /// <summary>
        /// Checks if master task has been added
        /// </summary>
        /// <returns>true if the task collection contains the master task Id.</returns>
        private bool MasterTaskExists()
        {
            return _tasks.ContainsKey(_masterTaskId);
        }

        /// <summary>
        /// Gets the TaskInfo for the given taskId.
        /// Throws IMRUSystemException if the task is not in the task collection.
        /// </summary>
        /// <param name="taskId">Id of the task to look up.</param>
        /// <returns>The TaskInfo stored for the task.</returns>
        private TaskInfo GetTaskInfo(string taskId)
        {
            TaskInfo taskInfo;
            if (!_tasks.TryGetValue(taskId, out taskInfo) || taskInfo == null)
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] does not exist in the task collection.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }
            return taskInfo;
        }

        /// <summary>
        /// Throws IMRUSystemException with the given message if the argument is null, so that
        /// invalid events surface with the same exception type as the other checks in this class.
        /// </summary>
        /// <param name="argument">Value to check.</param>
        /// <param name="message">Message of the exception thrown when the value is null.</param>
        private static void ThrowIfNull(object argument, string message)
        {
            if (argument == null)
            {
                Exceptions.Throw(new IMRUSystemException(message), Logger);
            }
        }
    }
}
