[REEF-1404] Move IMRU Task state to Context
* Define ITaskState interface in IMRU and default implementation
* Define MapTaskState and UpdateTaskState in IMRU example
* Modify IMRU JobDefinition and IMRUJobDefinitionBuilder to allow the task state
to be configured
* Modify ConfigurationManager for managing updateTaskStateConfiguration and
mapTaskStateConfiguration
* Modify ServiceAndContextConfigurationProvider to set the service and state
configurations
* Add TaskStateService, bound to the context, to hold the ITaskState instance
* Change the Update and Map functions in the IMRU example and tests to save and
restore the state
JIRA:
[REEF-1404](https://issues.apache.org/jira/browse/REEF-1404)
Pull Request:
This closes #1131
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f5107f0e
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f5107f0e
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f5107f0e
Branch: refs/heads/master
Commit: f5107f0ebd7553d1f3f0995a9472389e437b69e5
Parents: 59b766e
Author: Julia Wang <[email protected]>
Authored: Thu Oct 6 17:07:46 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Oct 6 17:56:31 2016 -0700
----------------------------------------------------------------------
.../Org.Apache.REEF.IMRU.Examples.csproj | 4 +-
.../FaultTolerantPipelinedBroadcastAndReduce.cs | 220 ----------
.../PipelinedBroadcastReduce/MapTaskState.cs | 41 ++
.../PipelinedBroadcastAndReduce.cs | 30 +-
...elinedBroadcastAndReduceWithFaultTolerant.cs | 402 +++++++++++++++++++
.../PipelinedBroadcastReduce/UpdateTaskState.cs | 50 +++
lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 2 +-
.../API/IMRUJobDefinition.cs | 24 ++
.../API/IMRUJobDefinitionBuilder.cs | 28 ++
.../OnREEF/Client/REEFIMRUClient.cs | 5 +
.../OnREEF/Driver/ConfigurationManager.cs | 40 ++
.../OnREEF/Driver/IMRUDriver.cs | 4 +-
.../ServiceAndContextConfigurationProvider.cs | 25 +-
.../OnREEF/Driver/TaskStateService.cs | 44 ++
.../OnREEF/IMRUTasks/ITaskState.cs | 27 ++
.../SerializedMapTaskStateConfiguration.cs | 26 ++
.../SerializedUpdateTaskStateConfiguration.cs | 26 ++
.../Org.Apache.REEF.IMRU.csproj | 4 +
.../IMRU/IMRUBrodcastReduceTestBase.cs | 45 ++-
.../Functional/IMRU/IMRUCloseTaskTest.cs | 4 +-
.../Functional/IMRU/TestFailMapperEvaluators.cs | 82 +++-
.../IMRU/TestFailMapperEvaluatorsOnDispose.cs | 11 +-
.../IMRU/TestFailMapperEvaluatorsOnInit.cs | 11 +-
.../Functional/IMRU/TestFailMapperTasks.cs | 12 +-
.../IMRU/TestFailMapperTasksOnDispose.cs | 11 +-
.../IMRU/TestFailMapperTasksOnInit.cs | 11 +-
.../Functional/IMRU/TestFailUpdateEvaluator.cs | 12 +-
27 files changed, 900 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 3cb5fd3..ae2cb15 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,7 +46,9 @@ under the License.
<Compile Include="MapperCount\IdentityMapFunction.cs" />
<Compile Include="IntSumReduceFunction.cs" />
<Compile Include="MapperCount\MapperCount.cs" />
- <Compile
Include="PipelinedBroadcastReduce\FaultTolerantPipelinedBroadcastAndReduce.cs"
/>
+ <Compile
Include="PipelinedBroadcastReduce\PipelinedBroadcastAndReduceWithFaultTolerant.cs"
/>
+ <Compile Include="PipelinedBroadcastReduce\MapTaskState.cs" />
+ <Compile Include="PipelinedBroadcastReduce\UpdateTaskState.cs" />
<Compile Include="SingleIterUpdateFunction.cs" />
<Compile Include="NaturalSum\NaturalSum.cs" />
<Compile Include="NaturalSum\NaturalSumMapFunction.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
deleted file mode 100644
index 1b7b15f..0000000
---
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
+++ /dev/null
@@ -1,220 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.IMRU.API;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
-{
- /// <summary>
- /// IMRU program that performs broadcast and reduce with fault tolerance.
- /// </summary>
- public sealed class FaultTolerantPipelinedBroadcastAndReduce :
PipelinedBroadcastAndReduce
- {
- private static readonly Logger Logger =
Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce));
-
- [Inject]
- private FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient
imruClient) : base(imruClient)
- {
- }
-
- /// <summary>
- /// Runs the actual broadcast and reduce job with fault tolerance
- /// </summary>
- internal void Run(int numberofMappers, int chunkSize, int
numIterations, int dim, int mapperMemory, int updateTaskMemory, int
maxRetryNumberInRecovery, int totalNumberOfForcedFailures)
- {
- var results = _imruClient.Submit<int[], int[], int[], Stream>(
- CreateJobDefinitionBuilder(numberofMappers, chunkSize,
numIterations, dim, mapperMemory, updateTaskMemory)
-
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery,
totalNumberOfForcedFailures))
- .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
- .Build());
- }
-
- /// <summary>
- /// Build a test mapper function configuration
- /// </summary>
- /// <param name="maxRetryInRecovery">Number of retries done if first
run failed.</param>
- /// <param name="totalNumberOfForcedFailures">Number of forced failure
times in recovery.</param>
- private static IConfiguration BuildMapperFunctionConfig(int
maxRetryInRecovery, int totalNumberOfForcedFailures)
- {
- var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
- .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
- GenericType<TestSenderMapFunction>.Class)
- .Build();
-
- var c2 = TangFactory.GetTang().NewConfigurationBuilder()
- .BindSetEntry<TaskIdsToFail,
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
- .BindSetEntry<TaskIdsToFail,
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
-
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
- .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
maxRetryInRecovery.ToString())
- .BindNamedParameter(typeof(TotalNumberOfForcedFailures),
totalNumberOfForcedFailures.ToString())
- .Build();
-
- return Configurations.Merge(c1, c2);
- }
-
- [NamedParameter(Documentation = "Set of task ids which will produce
task/evaluator failure")]
- internal class TaskIdsToFail : Name<ISet<string>>
- {
- }
-
- [NamedParameter(Documentation = "Type of failure to simulate")]
- internal class FailureType : Name<int>
- {
- internal const int EvaluatorFailureDuringTaskExecution = 0;
- internal const int TaskFailureDuringTaskExecution = 1;
- internal const int EvaluatorFailureDuringTaskInitialization = 2;
- internal const int TaskFailureDuringTaskInitialization = 3;
- internal const int EvaluatorFailureDuringTaskDispose = 4;
- internal const int TaskFailureDuringTaskDispose = 5;
-
- internal static bool IsEvaluatorFailure(int failureType)
- {
- return failureType == EvaluatorFailureDuringTaskExecution ||
- failureType == EvaluatorFailureDuringTaskInitialization
||
- failureType == EvaluatorFailureDuringTaskDispose;
- }
- }
-
- [NamedParameter(Documentation = "Total number of failures in
recovery.", DefaultValue = "2")]
- internal class TotalNumberOfForcedFailures : Name<int>
- {
- }
-
- /// <summary>
- /// The function is to simulate Evaluator/Task failure for mapper
evaluator
- /// </summary>
- internal sealed class TestSenderMapFunction : IMapFunction<int[],
int[]>, IDisposable
- {
- private int _iterations;
- private readonly string _taskId;
- private readonly ISet<string> _taskIdsToFail;
- private readonly int _failureType;
- private readonly int _maxRetryInRecovery;
- private readonly int _totalNumberOfForcedFailures;
- private readonly int _retryIndex;
-
- [Inject]
- private TestSenderMapFunction(
- [Parameter(typeof(TaskConfigurationOptions.Identifier))]
string taskId,
- [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
- [Parameter(typeof(FailureType))] int failureType,
- [Parameter(typeof(MaxRetryNumberInRecovery))] int
maxRetryNumberInRecovery,
- [Parameter(typeof(TotalNumberOfForcedFailures))] int
totalNumberOfForcedFailures)
- {
- _taskId = taskId;
- _taskIdsToFail = taskIdsToFail;
- _failureType = failureType;
- _maxRetryInRecovery = maxRetryNumberInRecovery;
- _totalNumberOfForcedFailures = totalNumberOfForcedFailures;
-
- var taskIdSplit = taskId.Split('-');
- _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]);
-
- Logger.Log(Level.Info,
- "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery
{1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.",
- _taskId,
- _maxRetryInRecovery,
- _totalNumberOfForcedFailures,
- _retryIndex,
- _failureType);
- foreach (var n in _taskIdsToFail)
- {
- Logger.Log(Level.Info, "TestSenderMapFunction:
taskIdsToFail: {0}", n);
- }
-
- if (_failureType ==
FailureType.EvaluatorFailureDuringTaskInitialization ||
- _failureType ==
FailureType.TaskFailureDuringTaskInitialization)
- {
- SimulateFailure(0);
- }
- }
-
- /// <summary>
- /// Map function
- /// </summary>
- /// <param name="mapInput">integer array</param>
- /// <returns>The same integer array</returns>
- int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
- {
- _iterations++;
- Logger.Log(Level.Info, "Received value {0} in iteration {1}.",
mapInput[0], _iterations);
-
- if (_failureType ==
FailureType.EvaluatorFailureDuringTaskExecution ||
- _failureType == FailureType.TaskFailureDuringTaskExecution)
- {
- SimulateFailure(10);
- }
-
- if (mapInput[0] != _iterations)
- {
- Exceptions.Throw(
- new Exception("Expected value in mappers (" +
_iterations + ") different from actual value (" +
- mapInput[0] + ")"),
- Logger);
- }
-
- return mapInput;
- }
-
- public void Dispose()
- {
- if (_failureType ==
FailureType.EvaluatorFailureDuringTaskDispose ||
- _failureType == FailureType.TaskFailureDuringTaskDispose)
- {
- SimulateFailure(_iterations);
- }
- }
-
- private void SimulateFailure(int onIteration)
- {
- if (_iterations == onIteration &&
- _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e))
!= null &&
- _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e +
_maxRetryInRecovery)) == null &&
- _retryIndex < _totalNumberOfForcedFailures)
- {
- Logger.Log(Level.Warning,
- "Simulating {0} failure for taskId {1}",
- FailureType.IsEvaluatorFailure(_failureType) ?
"evaluator" : "task",
- _taskId);
- if (FailureType.IsEvaluatorFailure(_failureType))
- {
- // simulate evaluator failure
- Environment.Exit(1);
- }
- else
- {
- // simulate task failure
- throw new ArgumentNullException("Simulating task
failure");
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
new file mode 100644
index 0000000..18d6ed0
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+ internal sealed class MapTaskState<TMapInput> : ITaskState
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(MapTaskState<TMapInput>));
+
+ /// <summary>
+ /// Current value of the map function
+ /// </summary>
+ internal TMapInput CurrentValue { get; set; }
+
+ /// <summary>
+ /// Simple implementation of the Map task state
+ /// </summary>
+ [Inject]
+ private MapTaskState()
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index f061a44..47c01c3 100644
---
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -18,6 +18,7 @@
using System.Globalization;
using System.IO;
using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IO.PartitionedData.Random;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -51,7 +52,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
.Build());
}
- protected IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int
numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory,
int updateTaskMemory)
+ protected virtual IMRUJobDefinitionBuilder
CreateJobDefinitionBuilder(int numberofMappers, int chunkSize, int
numIterations, int dim, int mapperMemory, int updateTaskMemory)
{
return new IMRUJobDefinitionBuilder()
.SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers,
numIterations, dim))
@@ -70,7 +71,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Partitioned Dataset
/// </summary>
- protected static IConfiguration PartitionedDatasetConfiguration(int
numberofMappers)
+ protected virtual IConfiguration PartitionedDatasetConfiguration(int
numberofMappers)
{
return
RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions,
numberofMappers.ToString()).Build();
@@ -79,7 +80,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Reduce Function
/// </summary>
- protected static IConfiguration ReduceFunctionConfiguration()
+ protected virtual IConfiguration ReduceFunctionConfiguration()
{
return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
.Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
@@ -90,7 +91,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Update Function
/// </summary>
- protected static IConfiguration UpdateFunctionCodecsConfiguration()
+ protected virtual IConfiguration UpdateFunctionCodecsConfiguration()
{
return IMRUCodecConfiguration<int[]>.ConfigurationModule
.Set(IMRUCodecConfiguration<int[]>.Codec,
GenericType<IntArrayStreamingCodec>.Class)
@@ -100,7 +101,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Map Input Codec
/// </summary>
- protected static IConfiguration MapInputCodecConfiguration()
+ protected virtual IConfiguration MapInputCodecConfiguration()
{
return IMRUCodecConfiguration<int[]>.ConfigurationModule
.Set(IMRUCodecConfiguration<int[]>.Codec,
GenericType<IntArrayStreamingCodec>.Class)
@@ -110,7 +111,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Map Output Data Converter
/// </summary>
- protected static IConfiguration MapOutputDataConverterConfig(int
chunkSize)
+ protected virtual IConfiguration MapOutputDataConverterConfig(int
chunkSize)
{
var dataConverterConfig2 =
TangFactory.GetTang()
@@ -126,7 +127,7 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Map Input Data Converter
/// </summary>
- protected static IConfiguration MapInputDataConverterConfig(int
chunkSize)
+ protected virtual IConfiguration MapInputDataConverterConfig(int
chunkSize)
{
var dataConverterConfig1 =
TangFactory.GetTang()
@@ -142,12 +143,10 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// Configuration for Update Function
/// </summary>
- protected static IConfiguration UpdateFunctionConfig(int
numberofMappers, int numIterations, int dim)
+ protected IConfiguration UpdateFunctionConfig(int numberofMappers, int
numIterations, int dim)
{
var updateFunctionConfig =
-
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[],
int[], int[]>.ConfigurationModule
- .Set(IMRUUpdateConfiguration<int[], int[],
int[]>.UpdateFunction,
-
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build())
+
TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
numIterations.ToString(CultureInfo.InvariantCulture))
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
@@ -158,10 +157,17 @@ namespace
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
return updateFunctionConfig;
}
+ protected virtual IConfiguration BuildUpdateFunctionConfig()
+ {
+ return IMRUUpdateConfiguration<int[], int[],
int[]>.ConfigurationModule
+ .Set(IMRUUpdateConfiguration<int[], int[],
int[]>.UpdateFunction,
+
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build();
+ }
+
/// <summary>
/// Configuration for Mapper function
/// </summary>
- private static IConfiguration BuildMapperFunctionConfig()
+ protected virtual IConfiguration BuildMapperFunctionConfig()
{
return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
.Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
new file mode 100644
index 0000000..9ec3a69
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+ /// <summary>
+ /// IMRU program that performs broadcast and reduce with fault tolerance.
+ /// </summary>
+ public sealed class PipelinedBroadcastAndReduceWithFaultTolerant :
PipelinedBroadcastAndReduce
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(PipelinedBroadcastAndReduceWithFaultTolerant));
+
+ [Inject]
+ private PipelinedBroadcastAndReduceWithFaultTolerant(IIMRUClient
imruClient) : base(imruClient)
+ {
+ }
+
+ /// <summary>
+ /// Runs the actual broadcast and reduce job with fault tolerance
+ /// </summary>
+ internal void Run(int numberofMappers, int chunkSize, int
numIterations, int dim, int mapperMemory, int updateTaskMemory, int
maxRetryNumberInRecovery, int totalNumberOfForcedFailures)
+ {
+ var results = _imruClient.Submit<int[], int[], int[], Stream>(
+ CreateJobDefinitionBuilder(numberofMappers, chunkSize,
numIterations, dim, mapperMemory, updateTaskMemory)
+
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery,
totalNumberOfForcedFailures))
+ .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
+ .Build());
+ }
+
+ protected override IMRUJobDefinitionBuilder
CreateJobDefinitionBuilder(int numberofMappers, int chunkSize, int
numIterations, int dim, int mapperMemory, int updateTaskMemory)
+ {
+ return new IMRUJobDefinitionBuilder()
+
.SetUpdateTaskStateConfiguration(UpdateTaskStateConfiguration())
+ .SetMapTaskStateConfiguration(MapTaskStateConfiguration())
+
.SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers,
numIterations, dim))
+
.SetMapInputCodecConfiguration(MapInputCodecConfiguration())
+
.SetUpdateFunctionCodecsConfiguration(UpdateFunctionCodecsConfiguration())
+
.SetReduceFunctionConfiguration(ReduceFunctionConfiguration())
+
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize))
+
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize))
+
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers))
+ .SetJobName("BroadcastReduce")
+ .SetNumberOfMappers(numberofMappers)
+ .SetMapperMemory(mapperMemory)
+ .SetUpdateTaskMemory(updateTaskMemory);
+ }
+
+ /// <summary>
+ /// Build a test mapper function configuration
+ /// </summary>
+ /// <param name="maxRetryInRecovery">Number of retries done if first
run failed.</param>
+ /// <param name="totalNumberOfForcedFailures">Number of forced failure
times in recovery.</param>
+ private IConfiguration BuildMapperFunctionConfig(int
maxRetryInRecovery, int totalNumberOfForcedFailures)
+ {
+ var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+ .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+ GenericType<SenderMapFunctionFT>.Class)
+ .Build();
+
+ var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindSetEntry<TaskIdsToFail,
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+ .BindSetEntry<TaskIdsToFail,
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+ .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
maxRetryInRecovery.ToString())
+ .BindNamedParameter(typeof(TotalNumberOfForcedFailures),
totalNumberOfForcedFailures.ToString())
+ .Build();
+
+ return Configurations.Merge(c1, c2);
+ }
+
+ /// <summary>
+ /// Configuration for Update Function
+ /// </summary>
+ protected override IConfiguration BuildUpdateFunctionConfig()
+ {
+ return IMRUUpdateConfiguration<int[], int[],
int[]>.ConfigurationModule
+ .Set(IMRUUpdateConfiguration<int[], int[],
int[]>.UpdateFunction,
+
GenericType<BroadcastSenderReduceReceiverUpdateFunctionFT>.Class).Build();
+ }
+
+ /// <summary>
+ /// Configuration for Update task state
+ /// </summary>
+ /// <returns></returns>
+ private IConfiguration UpdateTaskStateConfiguration()
+ {
+ return TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<ITaskState>.Class,
GenericType<UpdateTaskState<int[], int[]>>.Class)
+ .Build();
+ }
+
+ /// <summary>
+ /// Configuration for map task state
+ /// </summary>
+ /// <returns></returns>
+ private IConfiguration MapTaskStateConfiguration()
+ {
+ return TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<ITaskState>.Class,
GenericType<MapTaskState<int[]>>.Class)
+ .Build();
+ }
+
+ [NamedParameter(Documentation = "Set of task ids which will produce
task/evaluator failure")]
+ internal class TaskIdsToFail : Name<ISet<string>>
+ {
+ }
+
+ [NamedParameter(Documentation = "Type of failure to simulate")]
+ internal class FailureType : Name<int>
+ {
+ internal const int EvaluatorFailureDuringTaskExecution = 0;
+ internal const int TaskFailureDuringTaskExecution = 1;
+ internal const int EvaluatorFailureDuringTaskInitialization = 2;
+ internal const int TaskFailureDuringTaskInitialization = 3;
+ internal const int EvaluatorFailureDuringTaskDispose = 4;
+ internal const int TaskFailureDuringTaskDispose = 5;
+
+ internal static bool IsEvaluatorFailure(int failureType)
+ {
+ return failureType == EvaluatorFailureDuringTaskExecution ||
+ failureType == EvaluatorFailureDuringTaskInitialization
||
+ failureType == EvaluatorFailureDuringTaskDispose;
+ }
+ }
+
+ [NamedParameter(Documentation = "Total number of failures in
recovery.", DefaultValue = "2")]
+ internal class TotalNumberOfForcedFailures : Name<int>
+ {
+ }
+
+ /// <summary>
+ /// The function is to simulate Evaluator/Task failure for mapper
evaluator
+ /// </summary>
+ internal sealed class SenderMapFunctionFT : IMapFunction<int[],
int[]>, IDisposable
+ {
+ private int _iterations;
+ private readonly string _taskId;
+ private readonly MapTaskState<int[]> _taskState;
+ private readonly ISet<string> _taskIdsToFail;
+ private readonly int _failureType;
+ private readonly int _maxRetryInRecovery;
+ private readonly int _totalNumberOfForcedFailures;
+ private readonly int _retryIndex;
+
+ [Inject]
+ private SenderMapFunctionFT(
+ [Parameter(typeof(TaskConfigurationOptions.Identifier))]
string taskId,
+ [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
+ [Parameter(typeof(FailureType))] int failureType,
+ [Parameter(typeof(MaxRetryNumberInRecovery))] int
maxRetryNumberInRecovery,
+ [Parameter(typeof(TotalNumberOfForcedFailures))] int
totalNumberOfForcedFailures,
+ ITaskState taskState)
+ {
+ _taskId = taskId;
+ _taskState = (MapTaskState<int[]>)taskState;
+ _taskIdsToFail = taskIdsToFail;
+ _failureType = failureType;
+ _maxRetryInRecovery = maxRetryNumberInRecovery;
+ _totalNumberOfForcedFailures = totalNumberOfForcedFailures;
+
+ var taskIdSplit = taskId.Split('-');
+ _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]);
+
+ Logger.Log(Level.Info,
+ "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery
{1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.",
+ _taskId,
+ _maxRetryInRecovery,
+ _totalNumberOfForcedFailures,
+ _retryIndex,
+ _failureType);
+ foreach (var n in _taskIdsToFail)
+ {
+ Logger.Log(Level.Info, "TestSenderMapFunction:
taskIdsToFail: {0}", n);
+ }
+
+ if (_failureType ==
FailureType.EvaluatorFailureDuringTaskInitialization ||
+ _failureType ==
FailureType.TaskFailureDuringTaskInitialization)
+ {
+ SimulateFailure(0);
+ }
+ }
+
+ /// <summary>
+ /// Map function
+ /// It simply takes the input, does a verification for the task
state and returns the same input value.
+ /// </summary>
+ /// <param name="mapInput">integer array</param>
+ /// <returns>The same integer array</returns>
+ int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+ {
+ int[] previousValue = _taskState.CurrentValue;
+
+ // In this example, when task is re-started, with the task
state management, it should continue from the previous number
+ if (previousValue != null && previousValue[0] > mapInput[0])
+ {
+ throw new
Exception(string.Format(CultureInfo.CurrentCulture,
+ "The previous value received was {0} but the value
received is {1} which is smaller than the previous value.",
+ previousValue[0],
+ mapInput[0]));
+ }
+
+ _taskState.CurrentValue = mapInput;
+
+ _iterations++;
+ Logger.Log(Level.Info, "Received value {0} in iteration {1}.",
mapInput[0], _iterations);
+
+ if (_failureType ==
FailureType.EvaluatorFailureDuringTaskExecution ||
+ _failureType == FailureType.TaskFailureDuringTaskExecution)
+ {
+ SimulateFailure(10);
+ }
+
+ return mapInput;
+ }
+
+ public void Dispose()
+ {
+ if (_failureType ==
FailureType.EvaluatorFailureDuringTaskDispose ||
+ _failureType == FailureType.TaskFailureDuringTaskDispose)
+ {
+ SimulateFailure(_iterations);
+ }
+ }
+
+ private void SimulateFailure(int onIteration)
+ {
+ if (_iterations == onIteration &&
+ _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e))
!= null &&
+ _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e +
_maxRetryInRecovery)) == null &&
+ _retryIndex < _totalNumberOfForcedFailures)
+ {
+ Logger.Log(Level.Warning,
+ "Simulating {0} failure for taskId {1}",
+ FailureType.IsEvaluatorFailure(_failureType) ?
"evaluator" : "task",
+ _taskId);
+ if (FailureType.IsEvaluatorFailure(_failureType))
+ {
+ // simulate evaluator failure
+ Environment.Exit(1);
+ }
+ else
+ {
+ // simulate task failure
+ throw new ArgumentNullException("Simulating task
failure");
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Update function of the integer array broadcast and reduce example.
+ /// The value sent to the mappers, the final result and the iteration count are
+ /// written to the injected task state, and read back from it in Initialize.
+ /// </summary>
+ internal sealed class BroadcastSenderReduceReceiverUpdateFunctionFT : IUpdateFunction<int[], int[], int[]>
+ {
+     private readonly int _maxIters;
+     private readonly int _dim;
+     private readonly int _workers;
+     private readonly int[] _intArr;
+     private readonly UpdateTaskState<int[], int[]> _taskState;
+     private int _iterations;
+
+     [Inject]
+     private BroadcastSenderReduceReceiverUpdateFunctionFT(
+         [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+         [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
+         [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers,
+         ITaskState taskState)
+     {
+         _workers = numWorkers;
+         _maxIters = maxIters;
+         _dim = dim;
+         _iterations = 0;
+         _intArr = new int[dim];
+         _taskState = (UpdateTaskState<int[], int[]>)taskState;
+     }
+
+     /// <summary>
+     /// Validates the reduced input of the current round, then either saves and sends
+     /// the value for the next round or saves and returns the final result.
+     /// </summary>
+     /// <param name="input">Input containing sum of all mappers arrays</param>
+     /// <returns>The Update Result</returns>
+     UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] input)
+     {
+         var round = _iterations + 1;
+         Logger.Log(Level.Info, "Received value {0} in iterations {1}.", input[0], round);
+
+         if (input[0] != round * _workers)
+         {
+             throw new Exception("Expected input to update function not same as actual input");
+         }
+
+         _iterations = round;
+
+         if (_iterations >= _maxIters)
+         {
+             SaveResult(input);
+             return UpdateResult<int[], int[]>.Done(input);
+         }
+
+         // Every element of the next map input carries the number of the next round.
+         var nextValue = _iterations + 1;
+         for (var index = 0; index < _dim; index++)
+         {
+             _intArr[index] = nextValue;
+         }
+
+         SaveState(_intArr);
+         return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+     }
+
+     /// <summary>
+     /// Produces the first update result. A saved result in the task state is restored
+     /// and returned as done; otherwise a saved input is restored and sent for another
+     /// round; otherwise an array filled with 1 is sent to the mappers.
+     /// </summary>
+     /// <returns>Map input</returns>
+     UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize()
+     {
+         var savedResult = _taskState.Result;
+         if (savedResult != null)
+         {
+             Restore(savedResult);
+             _iterations = _taskState.Iterations;
+             return UpdateResult<int[], int[]>.Done(_intArr);
+         }
+
+         var savedInput = _taskState.Input;
+         if (savedInput != null)
+         {
+             Restore(savedInput);
+             _iterations = _taskState.Iterations;
+             return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+         }
+
+         for (var index = 0; index < _dim; index++)
+         {
+             _intArr[index] = 1;
+         }
+
+         return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+     }
+
+     /// <summary>
+     /// Writes the iteration count and the given map input into the task state
+     /// </summary>
+     /// <param name="value">Map input to keep</param>
+     private void SaveState(int[] value)
+     {
+         _taskState.Input = value;
+         _taskState.Iterations = _iterations;
+         Logger.Log(Level.Info, "State saved: {0}", _taskState.Input[0]);
+     }
+
+     /// <summary>
+     /// Writes the iteration count and the given result into the task state
+     /// </summary>
+     /// <param name="value">Result to keep</param>
+     private void SaveResult(int[] value)
+     {
+         _taskState.Result = value;
+         _taskState.Iterations = _iterations;
+         Logger.Log(Level.Info, "Result saved: {0}", _taskState.Result[0]);
+     }
+
+     /// <summary>
+     /// Copies the first _dim elements of the given array into _intArr
+     /// </summary>
+     /// <param name="source">Array to copy from</param>
+     private void Restore(int[] source)
+     {
+         for (var index = 0; index < _dim; index++)
+         {
+             _intArr[index] = source[index];
+         }
+     }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
new file mode 100644
index 0000000..ed0c3ba
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+ /// <summary>
+ /// Sample implementation of ITaskState that holds update task state:
+ /// a map input value, a result value and an iteration counter.
+ /// </summary>
+ /// <typeparam name="TMapInput">Type of the value kept in Input</typeparam>
+ /// <typeparam name="TResult">Type of the value kept in Result</typeparam>
+ internal sealed class UpdateTaskState<TMapInput, TResult> : ITaskState
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(UpdateTaskState<TMapInput, TResult>));
+
+ /// <summary>
+ /// Keep the map input value; left at its default until a value is assigned
+ /// </summary>
+ internal TMapInput Input { get; set; }
+
+ /// <summary>
+ /// Keep the result value; left at its default until a value is assigned
+ /// </summary>
+ internal TResult Result { get; set; }
+
+ /// <summary>
+ /// Keep the current iteration number
+ /// </summary>
+ internal int Iterations { get; set; }
+
+ /// <summary>
+ /// Simple constructor for UpdateTaskState
+ /// </summary>
+ [Inject]
+ private UpdateTaskState()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index d8b7bb8..746e5e2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -119,7 +119,7 @@ namespace Org.Apache.REEF.IMRU.Examples
if (faultTolerant)
{
- var broadcastReduceFtExample =
injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>();
+ var broadcastReduceFtExample =
injector.GetInstance<PipelinedBroadcastAndReduceWithFaultTolerant>();
broadcastReduceFtExample.Run(numNodes - 1, chunkSize,
iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery,
totalNumberOfForcedFailures);
}
else
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index d42bf9b..ffec4a6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -27,6 +27,8 @@ namespace Org.Apache.REEF.IMRU.API
public sealed class IMRUJobDefinition
{
private readonly string _jobName;
+ private readonly IConfiguration _updateTaskStateConfiguration;
+ private readonly IConfiguration _mapTaskStateConfiguration;
private readonly IConfiguration _mapFunctionConfiguration;
private readonly IConfiguration _mapInputCodecConfiguration;
private readonly IConfiguration _updateFunctionCodecsConfiguration;
@@ -48,6 +50,8 @@ namespace Org.Apache.REEF.IMRU.API
/// <summary>
/// Constructor
/// </summary>
+ /// <param name="updateTaskStateConfiguration">Update task state
configuration</param>
+ /// <param name="mapTaskStateConfiguration">Map task state
configuration</param>
/// <param name="mapFunctionConfiguration">Map function
configuration</param>
/// <param name="mapInputCodecConfiguration">Map input codec
configuration</param>
/// <param name="updateFunctionCodecsConfiguration">codec
configuration for update
@@ -71,6 +75,8 @@ namespace Org.Apache.REEF.IMRU.API
/// <param name="jobName">Job name</param>
/// <param name="invokeGC">Whether to call garbage collector after
each iteration</param>
internal IMRUJobDefinition(
+ IConfiguration updateTaskStateConfiguration,
+ IConfiguration mapTaskStateConfiguration,
IConfiguration mapFunctionConfiguration,
IConfiguration mapInputCodecConfiguration,
IConfiguration updateFunctionCodecsConfiguration,
@@ -90,6 +96,8 @@ namespace Org.Apache.REEF.IMRU.API
string jobName,
bool invokeGC)
{
+ _updateTaskStateConfiguration = updateTaskStateConfiguration;
+ _mapTaskStateConfiguration = mapTaskStateConfiguration;
_mapFunctionConfiguration = mapFunctionConfiguration;
_mapInputCodecConfiguration = mapInputCodecConfiguration;
_updateFunctionCodecsConfiguration =
updateFunctionCodecsConfiguration;
@@ -119,6 +127,22 @@ namespace Org.Apache.REEF.IMRU.API
}
 /// <summary>
+ /// Configuration of update task state, as given to the constructor
+ /// </summary>
+ internal IConfiguration UpdateTaskStateConfiguration
+ {
+ get { return _updateTaskStateConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of map task state, as given to the constructor
+ /// </summary>
+ internal IConfiguration MapTaskStateConfiguration
+ {
+ get { return _mapTaskStateConfiguration; }
+ }
+
+ /// <summary>
/// Configuration of map function
/// </summary>
internal IConfiguration MapFunctionConfiguration
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 5d56fde..e1af1a3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -39,6 +39,8 @@ namespace Org.Apache.REEF.IMRU.API
private int _coresPerMapper;
private int _updateTaskCores;
private int _maxRetryNumberInRecovery;
+ private IConfiguration _updateTaskStateConfiguration;
+ private IConfiguration _mapTaskStateConfiguration;
private IConfiguration _mapFunctionConfiguration;
private IConfiguration _mapInputCodecConfiguration;
private IConfiguration _updateFunctionCodecsConfiguration;
@@ -59,6 +61,8 @@ namespace Org.Apache.REEF.IMRU.API
/// </summary>
public IMRUJobDefinitionBuilder()
{
+ _updateTaskStateConfiguration = EmptyConfiguration;
+ _mapTaskStateConfiguration = EmptyConfiguration;
_mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
_mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
_partitionedDatasetConfiguration = EmptyConfiguration;
@@ -84,6 +88,28 @@ namespace Org.Apache.REEF.IMRU.API
}
 /// <summary>
+ /// Sets configuration of map task state.
+ /// When this is never called, the builder keeps the EmptyConfiguration
+ /// assigned in its constructor.
+ /// </summary>
+ /// <param name="mapTaskStateConfiguration">Configuration for map task state</param>
+ /// <returns>this</returns>
+ public IMRUJobDefinitionBuilder
SetMapTaskStateConfiguration(IConfiguration mapTaskStateConfiguration)
+ {
+ _mapTaskStateConfiguration = mapTaskStateConfiguration;
+ return this;
+ }
+
+ /// <summary>
+ /// Sets configuration of update task state.
+ /// When this is never called, the builder keeps the EmptyConfiguration
+ /// assigned in its constructor.
+ /// </summary>
+ /// <param name="updateTaskStateConfiguration">Configuration for update task state</param>
+ /// <returns>this</returns>
+ public IMRUJobDefinitionBuilder
SetUpdateTaskStateConfiguration(IConfiguration updateTaskStateConfiguration)
+ {
+ _updateTaskStateConfiguration = updateTaskStateConfiguration;
+ return this;
+ }
+
+ /// <summary>
/// Sets configuration of map function
/// </summary>
/// <param name="mapFunctionConfiguration">Configuration</param>
@@ -318,6 +344,8 @@ namespace Org.Apache.REEF.IMRU.API
}
return new IMRUJobDefinition(
+ _updateTaskStateConfiguration,
+ _mapTaskStateConfiguration,
_mapFunctionConfiguration,
_mapInputCodecConfiguration,
_updateFunctionCodecsConfiguration,
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 223e033..8ba132e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -37,6 +37,7 @@ using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
+using TraceLevel = System.Diagnostics.TraceLevel;
namespace Org.Apache.REEF.IMRU.OnREEF.Client
{
@@ -122,6 +123,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
jobDefinition.PartitionedDatasetConfiguration,
overallPerMapConfig
})
+
.BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
+
_configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
+
.BindNamedParameter(typeof(SerializedMapTaskStateConfiguration),
+
_configurationSerializer.ToString(jobDefinition.MapTaskStateConfiguration))
.BindNamedParameter(typeof(SerializedMapConfiguration),
_configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
.BindNamedParameter(typeof(SerializedUpdateConfiguration),
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
index b9d0542..192cdc4 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
@@ -32,6 +32,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(ConfigurationManager));
+ private readonly IConfiguration _updateTaskStateConfiguration;
+ private readonly IConfiguration _mapTaskStateConfiguration;
private readonly IConfiguration _mapFunctionConfiguration;
private readonly IConfiguration _mapInputCodecConfiguration;
private readonly IConfiguration _updateFunctionCodecsConfiguration;
@@ -44,6 +46,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
[Inject]
private ConfigurationManager(
AvroConfigurationSerializer configurationSerializer,
+ [Parameter(typeof(SerializedUpdateTaskStateConfiguration))] string
updateTaskStateConfig,
+ [Parameter(typeof(SerializedMapTaskStateConfiguration))] string
mapTaskStateConfig,
[Parameter(typeof(SerializedMapConfiguration))] string mapConfig,
[Parameter(typeof(SerializedReduceConfiguration))] string
reduceConfig,
[Parameter(typeof(SerializedUpdateConfiguration))] string
updateConfig,
@@ -55,6 +59,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
try
{
+ _updateTaskStateConfiguration =
configurationSerializer.FromString(updateTaskStateConfig);
+ }
+ catch (Exception)
+ {
+ Logger.Log(Level.Error, "Unable to deserialize update task
state configuration");
+ throw;
+ }
+
+ try
+ {
+ _mapTaskStateConfiguration =
configurationSerializer.FromString(mapTaskStateConfig);
+ }
+ catch (Exception)
+ {
+ Logger.Log(Level.Error, "Unable to deserialize map task state
configuration");
+ throw;
+ }
+
+ try
+ {
_mapFunctionConfiguration =
configurationSerializer.FromString(mapConfig);
}
catch (Exception e)
@@ -131,6 +155,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
 /// <summary>
+ /// Configuration of update task state, deserialized in the constructor
+ /// from the SerializedUpdateTaskStateConfiguration named parameter
+ /// </summary>
+ internal IConfiguration UpdateTaskStateConfiguration
+ {
+ get { return _updateTaskStateConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of map task state, deserialized in the constructor
+ /// from the SerializedMapTaskStateConfiguration named parameter
+ /// </summary>
+ internal IConfiguration MapTaskStateConfiguration
+ {
+ get { return _mapTaskStateConfiguration; }
+ }
+
+ /// <summary>
/// Configuration of map function
/// </summary>
internal IConfiguration MapFunctionConfiguration
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 50424c7..a468e5e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -159,7 +159,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
_systemState = new SystemStateMachine();
_serviceAndContextConfigurationProvider =
- new ServiceAndContextConfigurationProvider<TMapInput,
TMapOutput, TPartitionType>(dataSet);
+ new ServiceAndContextConfigurationProvider<TMapInput,
TMapOutput, TPartitionType>(dataSet, configurationManager);
var msg =
string.Format(CultureInfo.InvariantCulture, "map task
memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3},
maxRetry {4}, allowedFailedEvaluators {5}.",
@@ -793,7 +793,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
/// <summary>
- /// Generates map task configuration given the active context. S
+ /// Generates map task configuration given the active context.
/// Merge configurations of all the inputs to the MapTaskHost.
/// </summary>
/// <param name="activeContext">Active context to which task needs to
be submitted</param>
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 24a2b9b..0d4e27b 100644
---
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -22,6 +22,7 @@ using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Attributes;
@@ -44,18 +45,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private readonly Dictionary<string, string> _partitionIdProvider = new
Dictionary<string, string>();
private readonly Stack<string> _partitionDescriptorIds = new
Stack<string>();
private readonly IPartitionedInputDataSet _dataset;
+ private readonly ConfigurationManager _configurationManager;
/// <summary>
/// Constructs the object witch maintains partitionDescriptor Ids so
that to provide proper data load configuration
/// </summary>
- /// <param name="dataset"></param>
- internal
ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
+ /// <param name="dataset">partition input dataset</param>
+ /// <param name="configurationManager">Configuration manager that
holds configurations for context and tasks</param>
+ internal
ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset,
ConfigurationManager configurationManager)
{
_dataset = dataset;
foreach (var descriptor in _dataset)
{
_partitionDescriptorIds.Push(descriptor.Id);
}
+ _configurationManager = configurationManager;
}
/// <summary>
@@ -83,10 +87,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
internal ContextAndServiceConfiguration
GetContextConfigurationForMasterEvaluatorById(string evaluatorId)
{
Logger.Log(Level.Info, "Getting root context and service
configuration for master");
+
+ var serviceConf = ServiceConfiguration.ConfigurationModule
+ .Set(ServiceConfiguration.Services,
GenericType<TaskStateService>.Class)
+ .Build();
+
return new ContextAndServiceConfiguration(
ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
IMRUConstants.MasterContextId).Build(),
- TangFactory.GetTang().NewConfigurationBuilder().Build());
+ Configurations.Merge(serviceConf,
_configurationManager.UpdateTaskStateConfiguration));
}
/// <summary>
@@ -152,7 +161,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <param name="partitionDescriptor"></param>
/// <param name="evaluatorId"></param>
/// <returns></returns>
- private static ContextAndServiceConfiguration
GetDataLoadingContextAndServiceConfiguration(
+ private ContextAndServiceConfiguration
GetDataLoadingContextAndServiceConfiguration(
IPartitionDescriptor partitionDescriptor,
string evaluatorId)
{
@@ -166,9 +175,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
var serviceConf =
TangFactory.GetTang()
-
.NewConfigurationBuilder(ServiceConfiguration.ConfigurationModule.Build(),
+ .NewConfigurationBuilder(
+ ServiceConfiguration.ConfigurationModule
+ .Set(ServiceConfiguration.Services,
GenericType<TaskStateService>.Class)
+ .Build(),
dataLoadingContextConf,
- partitionDescriptor.GetPartitionConfiguration())
+ partitionDescriptor.GetPartitionConfiguration(),
+ _configurationManager.MapTaskStateConfiguration)
.Build();
var contextConf = ContextConfiguration.ConfigurationModule
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
new file mode 100644
index 0000000..fcbf846
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Service class whose only content is a reference to an ITaskState instance.
+ /// ServiceAndContextConfigurationProvider registers it in the service configuration
+ /// of both the master context and the data loading contexts.
+ /// </summary>
+ internal sealed class TaskStateService
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(TaskStateService));
+
+ /// <summary>
+ /// The reference of ITaskState to hold the task state
+ /// </summary>
+ private readonly ITaskState _state;
+
+ /// <summary>
+ /// Creates the service and keeps a reference to the injected task state
+ /// </summary>
+ /// <param name="state">Task state instance to hold</param>
+ [Inject]
+ private TaskStateService(ITaskState state)
+ {
+ _state = state;
+ }
+
+ /// <summary>
+ /// Creates the service without a task state; _state is left unassigned.
+ /// NOTE(review): presumably chosen when no ITaskState implementation is bound -
+ /// confirm against Tang's constructor selection rules.
+ /// </summary>
+ [Inject]
+ private TaskStateService()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
new file mode 100644
index 0000000..92eba07
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+ /// <summary>
+ /// Interface for task state.
+ /// Client can implement this interface and inject it into context service
+ /// and task function to save the current task state.
+ /// The interface declares no members; it only marks a type as task state.
+ /// </summary>
+ public interface ITaskState
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
new file mode 100644
index 0000000..0954ccb
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ /// <summary>
+ /// Named parameter that carries the map task state configuration as a
+ /// serialized string.
+ /// </summary>
+ [NamedParameter("The serialized configuration for map task state.")]
+ internal sealed class SerializedMapTaskStateConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
new file mode 100644
index 0000000..159b244
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ /// <summary>
+ /// Named parameter that carries the update task state configuration as a
+ /// serialized string.
+ /// </summary>
+ [NamedParameter("The serialized configuration for update task state.")]
+ internal sealed class SerializedUpdateTaskStateConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index d89d5a9..f4da45f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -90,9 +90,11 @@ under the License.
<Compile
Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" />
<Compile Include="OnREEF\Driver\TaskInfo.cs" />
<Compile Include="OnREEF\Driver\TaskManager.cs" />
+ <Compile Include="OnREEF\Driver\TaskStateService.cs" />
<Compile Include="OnREEF\IMRUTasks\IMRUTaskAppException.cs" />
<Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs"
/>
<Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" />
+ <Compile Include="OnREEF\IMRUTasks\ITaskState.cs" />
<Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
<Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" />
<Compile Include="OnREEF\IMRUTasks\TaskHostBase.cs" />
@@ -108,6 +110,7 @@ under the License.
<Compile Include="OnREEF\Parameters\MaxRetryNumberInRecovery.cs" />
<Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
<Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
+ <Compile
Include="OnREEF\Parameters\SerializedMapTaskStateConfiguration.cs" />
<Compile
Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedMapConfiguration.cs" />
<Compile
Include="OnREEF\Parameters\SerializedMapInputCodecConfiguration.cs" />
@@ -116,6 +119,7 @@ under the License.
<Compile Include="OnREEF\Parameters\SerializedReduceConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedUpdateConfiguration.cs" />
<Compile
Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
+ <Compile
Include="OnREEF\Parameters\SerializedUpdateTaskStateConfiguration.cs" />
<Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
<Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
<Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index f20ec31..8964836 100644
---
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
public abstract class IMRUBrodcastReduceTestBase : ReefFunctionalTest
{
protected static readonly Logger Logger =
Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase));
- private const string IMRUJobName = "IMRUBroadcastReduce";
+ protected const string IMRUJobName = "IMRUBroadcastReduce";
protected const string CompletedTaskMessage = "CompletedTaskMessage";
protected const string RunningTaskMessage = "RunningTaskMessage";
@@ -136,6 +136,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
jobDefinition.PartitionedDatasetConfiguration,
overallPerMapConfig
})
+
.BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
+
configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
+
.BindNamedParameter(typeof(SerializedMapTaskStateConfiguration),
+
configurationSerializer.ToString(jobDefinition.MapTaskStateConfiguration))
.BindNamedParameter(typeof(SerializedMapConfiguration),
configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
.BindNamedParameter(typeof(SerializedUpdateConfiguration),
@@ -203,7 +207,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
/// <param name="updateTaskMemory"></param>
/// <param name="numberOfRetryInRecovery"></param>
/// <returns></returns>
- protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int
numberofMappers,
+ protected virtual IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int
numberofMappers,
int chunkSize,
int numIterations,
int dim,
@@ -211,19 +215,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
int updateTaskMemory,
int numberOfRetryInRecovery)
{
- var updateFunctionConfig =
- TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
- .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
- numIterations.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
- dim.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
- numberofMappers.ToString(CultureInfo.InvariantCulture))
- .Build();
-
return new IMRUJobDefinitionBuilder()
.SetMapFunctionConfiguration(BuildMapperFunctionConfig())
- .SetUpdateFunctionConfiguration(updateFunctionConfig)
+ .SetUpdateFunctionConfiguration(BuildUpdateFunctionConfiguration(numberofMappers, numIterations, dim))
.SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
.SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig())
.SetReduceFunctionConfiguration(BuildReduceFunctionConfig())
@@ -239,6 +233,27 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
}
/// <summary>
+ /// Build update function configuration. Subclass can override it.
+ /// </summary>
+ /// <param name="numberofMappers"></param>
+ /// <param name="numIterations"></param>
+ /// <param name="dim"></param>
+ /// <returns></returns>
+ protected virtual IConfiguration BuildUpdateFunctionConfiguration(int numberofMappers, int numIterations, int dim)
+ {
+ var updateFunctionConfig =
+ TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfigModule())
+ .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
+ numIterations.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
+ dim.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
+ numberofMappers.ToString(CultureInfo.InvariantCulture))
+ .Build();
+ return updateFunctionConfig;
+ }
+
+ /// <summary>
/// Data Converter Configuration. Subclass can override it to have
its own test Data Converter.
/// </summary>
/// <param name="chunkSize"></param>
@@ -267,10 +282,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
}
/// <summary>
- /// Update function configuration. Subclass can override it to have its own test function.
+ /// Set update function to IMRUUpdateConfiguration configuration module. Sub class can override it to set different function.
/// </summary>
/// <returns></returns>
- protected virtual IConfiguration BuildUpdateFunctionConfig()
+ protected virtual IConfiguration BuildUpdateFunctionConfigModule()
{
return IMRUUpdateConfiguration<int[], int[],
int[]>.ConfigurationModule
.Set(IMRUUpdateConfiguration<int[], int[],
int[]>.UpdateFunction,
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index 433fce1..225ae42 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -131,9 +131,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
/// Update function configuration. Add TcpConfiguration to the base
configuration.
/// </summary>
/// <returns></returns>
- protected override IConfiguration BuildUpdateFunctionConfig()
+ protected override IConfiguration BuildUpdateFunctionConfigModule()
{
- return Configurations.Merge(GetTcpConfiguration(), base.BuildUpdateFunctionConfig());
+ return Configurations.Merge(GetTcpConfiguration(), base.BuildUpdateFunctionConfigModule());
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index 29c3e25..1fb08ad 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -19,6 +19,7 @@ using System.Diagnostics;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using Org.Apache.REEF.Network;
using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -35,8 +36,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
protected const int NumberOfRetry = 3;
/// <summary>
- /// This test fails two evaluators during task execution stage on each retry except last.
+ /// This test fails two evaluators during task execution stage on each retry except last.
/// Job is retried until success.
+ /// In each retry, when the task is restarted on an existing evaluator, the iteration will continue from the previous task state.
/// </summary>
[Fact]
public virtual void TestFailedMapperOnLocalRuntime()
@@ -124,6 +126,44 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
}
/// <summary>
+ /// Create IMRU Job Definition with IMRU required configurations
+ /// </summary>
+ /// <param name="numberofMappers"></param>
+ /// <param name="chunkSize"></param>
+ /// <param name="numIterations"></param>
+ /// <param name="dim"></param>
+ /// <param name="mapperMemory"></param>
+ /// <param name="updateTaskMemory"></param>
+ /// <param name="numberOfRetryInRecovery"></param>
+ /// <returns></returns>
+ protected override IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
+ int chunkSize,
+ int numIterations,
+ int dim,
+ int mapperMemory,
+ int updateTaskMemory,
+ int numberOfRetryInRecovery)
+ {
+ return new IMRUJobDefinitionBuilder()
+ .SetUpdateTaskStateConfiguration(UpdateTaskStateConfiguration())
+ .SetMapTaskStateConfiguration(MapTaskStateConfiguration())
+ .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
+ .SetUpdateFunctionConfiguration(BuildUpdateFunctionConfiguration(numberofMappers, numIterations, dim))
+ .SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
+ .SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig())
+ .SetReduceFunctionConfiguration(BuildReduceFunctionConfig())
+ .SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+ .SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+ .SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers))
+ .SetJobName(IMRUJobName)
+ .SetNumberOfMappers(numberofMappers)
+ .SetMapperMemory(mapperMemory)
+ .SetUpdateTaskMemory(updateTaskMemory)
+ .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery)
+ .Build();
+ }
+
+ /// <summary>
/// Mapper function configuration. Subclass can override it to have
its own test function.
/// </summary>
/// <returns></returns>
@@ -131,29 +171,29 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
{
var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
.Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
- GenericType<FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction>.Class)
+ GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.SenderMapFunctionFT>.Class)
.Build();
var c2 = TangFactory.GetTang().NewConfigurationBuilder()
- .BindSetEntry<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail, string>(GenericType<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
- .BindSetEntry<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail, string>(GenericType<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
- .BindIntNamedParam<FaultTolerantPipelinedBroadcastAndReduce.FailureType>(FaultTolerantPipelinedBroadcastAndReduce.FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+ .BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+ .BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+ .BindIntNamedParam<PipelinedBroadcastAndReduceWithFaultTolerant.FailureType>(PipelinedBroadcastAndReduceWithFaultTolerant.FailureType.EvaluatorFailureDuringTaskExecution.ToString())
.BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
- .BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
+ .BindNamedParameter(typeof(PipelinedBroadcastAndReduceWithFaultTolerant.TotalNumberOfForcedFailures), NumberOfRetry.ToString())
.Build();
return Configurations.Merge(c1, c2, GetTcpConfiguration());
}
/// <summary>
- /// Update function configuration. Subclass can override it to have its own test function.
+ /// Set update function to IMRUUpdateConfiguration configuration module. Return it with TCP configuration.
/// </summary>
/// <returns></returns>
- protected override IConfiguration BuildUpdateFunctionConfig()
+ protected override IConfiguration BuildUpdateFunctionConfigModule()
{
var c = IMRUUpdateConfiguration<int[], int[],
int[]>.ConfigurationModule
.Set(IMRUUpdateConfiguration<int[], int[],
int[]>.UpdateFunction,
- GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+ GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.BroadcastSenderReduceReceiverUpdateFunctionFT>.Class)
.Build();
return Configurations.Merge(c, GetTcpConfiguration());
@@ -170,5 +210,29 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
.Set(TcpClientConfigurationModule.SleepTime, "1000")
.Build();
}
+
+ /// <summary>
+ /// Configuration for Map task state
+ /// </summary>
+ /// <returns></returns>
+ private IConfiguration MapTaskStateConfiguration()
+ {
+ return TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<ITaskState>.Class, GenericType<MapTaskState<int[]>>.Class)
+ .Build();
+ }
+
+ /// <summary>
+ /// Configuration for Update task state
+ /// </summary>
+ /// <returns></returns>
+ private IConfiguration UpdateTaskStateConfiguration()
+ {
+ return TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<ITaskState>.Class, GenericType<UpdateTaskState<int[], int[]>>.Class)
+ .Build();
+ }
}
}
\ No newline at end of file