Repository: reef
Updated Branches:
  refs/heads/master 3cc39310b -> 9b034dd3c


[REEF-1556] Add number of forced failures for IMRU fault tolerant testing

* Refactor PipelinedBroadcastAndReduce in the IMRU example
  so that it can be reused by the fault-tolerant variant.
* Update FaultTolerantPipelinedBroadcastAndReduce to allow configuring
  the number of forced failures performed before a successful run.
* Update test cases.

JIRA:
  [REEF-1556](https://issues.apache.org/jira/browse/REEF-1556)

Pull request:
  This closes #1114


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9b034dd3
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9b034dd3
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9b034dd3

Branch: refs/heads/master
Commit: 9b034dd3c11b3a9854ad7967ad676e6853a01e1d
Parents: 3cc3931
Author: Julia Wang <[email protected]>
Authored: Wed Sep 7 15:35:05 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Fri Sep 9 12:53:59 2016 -0700

----------------------------------------------------------------------
 .../FaultTolerantPipelinedBroadcastAndReduce.cs |  70 +++++++--
 .../PipelinedBroadcastAndReduce.cs              | 147 +++++++++++++------
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    |  16 +-
 .../Functional/IMRU/IMRUBroadcastReduceTest.cs  |   2 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs |   1 +
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      |   2 +
 .../Functional/IMRU/TestFailMapperTasks.cs      |   4 +-
 7 files changed, 172 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
index 672389c..f0e563d 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
@@ -7,7 +7,7 @@
 // with the License.  You may obtain a copy of the License at
 //
 //   http://www.apache.org/licenses/LICENSE-2.0
-//
+// 
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.IMRU.API;
@@ -34,21 +35,33 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
     /// <summary>
     /// IMRU program that performs broadcast and reduce with fault tolerance.
     /// </summary>
-    public class FaultTolerantPipelinedBroadcastAndReduce : 
PipelinedBroadcastAndReduce
+    public sealed class FaultTolerantPipelinedBroadcastAndReduce : 
PipelinedBroadcastAndReduce
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce));
 
         [Inject]
-        protected FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient 
imruClient) : base(imruClient)
+        private FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient 
imruClient) : base(imruClient)
+        {
+        }
+
+        /// <summary>
+        /// Runs the actual broadcast and reduce job with fault tolerance
+        /// </summary>
+        internal void Run(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory, int 
maxRetryNumberInRecovery, int totalNumberOfForcedFailures)
         {
+            var results = _imruClient.Submit<int[], int[], int[], Stream>(
+                CreateJobDefinitionBuilder(numberofMappers, chunkSize, 
numIterations, dim, mapperMemory, updateTaskMemory)
+                    
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery,
 totalNumberOfForcedFailures))
+                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
+                    .Build());
         }
-        
+
         /// <summary>
         /// Build a test mapper function configuration
         /// </summary>
         /// <param name="maxRetryInRecovery">Number of retries done if first 
