Repository: reef Updated Branches: refs/heads/master 3cc39310b -> 9b034dd3c
[REEF-1556] Add number of forced failures for IMRU fault tolerant testing * Refactor PipelinedBroadcastAndReduce in IMRU example so that it can be reused by fault tolerant. * Update FaultTolerantPipelinedBroadcastAndReduce to allow configuring the number of forced failures done before successful run. * Update test cases. JIRA: [REEF-1556](https://issues.apache.org/jira/browse/REEF-1556) Pull request: This closes #1114 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9b034dd3 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9b034dd3 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9b034dd3 Branch: refs/heads/master Commit: 9b034dd3c11b3a9854ad7967ad676e6853a01e1d Parents: 3cc3931 Author: Julia Wang <[email protected]> Authored: Wed Sep 7 15:35:05 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Sep 9 12:53:59 2016 -0700 ---------------------------------------------------------------------- .../FaultTolerantPipelinedBroadcastAndReduce.cs | 70 +++++++-- .../PipelinedBroadcastAndReduce.cs | 147 +++++++++++++------ lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 16 +- .../Functional/IMRU/IMRUBroadcastReduceTest.cs | 2 +- .../Functional/IMRU/TestFailMapperEvaluators.cs | 1 + .../IMRU/TestFailMapperEvaluatorsOnInit.cs | 2 + .../Functional/IMRU/TestFailMapperTasks.cs | 4 +- 7 files changed, 172 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs index 672389c..f0e563d 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs @@ -7,7 +7,7 @@ // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.IMRU.API; @@ -34,21 +35,33 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// <summary> /// IMRU program that performs broadcast and reduce with fault tolerance. /// </summary> - public class FaultTolerantPipelinedBroadcastAndReduce : PipelinedBroadcastAndReduce + public sealed class FaultTolerantPipelinedBroadcastAndReduce : PipelinedBroadcastAndReduce { private static readonly Logger Logger = Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce)); [Inject] - protected FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient imruClient) : base(imruClient) + private FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient imruClient) : base(imruClient) + { + } + + /// <summary> + /// Runs the actual broadcast and reduce job with fault tolerance + /// </summary> + internal void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery, int totalNumberOfForcedFailures) { + var results = _imruClient.Submit<int[], int[], int[], Stream>( + CreateJobDefinitionBuilder(numberofMappers, chunkSize, numIterations, dim, mapperMemory, updateTaskMemory) + .SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery, totalNumberOfForcedFailures)) + .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery) + .Build()); } - + /// <summary> /// Build a test mapper function configuration /// </summary> /// <param name="maxRetryInRecovery">Number of retries done if first run failed.</param> - /// <returns></returns> - protected override IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery) + /// <param name="totalNumberOfForcedFailures">Number of forced failure times in recovery.</param> + private static IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery, int totalNumberOfForcedFailures) { var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, @@ -60,18 +73,19 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) .BindNamedParameter(typeof(MaxRetryNumberInRecovery), maxRetryInRecovery.ToString()) + .BindNamedParameter(typeof(TotalNumberOfForcedFailures), totalNumberOfForcedFailures.ToString()) .Build(); return Configurations.Merge(c1, c2); } [NamedParameter(Documentation = "Set of task ids which will produce task/evaluator failure")] - public class TaskIdsToFail : Name<ISet<string>> + internal class TaskIdsToFail : Name<ISet<string>> { } [NamedParameter(Documentation = "Type of failure to simulate")] - public class FailureType : Name<int> + internal class FailureType : Name<int> { internal static readonly int EvaluatorFailureDuringTaskExecution = 0; internal static readonly int TaskFailureDuringTaskExecution = 1; @@ -85,35 +99,54 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce } } + [NamedParameter(Documentation = "Total number of failures in recovery.", DefaultValue = "2")] + internal class TotalNumberOfForcedFailures : Name<int> + { + } + /// <summary> /// The function is to simulate Evaluator/Task failure for mapper evaluator /// </summary> - public sealed class TestSenderMapFunction : IMapFunction<int[], int[]> + internal sealed class TestSenderMapFunction : IMapFunction<int[], int[]> { private int _iterations; private readonly string _taskId; private readonly ISet<string> _taskIdsToFail; - private int _failureType; + private readonly int _failureType; private readonly int _maxRetryInRecovery; + private readonly int _totalNumberOfForcedFailures; + private readonly int _retryIndex; [Inject] private TestSenderMapFunction( [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail, [Parameter(typeof(FailureType))] int failureType, - [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery) + [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery, + [Parameter(typeof(TotalNumberOfForcedFailures))] int totalNumberOfForcedFailures) { _taskId = taskId; _taskIdsToFail = taskIdsToFail; _failureType = failureType; _maxRetryInRecovery = maxRetryNumberInRecovery; - Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1}, Failure type: {2}.", _taskId, _maxRetryInRecovery, _failureType); + _totalNumberOfForcedFailures = totalNumberOfForcedFailures; + + var taskIdSplit = taskId.Split('-'); + _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]); + + Logger.Log(Level.Info, + "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.", + _taskId, + _maxRetryInRecovery, + _totalNumberOfForcedFailures, + _retryIndex, + _failureType); foreach (var n in _taskIdsToFail) { Logger.Log(Level.Info, "TestSenderMapFunction: taskIdsToFail: {0}", n); } - if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization || + if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization || _failureType == FailureType.TaskFailureDuringTaskInitialization) { SimulateFailure(0); @@ -138,7 +171,10 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce if (mapInput[0] != _iterations) { - Exceptions.Throw(new Exception("Expected value in mappers (" + _iterations + ") different from actual value (" + mapInput[0] + ")"), Logger); + Exceptions.Throw( + new Exception("Expected value in mappers (" + _iterations + ") different from actual value (" + + mapInput[0] + ")"), + Logger); } return mapInput; @@ -148,9 +184,11 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce { if (_iterations == onIteration && _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) != null && - _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null) + _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null && + _retryIndex < _totalNumberOfForcedFailures) { - Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}", + Logger.Log(Level.Warning, + "Simulating {0} failure for taskId {1}", FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task", _taskId); if (FailureType.IsEvaluatorFailure(_failureType)) http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs index 01c6daa..f061a44 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -using System; using System.Globalization; using System.IO; using Org.Apache.REEF.IMRU.API; @@ -33,7 +32,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// </summary> public class PipelinedBroadcastAndReduce { - private readonly IIMRUClient _imruClient; + protected readonly IIMRUClient _imruClient; [Inject] protected PipelinedBroadcastAndReduce(IIMRUClient imruClient) @@ -44,21 +43,76 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// <summary> /// Runs the actual broadcast and reduce job /// </summary> - public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery) + internal void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory) { - var updateFunctionConfig = - TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule - .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction, - GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build()) - .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations), - numIterations.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions), - dim.ToString(CultureInfo.InvariantCulture)) - .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers), - numberofMappers.ToString(CultureInfo.InvariantCulture)) - .Build(); + var results = _imruClient.Submit<int[], int[], int[], Stream>( + CreateJobDefinitionBuilder(numberofMappers, chunkSize, numIterations, dim, mapperMemory, updateTaskMemory) + .SetMapFunctionConfiguration(BuildMapperFunctionConfig()) + .Build()); + } - var dataConverterConfig1 = + protected IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory) + { + return new IMRUJobDefinitionBuilder() + .SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers, numIterations, dim)) + .SetMapInputCodecConfiguration(MapInputCodecConfiguration()) + .SetUpdateFunctionCodecsConfiguration(UpdateFunctionCodecsConfiguration()) + .SetReduceFunctionConfiguration(ReduceFunctionConfiguration()) + .SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize)) + .SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize)) + .SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers)) + .SetJobName("BroadcastReduce") + .SetNumberOfMappers(numberofMappers) + .SetMapperMemory(mapperMemory) + .SetUpdateTaskMemory(updateTaskMemory); + } + + /// <summary> + /// Configuration for Partitioned Dataset + /// </summary> + protected static IConfiguration PartitionedDatasetConfiguration(int numberofMappers) + { + return RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions, + numberofMappers.ToString()).Build(); + } + + /// <summary> + /// Configuration for Reduce Function + /// </summary> + protected static IConfiguration ReduceFunctionConfiguration() + { + return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule + .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction, + GenericType<IntArraySumReduceFunction>.Class) + .Build(); + } + + /// <summary> + /// Configuration for Update Function + /// </summary> + protected static IConfiguration UpdateFunctionCodecsConfiguration() + { + return IMRUCodecConfiguration<int[]>.ConfigurationModule + .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) + .Build(); + } + + /// <summary> + /// Configuration for Map Input Codec + /// </summary> + protected static IConfiguration MapInputCodecConfiguration() + { + return IMRUCodecConfiguration<int[]>.ConfigurationModule + .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) + .Build(); + } + + /// <summary> + /// Configuration for Map Output Data Converter + /// </summary> + protected static IConfiguration MapOutputDataConverterConfig(int chunkSize) + { + var dataConverterConfig2 = TangFactory.GetTang() .NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule .Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter, @@ -66,8 +120,15 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce .BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize), chunkSize.ToString(CultureInfo.InvariantCulture)) .Build(); + return dataConverterConfig2; + } - var dataConverterConfig2 = + /// <summary> + /// Configuration for Map Input Data Converter + /// </summary> + protected static IConfiguration MapInputDataConverterConfig(int chunkSize) + { + var dataConverterConfig1 = TangFactory.GetTang() .NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule .Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter, @@ -75,45 +136,37 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce .BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize), chunkSize.ToString(CultureInfo.InvariantCulture)) .Build(); + return dataConverterConfig1; + } - var results = _imruClient.Submit<int[], int[], int[], Stream>( - new IMRUJobDefinitionBuilder() - .SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery)) - .SetUpdateFunctionConfiguration(updateFunctionConfig) - .SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule - .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) - .Build()) - .SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule - .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) - .Build()) - .SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule - .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction, - GenericType<IntArraySumReduceFunction>.Class) - .Build()) - .SetMapInputPipelineDataConverterConfiguration(dataConverterConfig1) - .SetMapOutputPipelineDataConverterConfiguration(dataConverterConfig2) - .SetPartitionedDatasetConfiguration( - RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions, - numberofMappers.ToString()).Build()) - .SetJobName("BroadcastReduce") - .SetNumberOfMappers(numberofMappers) - .SetMapperMemory(mapperMemory) - .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery) - .SetUpdateTaskMemory(updateTaskMemory) - .Build()); + /// <summary> + /// Configuration for Update Function + /// </summary> + protected static IConfiguration UpdateFunctionConfig(int numberofMappers, int numIterations, int dim) + { + var updateFunctionConfig = + TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule + .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction, + GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build()) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations), + numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions), + dim.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers), + numberofMappers.ToString(CultureInfo.InvariantCulture)) + .Build(); + return updateFunctionConfig; } - protected virtual IConfiguration BuildMapperFunctionConfig(int maxRetryInRecovery) + /// <summary> + /// Configuration for Mapper function + /// </summary> + private static IConfiguration BuildMapperFunctionConfig() { return IMRUMapConfiguration<int[], int[]>.ConfigurationModule .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, GenericType<BroadcastReceiverReduceSenderMapFunction>.Class) .Build(); } - - internal void Run(int v, int chunkSize, int iterations, int dims, int mapperMemory, int updateTaskMemory) - { - throw new NotImplementedException(); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs index d002f2d..d8b7bb8 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs @@ -65,7 +65,8 @@ namespace Org.Apache.REEF.IMRU.Examples int iterations = 100; int mapperMemory = 512; int updateTaskMemory = 512; - int maxRetryNumberInRecovery = 2; + int maxRetryNumberInRecovery = 2; + int totalNumberOfForcedFailures = 2; if (args.Length > 0) { @@ -97,6 +98,11 @@ namespace Org.Apache.REEF.IMRU.Examples maxRetryNumberInRecovery = Convert.ToInt32(args[5]); } + if (args.Length > 6) + { + totalNumberOfForcedFailures = Convert.ToInt32(args[6]); + } + IInjector injector; if (!runOnYarn) @@ -114,12 +120,12 @@ namespace Org.Apache.REEF.IMRU.Examples if (faultTolerant) { var broadcastReduceFtExample = injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>(); - broadcastReduceFtExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery); + broadcastReduceFtExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery, totalNumberOfForcedFailures); } else { var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>(); - broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery); + broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory); } } @@ -127,8 +133,8 @@ namespace Org.Apache.REEF.IMRU.Examples /// Run IMRU examples from command line /// </summary> /// Sample command line: - /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduce 20000000 1000000 1024 1024 10 2 - /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduceft 20000000 1000000 1024 1024 100 2 + /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduce 20000000 1000000 1024 1024 10 + /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 broadcastandreduceft 20000000 1000000 1024 1024 100 5 2 /// <param name="args"></param> private static void Main(string[] args) { http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs index 63126e8..1b44663 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs @@ -70,7 +70,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU .Set(TcpPortConfigurationModule.PortRangeCount, "1000") .Build(); - string[] args = { "10", "2", "512", "512", "100", NumOfRetry.ToString() }; + string[] args = { "10", "2", "512", "512", "100", NumOfRetry.ToString(), NumOfRetry.ToString() }; Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, faultTolerant, args, testFolder); } } http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs index af02405..a172908 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs @@ -139,6 +139,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString()) .Build(); return Configurations.Merge(c1, c2, GetTcpConfiguration()); http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs index 6fecb2c..5de2a6f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs @@ -16,6 +16,7 @@ // under the License. using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; using Org.Apache.REEF.IMRU.OnREEF.Parameters; using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail; using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; @@ -80,6 +81,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskInitialization.ToString()) .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString()) .Build(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs index dc998fc..5e38968 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs @@ -16,6 +16,7 @@ // under the License. using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail; using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction; @@ -60,7 +61,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); // on each try each task should fail or complete - // there shoould be no failed evaluators + // there should be no failed evaluators // and on each try all tasks should start successfully Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount); Assert.Equal(0, failedEvaluatorCount); @@ -84,6 +85,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskExecution.ToString()) .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString()) .Build(); } }
