[REEF-1251] IMRU Driver handlers for fault tolerance * IMRU Driver handler rewrite to support fault tolerance * ServiceAndContextConfigurationProvider refactor and cleanup for the updated IMRU driver * IMRUClient update to use the updated IMRUDriver * Allow client to set MaxRetryNumberInRecovery * Fixes for bugs found during testing * Add tests for evaluator and task failures on mapper and update evaluators
JIRA: [REEF-1251](https://issues.apache.org/jira/browse/REEF-1251) [REEF-1551](https://issues.apache.org/jira/browse/REEF-1551) [REEF-1552](https://issues.apache.org/jira/browse/REEF-1552) [REEF-1553](https://issues.apache.org/jira/browse/REEF-1553) This closes #1087 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b14c8cd8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b14c8cd8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b14c8cd8 Branch: refs/heads/master Commit: b14c8cd8191b70249a939c1cca25a61e7231a9b0 Parents: d116d94 Author: Julia Wang <[email protected]> Authored: Mon Jun 6 20:42:29 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Sep 1 17:18:03 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/EvaluatorRuntime.cs | 11 +- .../Runtime/Evaluator/Task/TaskRuntime.cs | 3 + .../Org.Apache.REEF.Driver.csproj | 4 +- .../OnREEFIMRURunTimeConfiguration.cs | 19 +- .../Org.Apache.REEF.IMRU.Examples.csproj | 1 + .../FaultTolerantPipelinedBroadcastAndReduce.cs | 170 ++++ .../PipelinedBroadcastAndReduce.cs | 27 +- lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 41 +- .../TestActiveContextManager.cs | 15 +- .../TestEvaluatorManager.cs | 76 +- .../TestTaskManager.cs | 54 +- .../API/IMRUJobDefinition.cs | 12 + .../API/IMRUJobDefinitionBuilder.cs | 14 + .../OnREEF/Client/REEFIMRUClient.cs | 4 + .../OnREEF/Driver/ActiveContextManager.cs | 10 +- .../OnREEF/Driver/EvaluatorManager.cs | 20 +- .../OnREEF/Driver/IMRUDriver.cs | 821 +++++++++++++++---- .../ServiceAndContextConfigurationProvider.cs | 191 +---- .../OnREEF/Driver/TaskManager.cs | 144 +++- .../OnREEF/IMRUTasks/MapTaskHost.cs | 103 ++- .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs | 4 +- .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 88 +- .../AllowedFailedEvaluatorsFraction.cs | 2 +- .../Parameters/MaxRetryNumberInRecovery.cs | 29 + .../Org.Apache.REEF.IMRU.csproj | 1 + 
.../Group/Driver/IGroupCommDriver.cs | 2 +- .../Group/Driver/Impl/GroupCommDriver.cs | 2 +- .../Exceptions/IllegalStateException.cs | 7 + .../Exceptions/InjectionException.cs | 9 +- .../Functional/IMRU/IMRUBroadcastReduceTest.cs | 27 +- ...oadcastReduceWithFilePartitionDataSetTest.cs | 6 +- .../IMRU/IMRUBroadcastReduceWithLocalFile.cs | 1 + .../IMRU/IMRUBrodcastReduceTestBase.cs | 65 +- .../IMRUBrodcastReduceWithoutIMRUClientTest.cs | 8 +- .../Functional/IMRU/IMRUCloseTaskTest.cs | 26 +- .../Functional/IMRU/TestFailMapperEvaluators.cs | 173 ++++ .../IMRU/TestFailMapperEvaluatorsOnInit.cs | 86 ++ .../Functional/IMRU/TestFailMapperTasks.cs | 90 ++ .../Functional/IMRU/TestFailUpdateEvaluator.cs | 236 ++++++ .../Org.Apache.REEF.Tests.csproj | 4 + .../Remote/Impl/StreamingLink.cs | 6 +- .../Remote/Impl/StreamingTransportClient.cs | 33 +- .../Remote/Impl/StreamingTransportServer.cs | 41 +- .../Remote/Parameters/ConnectionRetryCount.cs | 2 +- 44 files changed, 2176 insertions(+), 512 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index c1448d1..077ba31 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -91,11 +91,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + private string MessageFieldAsText(object field) + { + return field == null ? 
"null" : "not null"; + } + public void Handle(EvaluatorControlProto message) { lock (_heartBeatManager) { - Logger.Log(Level.Info, "Handle Evaluator control message"); + var msg = " done_evaluator = " + MessageFieldAsText(message.done_evaluator) + + " kill_evaluator = " + MessageFieldAsText(message.kill_evaluator) + + " stop_evaluator = " + MessageFieldAsText(message.stop_evaluator) + + " context_control = " + MessageFieldAsText(message.context_control); + Logger.Log(Level.Info, "Handle Evaluator control message: " + msg); if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase)) { OnException(new InvalidOperationException( http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 330c7b4..053ef23 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -113,14 +113,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (TaskStartHandlerException e) { + Logger.Log(Level.Info, "TaskRuntime::TaskStartHandlerException"); _currentStatus.SetException(e.InnerException); } catch (TaskStopHandlerException e) { + Logger.Log(Level.Info, "TaskRuntime::TaskStopHandlerException"); _currentStatus.SetException(e.InnerException); } catch (Exception e) { + Logger.Log(Level.Info, "TaskRuntime::Exception {0}", e.GetType()); _currentStatus.SetException(e); } finally http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj 
b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index c7e55b2..9bebad6 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -30,7 +30,9 @@ under the License. </PropertyGroup> <Import Project="$(SolutionDir)\build.props" /> <ItemGroup> - <Reference Include="Microsoft.Hadoop.Avro, Version=1.5.6.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL" /> + <Reference Include="Microsoft.Hadoop.Avro"> + <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath> + </Reference> <Reference Include="protobuf-net"> <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> </Reference> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs index f47b473..31585da 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs @@ -19,7 +19,7 @@ using System.Globalization; using Org.Apache.REEF.Client.Local; using Org.Apache.REEF.Client.Yarn; using Org.Apache.REEF.IMRU.OnREEF.Client; -using Org.Apache.REEF.IO.FileSystem.Hadoop; +using Org.Apache.REEF.Network; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Interface; @@ -59,7 +59,7 @@ namespace Org.Apache.REEF.IMRU.Examples .Build(); } - return Configurations.Merge(runtimeConfig, imruClientConfig); + return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration()); } /// <summary> @@ -71,9 +71,18 @@ namespace Org.Apache.REEF.IMRU.Examples IConfiguration 
imruClientConfig = REEFIMRUClientConfiguration.ConfigurationModule.Build(); - IConfiguration runtimeConfig = - YARNClientConfiguration.ConfigurationModule.Build(); - return Configurations.Merge(runtimeConfig, imruClientConfig); + var runtimeConfig = YARNClientConfiguration.ConfigurationModule + .Build(); + + return Configurations.Merge(runtimeConfig, imruClientConfig, GetTcpConfiguration()); + } + + private static IConfiguration GetTcpConfiguration() + { + return TcpClientConfigurationModule.ConfigurationModule + .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200") + .Set(TcpClientConfigurationModule.SleepTime, "1000") + .Build(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj index 1a17903..3cb5fd3 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj @@ -46,6 +46,7 @@ under the License. 
<Compile Include="MapperCount\IdentityMapFunction.cs" /> <Compile Include="IntSumReduceFunction.cs" /> <Compile Include="MapperCount\MapperCount.cs" /> + <Compile Include="PipelinedBroadcastReduce\FaultTolerantPipelinedBroadcastAndReduce.cs" /> <Compile Include="SingleIterUpdateFunction.cs" /> <Compile Include="NaturalSum\NaturalSum.cs" /> <Compile Include="NaturalSum\NaturalSumMapFunction.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs new file mode 100644 index 0000000..672389c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce +{ + /// <summary> + /// IMRU program that performs broadcast and reduce with fault tolerance. + /// </summary> + public class FaultTolerantPipelinedBroadcastAndReduce : PipelinedBroadcastAndReduce + { + private static readonly Logger Logger = Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce)); + + [Inject] + protected FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient imruClient) : base(imruClient) + { + } + + /// <summary> + /// Build a test mapper function configuration + /// </summary> + /// <param name="maxRetryInRecovery">Number of retries done if first run failed.</param> + /// <returns></returns> + protected override IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery) + { + var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<TestSenderMapFunction>.Class) + .Build(); + + var c2 = TangFactory.GetTang().NewConfigurationBuilder() + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") + .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), maxRetryInRecovery.ToString()) + .Build(); + + return Configurations.Merge(c1, c2); + } + + 
[NamedParameter(Documentation = "Set of task ids which will produce task/evaluator failure")] + public class TaskIdsToFail : Name<ISet<string>> + { + } + + [NamedParameter(Documentation = "Type of failure to simulate")] + public class FailureType : Name<int> + { + internal static readonly int EvaluatorFailureDuringTaskExecution = 0; + internal static readonly int TaskFailureDuringTaskExecution = 1; + internal static readonly int EvaluatorFailureDuringTaskInitialization = 2; + internal static readonly int TaskFailureDuringTaskInitialization = 3; + + internal static bool IsEvaluatorFailure(int failureType) + { + return failureType == EvaluatorFailureDuringTaskExecution || + failureType == EvaluatorFailureDuringTaskInitialization; + } + } + + /// <summary> + /// The function is to simulate Evaluator/Task failure for mapper evaluator + /// </summary> + public sealed class TestSenderMapFunction : IMapFunction<int[], int[]> + { + private int _iterations; + private readonly string _taskId; + private readonly ISet<string> _taskIdsToFail; + private int _failureType; + private readonly int _maxRetryInRecovery; + + [Inject] + private TestSenderMapFunction( + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail, + [Parameter(typeof(FailureType))] int failureType, + [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery) + { + _taskId = taskId; + _taskIdsToFail = taskIdsToFail; + _failureType = failureType; + _maxRetryInRecovery = maxRetryNumberInRecovery; + Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1}, Failure type: {2}.", _taskId, _maxRetryInRecovery, _failureType); + foreach (var n in _taskIdsToFail) + { + Logger.Log(Level.Info, "TestSenderMapFunction: taskIdsToFail: {0}", n); + } + + if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization || + _failureType == FailureType.TaskFailureDuringTaskInitialization) + { + 
SimulateFailure(0); + } + } + + /// <summary> + /// Map function + /// </summary> + /// <param name="mapInput">integer array</param> + /// <returns>The same integer array</returns> + int[] IMapFunction<int[], int[]>.Map(int[] mapInput) + { + _iterations++; + Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations); + + if (_failureType == FailureType.EvaluatorFailureDuringTaskExecution || + _failureType == FailureType.TaskFailureDuringTaskExecution) + { + SimulateFailure(10); + } + + if (mapInput[0] != _iterations) + { + Exceptions.Throw(new Exception("Expected value in mappers (" + _iterations + ") different from actual value (" + mapInput[0] + ")"), Logger); + } + + return mapInput; + } + + private void SimulateFailure(int onIteration) + { + if (_iterations == onIteration && + _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) != null && + _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null) + { + Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}", + FailureType.IsEvaluatorFailure(_failureType) ? 
"evaluator" : "task", + _taskId); + if (FailureType.IsEvaluatorFailure(_failureType)) + { + // simulate evaluator failure + Environment.Exit(1); + } + else + { + // simulate task failure + throw new ArgumentNullException("Simulating task failure"); + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs index de1598c..01c6daa 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+using System; using System.Globalization; using System.IO; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IO.PartitionedData.Random; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; @@ -29,12 +31,12 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// <summary> /// IMRU program that performs broadcast and reduce /// </summary> - public sealed class PipelinedBroadcastAndReduce + public class PipelinedBroadcastAndReduce { private readonly IIMRUClient _imruClient; [Inject] - private PipelinedBroadcastAndReduce(IIMRUClient imruClient) + protected PipelinedBroadcastAndReduce(IIMRUClient imruClient) { _imruClient = imruClient; } @@ -42,7 +44,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// <summary> /// Runs the actual broadcast and reduce job /// </summary> - public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory) + public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery) { var updateFunctionConfig = TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule @@ -76,10 +78,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce var results = _imruClient.Submit<int[], int[], int[], Stream>( new IMRUJobDefinitionBuilder() - .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], int[]>.ConfigurationModule - .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, - GenericType<BroadcastReceiverReduceSenderMapFunction>.Class) - .Build()) + .SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery)) .SetUpdateFunctionConfiguration(updateFunctionConfig) 
.SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) @@ -99,8 +98,22 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce .SetJobName("BroadcastReduce") .SetNumberOfMappers(numberofMappers) .SetMapperMemory(mapperMemory) + .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery) .SetUpdateTaskMemory(updateTaskMemory) .Build()); } + + protected virtual IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery) + { + return IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<BroadcastReceiverReduceSenderMapFunction>.Class) + .Build(); + } + + internal void Run(int v, int chunkSize, int iterations, int dims, int mapperMemory, int updateTaskMemory) + { + throw new NotImplementedException(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs index 18876ab..d002f2d 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs @@ -19,7 +19,6 @@ using System; using System.Globalization; using System.Linq; using Org.Apache.REEF.Client.API; -using Org.Apache.REEF.Client.Yarn; using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; using Org.Apache.REEF.IO.FileSystem.Hadoop; using Org.Apache.REEF.IO.FileSystem.Local; @@ -30,7 +29,7 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.Examples { /// <summary> - /// Runs IMRU for mapper count either in localruntime or on cluster. + /// Runs IMRU for mapper count either in local runtime or on cluster. 
/// </summary> public class Run { @@ -59,13 +58,14 @@ namespace Org.Apache.REEF.IMRU.Examples mapperCountExample.Run(numNodes - 1, filename, fileSystemConfig); } - public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string[] args, params string[] runtimeDir) + public static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, bool faultTolerant, string[] args, params string[] runtimeDir) { int chunkSize = 2; int dims = 10; - int iterations = 10; + int iterations = 100; int mapperMemory = 512; int updateTaskMemory = 512; + int maxRetryNumberInRecovery = 2; if (args.Length > 0) { @@ -92,6 +92,11 @@ namespace Org.Apache.REEF.IMRU.Examples iterations = Convert.ToInt32(args[4]); } + if (args.Length > 5) + { + maxRetryNumberInRecovery = Convert.ToInt32(args[5]); + } + IInjector injector; if (!runOnYarn) @@ -105,10 +110,26 @@ namespace Org.Apache.REEF.IMRU.Examples injector = TangFactory.GetTang() .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetYarnIMRUConfiguration(), tcpPortConfig); } - var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>(); - broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory); + + if (faultTolerant) + { + var broadcastReduceFtExample = injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>(); + broadcastReduceFtExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery); + } + else + { + var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>(); + broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery); + } } + /// <summary> + /// Run IMRU examples from command line + /// </summary> + /// Sample command line: + /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduce 20000000 1000000 1024 1024 10 2 + 
/// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduceft 20000000 1000000 1024 1024 100 2 + /// <param name="args"></param> private static void Main(string[] args) { Logger.Log(Level.Info, "start running client: " + DateTime.Now); @@ -168,10 +189,16 @@ namespace Org.Apache.REEF.IMRU.Examples case "broadcastandreduce": Logger.Log(Level.Info, "Running Broadcast and Reduce"); - RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, args.Skip(5).ToArray()); + RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, false, args.Skip(5).ToArray()); Logger.Log(Level.Info, "Done Running Broadcast and Reduce"); return; + case "broadcastandreduceft": + Logger.Log(Level.Info, "Running Broadcast and Reduce FT"); + RunBroadcastReduceTest(tcpPortConfig, runOnYarn, numNodes, true, args.Skip(5).ToArray()); + Logger.Log(Level.Info, "Done Running Broadcast and Reduce FT"); + return; + default: Logger.Log(Level.Info, "wrong test name"); return; http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs index d3d09d6..f2a0dbf 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Linq; using NSubstitute; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; @@ -181,10 +182,10 @@ namespace Org.Apache.REEF.IMRU.Tests /// <summary> /// A Context Manager observer for test /// </summary> - private sealed class TestContextObserver : IObserver<IDictionary<string, IActiveContext>> + private sealed class TestContextObserver : IObserver<IEnumerable<IActiveContext>> { private readonly int _totalExpected; - 
private IDictionary<string, IActiveContext> _contexts = null; + private int _contextCount = 0; internal TestContextObserver(int totalExpected) { @@ -203,16 +204,12 @@ namespace Org.Apache.REEF.IMRU.Tests public int NumberOfActiveContextsReceived() { - if (_contexts != null) - { - return _contexts.Count; - } - return 0; + return _contextCount; } - public void OnNext(IDictionary<string, IActiveContext> value) + public void OnNext(IEnumerable<IActiveContext> value) { - _contexts = value; + _contextCount = value.Count(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs index 489c725..a5c4caa 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs @@ -38,22 +38,22 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestValidAddRemoveAllocatedEvaluator() { - var evalutorManager = CreateTestEvaluators(3, 1); - Assert.Equal(3, evalutorManager.NumberOfAllocatedEvaluators); - Assert.True(evalutorManager.AreAllEvaluatorsAllocated()); - Assert.True(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1)); - Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2)); - Assert.True(evalutorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2)); - Assert.False(evalutorManager.IsMasterEvaluatorFailed()); + var evaluatorManager = CreateTestEvaluators(3, 1); + Assert.Equal(3, evaluatorManager.NumberOfAllocatedEvaluators); + Assert.True(evaluatorManager.AreAllEvaluatorsAllocated()); + Assert.True(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1)); + Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2)); + Assert.True(evaluatorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2)); + 
Assert.False(evaluatorManager.IsMasterEvaluatorFailed()); - evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); - Assert.Equal(2, evalutorManager.NumberOfAllocatedEvaluators); - Assert.True(evalutorManager.IsMasterEvaluatorFailed()); - Assert.Equal(0, evalutorManager.NumberofFailedMappers()); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); + Assert.Equal(2, evaluatorManager.NumberOfAllocatedEvaluators); + Assert.True(evaluatorManager.IsMasterEvaluatorFailed()); + Assert.Equal(0, evaluatorManager.NumberofFailedMappers()); - evalutorManager.ResetFailedEvaluators(); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); - Assert.True(evalutorManager.AreAllEvaluatorsAllocated()); + evaluatorManager.ResetFailedEvaluators(); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + Assert.True(evaluatorManager.AreAllEvaluatorsAllocated()); } /// <summary> @@ -62,10 +62,10 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestNoMasterEvaluator() { - var evalutorManager = CreateEvaluatorManager(3, 1); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + var evaluatorManager = CreateEvaluatorManager(3, 1); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); Assert.Throws<IMRUSystemException>(add); } @@ -75,10 +75,10 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestTwoMasterEvaluator() { - var evalutorManager = CreateEvaluatorManager(3, 1); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + var evaluatorManager = CreateEvaluatorManager(3, 1); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); Assert.Throws<IMRUSystemException>(add); } @@ -88,10 +88,10 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestTooManyEvaluators() { - var evalutorManager = CreateEvaluatorManager(2, 1); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); - evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - Action add = () => evalutorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + var evaluatorManager = CreateEvaluatorManager(2, 1); + 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); Assert.Throws<IMRUSystemException>(add); } @@ -113,14 +113,14 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestResetFailedEvaluators() { - var evalutorManager = CreateTestEvaluators(3, 1); - evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); - evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2); - Assert.Equal(2, evalutorManager.NumberOfMissingEvaluators()); - evalutorManager.ResetFailedEvaluators(); - Assert.Equal(0, evalutorManager.NumberofFailedMappers()); - Assert.False(evalutorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1)); - Assert.False(evalutorManager.IsMasterEvaluatorFailed()); + var evaluatorManager = CreateTestEvaluators(3, 1); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2); + Assert.Equal(2, evaluatorManager.NumberOfMissingEvaluators()); + evaluatorManager.ResetFailedEvaluators(); + Assert.Equal(0, evaluatorManager.NumberofFailedMappers()); + Assert.False(evaluatorManager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1)); + Assert.False(evaluatorManager.IsMasterEvaluatorFailed()); } /// <summary> @@ -129,10 +129,10 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestReachedMaximumNumberOfEvaluatorFailures() { - var evalutorManager = CreateTestEvaluators(3, 2); - evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); - evalutorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2); - Assert.True(evalutorManager.ReachedMaximumNumberOfEvaluatorFailures()); + var evaluatorManager = 
CreateTestEvaluators(3, 1); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2); + Assert.True(evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs index bdcebc2..d35f7c8 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs @@ -205,7 +205,7 @@ namespace Org.Apache.REEF.IMRU.Tests taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 1, TaskManager.TaskAppError)); taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MapperTaskIdPrefix + 2, TaskManager.TaskGroupCommunicationError)); taskManager.RecordFailedTaskDuringRunningOrSubmissionState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskSystemError)); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -254,7 +254,7 @@ namespace Org.Apache.REEF.IMRU.Tests taskManager.RecordFailedTaskDuringSystemShuttingDownState(CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver)); Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -281,7 +281,7 @@ namespace Org.Apache.REEF.IMRU.Tests var masterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ 
-312,7 +312,7 @@ namespace Org.Apache.REEF.IMRU.Tests taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -341,7 +341,19 @@ namespace Org.Apache.REEF.IMRU.Tests taskManager.RecordFailedTaskDuringSystemShuttingDownState(masterTask); Assert.Equal(TaskState.TaskClosedByDriver, taskManager.GetTaskState(MasterTaskId)); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); + } + + /// <summary> + /// Test the scenario where there is no task associated with the Failed Evaluator. + /// This can happen when submitting a task on a failed evaluator. + /// </summary> + [Fact] + public void TestFailedEvaluatorWithUnsuccessfullySubmittedTask() + { + var taskManager = TaskManagerWithTasksSubmitted(); + taskManager.RecordTaskFailWhenReceivingFailedEvaluator(CreateMockFailedEvaluatorWithoutTaskId(EvaluatorIdPrefix + ContextIdPrefix + 1)); + Assert.Equal(TaskState.TaskFailedByEvaluatorFailure, taskManager.GetTaskState(MapperTaskIdPrefix + 1)); } /// <summary> @@ -378,7 +390,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -411,7 +423,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -439,7 +451,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = 
CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -476,7 +488,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -515,7 +527,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskKilledByDriver); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -548,7 +560,7 @@ namespace Org.Apache.REEF.IMRU.Tests var failedMasterTask = CreateMockFailedTask(MasterTaskId, TaskManager.TaskGroupCommunicationError); taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedMasterTask); - Assert.True(taskManager.AllInFinalState()); + Assert.True(taskManager.AreAllTasksInFinalState()); } /// <summary> @@ -633,15 +645,19 @@ namespace Org.Apache.REEF.IMRU.Tests case TaskManager.TaskGroupCommunicationError: taskException = new IMRUTaskGroupCommunicationException(errorMsg); break; - default: + case TaskManager.TaskSystemError: taskException = new IMRUTaskSystemException(errorMsg); break; + default: + taskException = new IMRUTaskAppException(errorMsg); + break; } IFailedTask failedtask = Substitute.For<IFailedTask>(); failedtask.Id.Returns(taskId); failedtask.Message.Returns(errorMsg); failedtask.AsError().Returns(taskException); + failedtask.GetActiveContext().Returns(Optional<IActiveContext>.Empty()); return failedtask; } @@ -684,6 +700,20 @@ namespace Org.Apache.REEF.IMRU.Tests } /// <summary> + 
/// Creates a mock IFailedEvaluator with no task id associated + /// This is to simulate the case where task is submitted on a failed evaluator. + /// </summary> + /// <param name="evaluatorId"></param> + /// <returns></returns> + private static IFailedEvaluator CreateMockFailedEvaluatorWithoutTaskId(string evaluatorId) + { + var failedEvalutor = Substitute.For<IFailedEvaluator>(); + failedEvalutor.Id.Returns(evaluatorId); + failedEvalutor.FailedTask.Returns(Optional<IFailedTask>.Empty()); + return failedEvalutor; + } + + /// <summary> /// Creates a mock IConfiguration /// </summary> /// <returns></returns> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs index 5ea8d23..d42bf9b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -41,6 +41,7 @@ namespace Org.Apache.REEF.IMRU.API private readonly int _updateTaskMemory; private readonly int _mapTaskCores; private readonly int _updateTaskCores; + private readonly int _maxRetryNumberInRecovery; private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig; private readonly bool _invokeGC; @@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API /// <param name="updateTaskMemory">Update task memory</param> /// <param name="mapTaskCores">Number of map task cores</param> /// <param name="updateTaskCores">Number of update task cores</param> + /// <param name="maxRetryNumberInRecovery">Max number of retries done if first run of IMRU job failed</param> /// <param name="jobName">Job name</param> /// <param name="invokeGC">Whether to call garbage collector after each iteration</param> internal IMRUJobDefinition( @@ -84,6 +86,7 @@ namespace Org.Apache.REEF.IMRU.API int updateTaskMemory, int 
mapTaskCores, int updateTaskCores, + int maxRetryNumberInRecovery, string jobName, bool invokeGC) { @@ -101,6 +104,7 @@ namespace Org.Apache.REEF.IMRU.API _updateTaskMemory = updateTaskMemory; _mapTaskCores = mapTaskCores; _updateTaskCores = updateTaskCores; + _maxRetryNumberInRecovery = maxRetryNumberInRecovery; _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig; _invokeGC = invokeGC; _resultHandlerConfiguration = resultHandlerConfiguration; @@ -223,6 +227,14 @@ namespace Org.Apache.REEF.IMRU.API } /// <summary> + /// Max number of retries done if first run of IMRU job failed. + /// </summary> + internal int MaxRetryNumberInRecovery + { + get { return _maxRetryNumberInRecovery; } + } + + /// <summary> /// Per mapper configuration /// </summary> internal ISet<IConfiguration> PerMapConfigGeneratorConfig http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs index f078c4a..5d56fde 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ -38,6 +38,7 @@ namespace Org.Apache.REEF.IMRU.API private int _updateTaskMemory; private int _coresPerMapper; private int _updateTaskCores; + private int _maxRetryNumberInRecovery; private IConfiguration _mapFunctionConfiguration; private IConfiguration _mapInputCodecConfiguration; private IConfiguration _updateFunctionCodecsConfiguration; @@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API _updateTaskMemory = 512; _coresPerMapper = 1; _updateTaskCores = 1; + _maxRetryNumberInRecovery = 0; _invokeGC = true; _perMapConfigGeneratorConfig = new HashSet<IConfiguration>(); } @@ -233,6 +235,17 @@ namespace Org.Apache.REEF.IMRU.API } /// <summary> + /// Set max number of retries 
done if first run of IMRU job failed. + /// </summary> + /// <param name="maxRetryNumberInRecovery">Max number of retries</param> + /// <returns></returns> + public IMRUJobDefinitionBuilder SetMaxRetryNumberInRecovery(int maxRetryNumberInRecovery) + { + _maxRetryNumberInRecovery = maxRetryNumberInRecovery; + return this; + } + + /// <summary> /// Sets Per Map Configuration /// </summary> /// <param name="perMapperConfig">Mapper configs</param> @@ -320,6 +333,7 @@ namespace Org.Apache.REEF.IMRU.API _updateTaskMemory, _coresPerMapper, _updateTaskCores, + _maxRetryNumberInRecovery, _jobName, _invokeGC); } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index 969a874..9a256e0 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -104,6 +104,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.OnTaskFailed, GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) .Build(), TangFactory.GetTang().NewConfigurationBuilder() @@ -144,6 +146,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter(typeof(CoresForUpdateTask), jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), + 
jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter(typeof(InvokeGC), jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture)) .Build(); http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs index 219a9f6..437b76f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs @@ -32,12 +32,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// Manages active contexts for the driver /// </summary> [NotThreadSafe] - internal sealed class ActiveContextManager : IDisposable + internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>> { private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager)); private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>(); private readonly int _totalExpectedContexts; - private IObserver<IDictionary<string, IActiveContext>> _activeContextObserver; + private IObserver<IEnumerable<IActiveContext>> _activeContextObserver; /// <summary> /// Constructor of ActiveContextManager @@ -71,7 +71,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// </summary> /// <param name="activeContextObserver"></param> /// <returns></returns> - public IDisposable Subscribe(IObserver<IDictionary<string, IActiveContext>> activeContextObserver) + public IDisposable Subscribe(IObserver<IEnumerable<IActiveContext>> activeContextObserver) { if (_activeContextObserver != null) { @@ -84,7 +84,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// Checks if all the requested contexts 
are received. /// </summary> - private bool AreAllContextsReceived + internal bool AreAllContextsReceived { get { return _totalExpectedContexts == NumberOfActiveContexts; } } @@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver if (AreAllContextsReceived && _activeContextObserver != null) { - _activeContextObserver.OnNext(_activeContexts); + _activeContextObserver.OnNext(_activeContexts.Values); } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs index 8fa9876..5f6856c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs @@ -201,7 +201,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// Records failed Evaluator /// Removes it from allocated Evaluator and adds it to the failed Evaluators collection - /// If the evaluatorId is not in _failedEvaluators, throw IMRUSystemException + /// If the evaluatorId is already in _failedEvaluators, throw IMRUSystemException /// </summary> /// <param name="evaluatorId"></param> internal void RecordFailedEvaluator(string evaluatorId) @@ -217,11 +217,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> + /// Remove failed evaluator from the colletion + /// </summary> + /// <param name="evaluatorId"></param> + internal void RemoveFailedEvaluator(string evaluatorId) + { + if (!_failedEvaluatorIds.Contains(evaluatorId)) + { + string msg = string.Format("The failed evaluator {0} is not recorded in list of failed evaluators.", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); + } + _failedEvaluatorIds.Remove(evaluatorId); + } + + /// <summary> /// Checks if the number of failed Evaluators has reached 
allowed maximum number of evaluator failures /// </summary> - internal bool ReachedMaximumNumberOfEvaluatorFailures() + internal bool ExceededMaximumNumberOfEvaluatorFailures() { - return _failedEvaluatorIds.Count >= AllowedNumberOfEvaluatorFailures; + return _failedEvaluatorIds.Count > AllowedNumberOfEvaluatorFailures; } /// <summary>