run failed.</param>
-        /// <returns></returns>
-        protected override IConfiguration BuildMapperFunctionConfig(int 
maxRetryInRecovery)
+        /// <param name="totalNumberOfForcedFailures">Number of forced failure 
times in recovery.</param>
+        private static IConfiguration BuildMapperFunctionConfig(int 
maxRetryInRecovery, int totalNumberOfForcedFailures)
         {
             var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
                 .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
@@ -60,18 +73,19 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                 .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
                 
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
maxRetryInRecovery.ToString())
+                .BindNamedParameter(typeof(TotalNumberOfForcedFailures), 
totalNumberOfForcedFailures.ToString())
                 .Build();
 
             return Configurations.Merge(c1, c2);
         }
 
         [NamedParameter(Documentation = "Set of task ids which will produce 
task/evaluator failure")]
-        public class TaskIdsToFail : Name<ISet<string>>
+        internal class TaskIdsToFail : Name<ISet<string>>
         {
         }
 
         [NamedParameter(Documentation = "Type of failure to simulate")]
-        public class FailureType : Name<int>
+        internal class FailureType : Name<int>
         {
             internal static readonly int EvaluatorFailureDuringTaskExecution = 
0;
             internal static readonly int TaskFailureDuringTaskExecution = 1;
@@ -85,35 +99,54 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
             }
         }
 
+        [NamedParameter(Documentation = "Total number of failures in 
recovery.", DefaultValue = "2")]
+        internal class TotalNumberOfForcedFailures : Name<int>
+        {
+        }
+
         /// <summary>
         /// The function is to simulate Evaluator/Task failure for mapper 
evaluator
         /// </summary>
-        public sealed class TestSenderMapFunction : IMapFunction<int[], int[]>
+        internal sealed class TestSenderMapFunction : IMapFunction<int[], 
int[]>
         {
             private int _iterations;
             private readonly string _taskId;
             private readonly ISet<string> _taskIdsToFail;
-            private int _failureType;
+            private readonly int _failureType;
             private readonly int _maxRetryInRecovery;
+            private readonly int _totalNumberOfForcedFailures;
+            private readonly int _retryIndex;
 
             [Inject]
             private TestSenderMapFunction(
                 [Parameter(typeof(TaskConfigurationOptions.Identifier))] 
string taskId,
                 [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
                 [Parameter(typeof(FailureType))] int failureType,
-                [Parameter(typeof(MaxRetryNumberInRecovery))] int 
maxRetryNumberInRecovery)
+                [Parameter(typeof(MaxRetryNumberInRecovery))] int 
maxRetryNumberInRecovery,
+                [Parameter(typeof(TotalNumberOfForcedFailures))] int 
totalNumberOfForcedFailures)
             {
                 _taskId = taskId;
                 _taskIdsToFail = taskIdsToFail;
                 _failureType = failureType;
                 _maxRetryInRecovery = maxRetryNumberInRecovery;
-                Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, 
_maxRetryInRecovery {1},  Failure type: {2}.", _taskId, _maxRetryInRecovery, 
_failureType);
+                _totalNumberOfForcedFailures = totalNumberOfForcedFailures;
+
+                var taskIdSplit = taskId.Split('-');
+                _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]);
+
+                Logger.Log(Level.Info,
+                    "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery 
{1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.",
+                    _taskId,
+                    _maxRetryInRecovery,
+                    _totalNumberOfForcedFailures,
+                    _retryIndex,
+                    _failureType);
                 foreach (var n in _taskIdsToFail)
                 {
                     Logger.Log(Level.Info, "TestSenderMapFunction: 
taskIdsToFail: {0}", n);
                 }
 
-                if (_failureType == 
FailureType.EvaluatorFailureDuringTaskInitialization || 
+                if (_failureType == 
FailureType.EvaluatorFailureDuringTaskInitialization ||
                     _failureType == 
FailureType.TaskFailureDuringTaskInitialization)
                 {
                     SimulateFailure(0);
@@ -138,7 +171,10 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
 
                 if (mapInput[0] != _iterations)
                 {
-                    Exceptions.Throw(new Exception("Expected value in mappers 
(" + _iterations + ") different from actual value (" + mapInput[0] + ")"), 
Logger);
+                    Exceptions.Throw(
+                        new Exception("Expected value in mappers (" + 
_iterations + ") different from actual value (" +
+                                      mapInput[0] + ")"),
+                        Logger);
                 }
 
                 return mapInput;
@@ -148,9 +184,11 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
             {
                 if (_iterations == onIteration &&
                     _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) 
!= null &&
-                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + 
_maxRetryInRecovery)) == null)
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + 
_maxRetryInRecovery)) == null &&
+                    _retryIndex < _totalNumberOfForcedFailures)
                 {
-                    Logger.Log(Level.Warning, "Simulating {0} failure for 
taskId {1}",
+                    Logger.Log(Level.Warning,
+                        "Simulating {0} failure for taskId {1}",
                         FailureType.IsEvaluatorFailure(_failureType) ? 
"evaluator" : "task",
                         _taskId);
                     if (FailureType.IsEvaluatorFailure(_failureType))

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index 01c6daa..f061a44 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
 using System.Globalization;
 using System.IO;
 using Org.Apache.REEF.IMRU.API;
@@ -33,7 +32,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
     /// </summary>
     public class PipelinedBroadcastAndReduce
     {
-        private readonly IIMRUClient _imruClient;
+        protected readonly IIMRUClient _imruClient;
 
         [Inject]
         protected PipelinedBroadcastAndReduce(IIMRUClient imruClient)
@@ -44,21 +43,76 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Runs the actual broadcast and reduce job
         /// </summary>
-        public void Run(int numberofMappers, int chunkSize, int numIterations, 
int dim, int mapperMemory, int updateTaskMemory, int maxRetryNumberInRecovery)
+        internal void Run(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory)
         {
-            var updateFunctionConfig =
-                
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], 
int[], int[]>.ConfigurationModule
-                    .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
-                        
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build())
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
-                        numIterations.ToString(CultureInfo.InvariantCulture))
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
-                        dim.ToString(CultureInfo.InvariantCulture))
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
-                        numberofMappers.ToString(CultureInfo.InvariantCulture))
-                    .Build();
+            var results = _imruClient.Submit<int[], int[], int[], Stream>(
+                CreateJobDefinitionBuilder(numberofMappers, chunkSize, 
numIterations, dim, mapperMemory, updateTaskMemory)
+                    .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
+                    .Build());
+        }
 
-            var dataConverterConfig1 =
+        protected IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int 
numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, 
int updateTaskMemory)
+        {
+            return new IMRUJobDefinitionBuilder()
+                    
.SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers, 
numIterations, dim))
+                    
.SetMapInputCodecConfiguration(MapInputCodecConfiguration())
+                    
.SetUpdateFunctionCodecsConfiguration(UpdateFunctionCodecsConfiguration())
+                    
.SetReduceFunctionConfiguration(ReduceFunctionConfiguration())
+                    
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize))
+                    
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize))
+                    
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers))
+                    .SetJobName("BroadcastReduce")
+                    .SetNumberOfMappers(numberofMappers)
+                    .SetMapperMemory(mapperMemory)
+                    .SetUpdateTaskMemory(updateTaskMemory);
+        }
+
+        /// <summary>
+        /// Configuration for Partitioned Dataset
+        /// </summary>
+        protected static IConfiguration PartitionedDatasetConfiguration(int 
numberofMappers)
+        {
+            return 
RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions,
+                numberofMappers.ToString()).Build();
+        }
+
+        /// <summary>
+        /// Configuration for Reduce Function
+        /// </summary>
+        protected static IConfiguration ReduceFunctionConfiguration()
+        {
+            return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
+                    GenericType<IntArraySumReduceFunction>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Configuration for Update Function
+        /// </summary>
+        protected static IConfiguration UpdateFunctionCodecsConfiguration()
+        {
+            return IMRUCodecConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Configuration for Map Input Codec
+        /// </summary>
+        protected static IConfiguration MapInputCodecConfiguration()
+        {
+            return IMRUCodecConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Configuration for Map Output Data Converter
+        /// </summary>
+        protected static IConfiguration MapOutputDataConverterConfig(int 
chunkSize)
+        {
+            var dataConverterConfig2 =
                 TangFactory.GetTang()
                     
.NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
                         
.Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
@@ -66,8 +120,15 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                     
.BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize),
                         chunkSize.ToString(CultureInfo.InvariantCulture))
                     .Build();
+            return dataConverterConfig2;
+        }
 
-            var dataConverterConfig2 =
+        /// <summary>
+        /// Configuration for Map Input Data Converter
+        /// </summary>
+        protected static IConfiguration MapInputDataConverterConfig(int 
chunkSize)
+        {
+            var dataConverterConfig1 =
                 TangFactory.GetTang()
                     
.NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
                         
.Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
@@ -75,45 +136,37 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                     
.BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize),
                         chunkSize.ToString(CultureInfo.InvariantCulture))
                     .Build();
+            return dataConverterConfig1;
+        }
 
-            var results = _imruClient.Submit<int[], int[], int[], Stream>(
-                new IMRUJobDefinitionBuilder()
-                    
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery))
-                    .SetUpdateFunctionConfiguration(updateFunctionConfig)
-                    
.SetMapInputCodecConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
-                        .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
-                        .Build())
-                    
.SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int[]>.ConfigurationModule
-                        .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
-                        .Build())
-                    
.SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
-                        
.Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
-                            GenericType<IntArraySumReduceFunction>.Class)
-                        .Build())
-                    
.SetMapInputPipelineDataConverterConfiguration(dataConverterConfig1)
-                    
.SetMapOutputPipelineDataConverterConfiguration(dataConverterConfig2)
-                    .SetPartitionedDatasetConfiguration(
-                        
RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions,
-                            numberofMappers.ToString()).Build())
-                    .SetJobName("BroadcastReduce")
-                    .SetNumberOfMappers(numberofMappers)
-                    .SetMapperMemory(mapperMemory)
-                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
-                    .SetUpdateTaskMemory(updateTaskMemory)
-                    .Build());
+        /// <summary>
+        /// Configuration for Update Function
+        /// </summary>
+        protected static IConfiguration UpdateFunctionConfig(int 
numberofMappers, int numIterations, int dim)
+        {
+            var updateFunctionConfig =
+                
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], 
int[], int[]>.ConfigurationModule
+                    .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
+                        
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build())
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
+                        numIterations.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
+                        dim.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
+                        numberofMappers.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+            return updateFunctionConfig;
         }
 
-        protected virtual IConfiguration BuildMapperFunctionConfig(int 
maxRetryInRecovery)
+        /// <summary>
+        /// Configuration for Mapper function
+        /// </summary>
+        private static IConfiguration BuildMapperFunctionConfig()
         {
             return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
                 .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
                     
GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
                 .Build();
         }
-
-        internal void Run(int v, int chunkSize, int iterations, int dims, int 
mapperMemory, int updateTaskMemory)
-        {
-            throw new NotImplementedException();
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index d002f2d..d8b7bb8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -65,7 +65,8 @@ namespace Org.Apache.REEF.IMRU.Examples
             int iterations = 100;
             int mapperMemory = 512;
             int updateTaskMemory = 512;
-            int maxRetryNumberInRecovery = 2;         
+            int maxRetryNumberInRecovery = 2;
+            int totalNumberOfForcedFailures = 2;
 
             if (args.Length > 0)
             {
@@ -97,6 +98,11 @@ namespace Org.Apache.REEF.IMRU.Examples
                 maxRetryNumberInRecovery = Convert.ToInt32(args[5]);
             }
 
+            if (args.Length > 6)
+            {
+                totalNumberOfForcedFailures = Convert.ToInt32(args[6]);
+            }
+
             IInjector injector;
 
             if (!runOnYarn)
@@ -114,12 +120,12 @@ namespace Org.Apache.REEF.IMRU.Examples
             if (faultTolerant)
             {
                 var broadcastReduceFtExample = 
injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>();
-                broadcastReduceFtExample.Run(numNodes - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+                broadcastReduceFtExample.Run(numNodes - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery, 
totalNumberOfForcedFailures);
             }
             else
             {
                 var broadcastReduceExample = 
injector.GetInstance<PipelinedBroadcastAndReduce>();
-                broadcastReduceExample.Run(numNodes - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery);
+                broadcastReduceExample.Run(numNodes - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory);
             }
         }
 
@@ -127,8 +133,8 @@ namespace Org.Apache.REEF.IMRU.Examples
         /// Run IMRU examples from command line
         /// </summary>
         /// Sample command line:  
-        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 
broadcastandreduce 20000000 1000000 1024 1024 10 2
-        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 
broadcastandreduceft 20000000 1000000 1024 1024 100 2
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 
broadcastandreduce 20000000 1000000 1024 1024 10
+        /// .\Org.Apache.REEF.IMRU.Examples.exe true 500 8900 1000 
broadcastandreduceft 20000000 1000000 1024 1024 100 5 2
         /// <param name="args"></param>
         private static void Main(string[] args)
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
index 63126e8..1b44663 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
@@ -70,7 +70,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .Set(TcpPortConfigurationModule.PortRangeCount, "1000")
                 .Build();
 
-            string[] args = { "10", "2", "512", "512", "100", 
NumOfRetry.ToString() };
+            string[] args = { "10", "2", "512", "512", "100", 
NumOfRetry.ToString(), NumOfRetry.ToString() };
             Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, 
faultTolerant, args, testFolder);
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index af02405..a172908 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -139,6 +139,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
                 
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
NumberOfRetry.ToString())
+                
.BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())
                 .Build();
 
             return Configurations.Merge(c1, c2, GetTcpConfiguration());

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index 6fecb2c..5de2a6f 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using TaskIdsToFail = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
 using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
@@ -80,6 +81,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
                 
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskInitialization.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
NumberOfRetry.ToString())
+                
.BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())
                 .Build();
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/9b034dd3/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
index dc998fc..5e38968 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using TaskIdsToFail = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
 using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
 using TestSenderMapFunction = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
@@ -60,7 +61,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
 
             // on each try each task should fail or complete
-            // there shoould be no failed evaluators
+            // there should be no failed evaluators
             // and on each try all tasks should start successfully
             Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + 
failedTaskCount);
             Assert.Equal(0, failedEvaluatorCount);
@@ -84,6 +85,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
                 
.BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskExecution.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
NumberOfRetry.ToString())
+                
.BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())
                 .Build();
         }
     }

Reply via email to