Repository: reef Updated Branches: refs/heads/master d116d94e6 -> b14c8cd81
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index af4fbf1..01a2bdb 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -17,6 +17,8 @@ using System; using System.IO; +using System.Net.Sockets; +using System.Runtime.Remoting; using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; @@ -77,14 +79,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <param name="resultHandler">Result handler</param> /// <param name="taskCloseCoordinator">Task close Coordinator</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> + /// <param name="taskId">task id</param> [Inject] private UpdateTaskHost( IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask, IGroupCommClient groupCommunicationsClient, IIMRUResultHandler<TResult> resultHandler, TaskCloseCoordinator taskCloseCoordinator, - [Parameter(typeof(InvokeGC))] bool invokeGC) + [Parameter(typeof(InvokeGC))] bool invokeGC, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) { + Logger.Log(Level.Info, "Entering constructor of UpdateTaskHost for task id {0}", taskId); _updateTask = updateTask; _groupCommunicationsClient = groupCommunicationsClient; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); @@ -95,6 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _resultHandler = resultHandler; _taskCloseCoordinator = taskCloseCoordinator; _cancellationSource = new CancellationTokenSource(); + Logger.Log(Level.Info, "UpdateTaskHost initialized."); } /// <summary> @@ -104,6 
+110,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <returns></returns> public byte[] Call(byte[] memento) { + Logger.Log(Level.Info, "Entering UpdateTaskHost Call()."); var updateResult = _updateTask.Initialize(); int iterNo = 0; try @@ -148,27 +155,80 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks "Received OperationCanceledException in UpdateTaskHost with message: {0}.", e.Message); } - catch (IOException e) + catch (Exception e) { - Logger.Log(Level.Error, "Received IOException in UpdateTaskHost with message: {0}.", e.Message); - if (!_cancellationSource.IsCancellationRequested) + if (e is IOException || e is TcpClientConnectionException || e is RemotingException || + e is SocketException) + { + Logger.Log(Level.Error, + "Received Exception {0} in UpdateTaskHost with message: {1}. The cancellation token is: {2}.", + e.GetType(), + e.Message, + _cancellationSource.IsCancellationRequested); + if (!_cancellationSource.IsCancellationRequested) + { + Logger.Log(Level.Error, + "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", + _cancellationSource.IsCancellationRequested); + throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + } + } + else if (e is AggregateException) + { + Logger.Log(Level.Error, + "Received AggregateException. 
The cancellation token is: {0}.", + _cancellationSource.IsCancellationRequested); + if (e.InnerException != null) + { + Logger.Log(Level.Error, + "InnerException {0}, with message {1}.", + e.InnerException.GetType(), + e.InnerException.Message); + } + if (!_cancellationSource.IsCancellationRequested) + { + if (e.InnerException != null && e.InnerException is IOException) + { + Logger.Log(Level.Error, + "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", + _cancellationSource.IsCancellationRequested); + throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + } + else + { + throw e; + } + } + } + else { - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + Logger.Log(Level.Error, + "UpdateTaskHost is throwing Excetion {0}, messge {1} with cancellation token: {2} and StackTrace {3}.", + e.GetType(), + e.Message, + _cancellationSource.IsCancellationRequested, + e.StackTrace); + if (!_cancellationSource.IsCancellationRequested) + { + throw e; + } } } - catch (TcpClientConnectionException e) + finally { - Logger.Log(Level.Error, - "Received TcpClientConnectionException in UpdateTaskHost with message: {0}.", - e.Message); - if (!_cancellationSource.IsCancellationRequested) + try + { + _resultHandler.Dispose(); + } + catch (Exception e) { - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + Logger.Log(Level.Error, "Exception in dispose result handler.", e); + //// TODO throw proper exceptions JIRA REEF-1492 } + _taskCloseCoordinator.SignalTaskStopped(); + Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested); } - _resultHandler.Dispose(); - _taskCloseCoordinator.SignalTaskStopped(); - Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested); + return null; } 
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs index 5a5b9c3..40816c1 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs @@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.IMRU.OnREEF.Parameters { - [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluators * Number of mappers) tolerated before throwing exception", "failedevaluators", "2.0")] + [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluators * Number of mappers) tolerated before throwing exception", "failedevaluators", "0.5")] internal sealed class AllowedFailedEvaluatorsFraction : Name<double> { } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs new file mode 100644 index 0000000..cc7a5b7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.OnREEF.Parameters +{ + /// <summary> + /// Max retry number for the system recovery + /// </summary> + [NamedParameter("Maximum retry number in fault tolerant recovery.", "maxRetryInRecovery", "3")] + public sealed class MaxRetryNumberInRecovery : Name<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 30d110a..cd3603a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -104,6 +104,7 @@ under the License. 
<Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" /> <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" /> <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" /> + <Compile Include="OnREEF\Parameters\MaxRetryNumberInRecovery.cs" /> <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" /> <Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" /> <Compile Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs index acf640b..16dba10 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs @@ -29,7 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <summary> /// Returns the identifier for the master task /// </summary> - string MasterTaskId { get; } + string MasterTaskId { get; set; } ICommunicationGroupDriver DefaultGroup { get; } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs index e636b04..7fe797a 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs @@ -100,7 +100,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <summary> /// Returns the identifier for the master task /// </summary> - public string MasterTaskId { get; private set; } + public string MasterTaskId { get; 
set; } public ICommunicationGroupDriver DefaultGroup { http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs index 6736e10..fc4a12b 100644 --- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs +++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs @@ -16,9 +16,11 @@ // under the License. using System; +using System.Runtime.Serialization; namespace Org.Apache.REEF.Tang.Exceptions { + [Serializable] public sealed class IllegalStateException : Exception { public IllegalStateException() @@ -30,6 +32,11 @@ namespace Org.Apache.REEF.Tang.Exceptions { } + public IllegalStateException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + public IllegalStateException(string message, Exception innerException) : base(message, innerException) { http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs index 4fc991c..b9a8189 100644 --- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs +++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs @@ -16,19 +16,26 @@ // under the License. 
using System; +using System.Runtime.Serialization; namespace Org.Apache.REEF.Tang.Exceptions { + [Serializable] public sealed class InjectionException : Exception { internal InjectionException(string msg) : base(msg) - { + { } internal InjectionException(string message, Exception innerException) : base(message, innerException) { } + + public InjectionException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs index 376ca7c..63126e8 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs @@ -28,31 +28,50 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUMapperCountTest)); private static readonly int NumNodes = 4; + private static readonly int NumOfRetry = 2; [Fact] [Trait("Description", "Run IMRU broadcast and reduce example as test.")] void TestIMRUBroadcastReduceOnLocalRuntime() { string testFolder = DefaultRuntimeFolder + TestId; - TestIMRUBroadcastReduce(false, testFolder); + TestIMRUBroadcastReduce(false, false, testFolder); ValidateSuccessForLocalRuntime(NumNodes, testFolder: testFolder); CleanUp(testFolder); } + [Fact] + [Trait("Description", "Run IMRU broadcast and reduce example as test.")] + void TestIMRUBroadcastReduceWithFTOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestIMRUBroadcastReduce(false, true, testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240); + var completedTaskCount = GetMessageCount(lines, "Received 
ICompletedTask"); + var runningTaskCount = GetMessageCount(lines, "Received IRunningTask"); + var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator"); + var failedTaskCount = GetMessageCount(lines, "Received IFailedTask"); + Assert.Equal((NumOfRetry + 1) * NumNodes, completedTaskCount + failedEvaluatorCount + failedTaskCount); + Assert.Equal((NumOfRetry + 1) * NumNodes, runningTaskCount); + CleanUp(testFolder); + } + [Fact(Skip = "Requires Yarn")] [Trait("Description", "Run IMRU broadcast and reduce example as test on Yarn.")] void TestIMRUBroadcastReduceOnYarn() { - TestIMRUBroadcastReduce(true); + TestIMRUBroadcastReduce(true, false); } - private void TestIMRUBroadcastReduce(bool runOnYarn, params string[] testFolder) + private void TestIMRUBroadcastReduce(bool runOnYarn, bool faultTolerant, params string[] testFolder) { var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule .Set(TcpPortConfigurationModule.PortRangeStart, "8900") .Set(TcpPortConfigurationModule.PortRangeCount, "1000") .Build(); - Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, new string[0], testFolder); + + string[] args = { "10", "2", "512", "512", "100", NumOfRetry.ToString() }; + Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, faultTolerant, args, testFolder); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs index e58e236..3bf712b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs @@ 
-75,6 +75,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU iterations, mapperMemory, updateTaskMemory, + 0, testFolder); ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100); CleanUp(testFolder); @@ -90,11 +91,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU int iterations, int mapperMemory, int updateTaskMemory, + int numberOfRetryInRecovery = 0, string testFolder = DefaultRuntimeFolder) { string runPlatform = runOnYarn ? "yarn" : "local"; TestRun(DriverConfiguration<int[], int[], int[], IEnumerable<Row>>( - CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory), + CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery), DriverEventHandlerConfigurations<int[], int[], int[], IEnumerable<Row>>()), typeof(BroadcastReduceDriver), numTasks, @@ -121,6 +123,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(REEF.Driver.DriverConfiguration.OnContextFailed, GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs index aa287b1..2bacd5a 100644 --- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs @@ -48,6 +48,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU iterations, mapperMemory, updateTaskMemory, + 0, testFolder); ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100); CleanUp(testFolder); http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs index 2f4b109..f20ec31 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs @@ -16,10 +16,11 @@ // under the License. using System; -using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; using Org.Apache.REEF.IMRU.OnREEF.Driver; @@ -29,6 +30,7 @@ using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverA using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -48,6 +50,11 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU protected static readonly Logger Logger = Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase)); private const string IMRUJobName = "IMRUBroadcastReduce"; + protected const string 
CompletedTaskMessage = "CompletedTaskMessage"; + protected const string RunningTaskMessage = "RunningTaskMessage"; + protected const string FailedTaskMessage = "FailedTaskMessage"; + protected const string FailedEvaluatorMessage = "FailedEvaluatorMessage"; + /// <summary> /// Abstract method for subclass to override it to provide configurations for driver handlers /// </summary> @@ -70,6 +77,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU /// <param name="dims"></param> /// <param name="iterations"></param> /// <param name="mapperMemory"></param> + /// <param name="numberOfRetryInRecovery"></param> /// <param name="updateTaskMemory"></param> /// <param name="testFolder"></param> protected void TestBroadCastAndReduce(bool runOnYarn, @@ -79,11 +87,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU int iterations, int mapperMemory, int updateTaskMemory, + int numberOfRetryInRecovery = 0, string testFolder = DefaultRuntimeFolder) { string runPlatform = runOnYarn ? "yarn" : "local"; TestRun(DriverConfiguration<int[], int[], int[], Stream>( - CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory), + CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery), DriverEventHandlerConfigurations<int[], int[], int[], Stream>()), typeof(BroadcastReduceDriver), numTasks, @@ -151,6 +160,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter(typeof(CoresForUpdateTask), jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), + jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter(typeof(InvokeGC), jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture)) .Build(); @@ -190,13 +201,15 @@ namespace 
Org.Apache.REEF.Tests.Functional.IMRU /// <param name="dim"></param> /// <param name="mapperMemory"></param> /// <param name="updateTaskMemory"></param> + /// <param name="numberOfRetryInRecovery"></param> /// <returns></returns> protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, - int updateTaskMemory) + int updateTaskMemory, + int numberOfRetryInRecovery) { var updateFunctionConfig = TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig()) @@ -221,6 +234,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU .SetNumberOfMappers(numberofMappers) .SetMapperMemory(mapperMemory) .SetUpdateTaskMemory(updateTaskMemory) + .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery) .Build(); } @@ -309,5 +323,50 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU GenericType<IntArraySumReduceFunction>.Class) .Build(); } + + /// <summary> + /// This class contains handlers for log purpose only + /// </summary> + protected sealed class MessageLogger : + IObserver<ICompletedTask>, + IObserver<IFailedEvaluator>, + IObserver<IFailedTask>, + IObserver<IRunningTask> + { + [Inject] + private MessageLogger() + { + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, CompletedTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId); + } + + public void OnNext(IFailedTask value) + { + Logger.Log(Level.Info, FailedTaskMessage + " " + value.Id + " " + value.GetActiveContext().Value.EvaluatorId); + } + + public void OnNext(IFailedEvaluator value) + { + Logger.Log(Level.Info, FailedEvaluatorMessage + " " + value.Id + " " + (value.FailedTask.IsPresent() ? 
value.FailedTask.Value.Id : "no task")); + } + + public void OnNext(IRunningTask value) + { + Logger.Log(Level.Info, RunningTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId); + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs index c00f44d..eaa405f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs @@ -38,8 +38,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU int mapperMemory = 5120; int updateTaskMemory = 5120; int numTasks = 4; + int numberOfRetryInRecovery = 4; string testFolder = DefaultRuntimeFolder + TestId; - TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder); + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, testFolder); ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder); CleanUp(testFolder); } @@ -56,7 +57,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU int mapperMemory = 5120; int updateTaskMemory = 5120; int numTasks = 4; - TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + int numberOfRetryInRecovery = 4; + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery); } /// <summary> @@ -82,6 +84,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) 
.Set(REEF.Driver.DriverConfiguration.OnContextFailed, GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs index b462438..068aa50 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs @@ -34,23 +34,19 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU { /// <summary> /// This is to test close event handler in IMRU tasks - /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers so that to trigger close events and handle the + /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers so that to trigger close events and handle the /// failed tasks and completed tasks /// </summary> [Collection("FunctionalTests")] public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase { - private const string CompletedTaskMessage = "CompletedTaskMessage"; - private const string FailEvaluatorMessage = "FailEvaluatorMessage"; - private const string FailTaskMessage = "FailTaskMessage"; - /// <summary> /// This test is for running in local runtime /// It sends close event for all the running tasks. 
- /// In the task close handler, the cancellation token will be set, and as a result tasks will return from the Call() + /// In the task close handler, the cancellation token will be set, and as a result tasks will return from the Call() /// method and driver will receive ICompletedTask. - /// In the exceptional case, task might throw exception from Call() method, as a result, driver will receive IFailedTask. - /// Expect number of CompletedTask and FailedTask equals to the total number of tasks. No failed Evaluator. + /// In the exceptional case, task might throw exception from Call() method, as a result, driver will receive IFailedTask. + /// Expect number of CompletedTask and FailedTask equals to the total number of tasks. No failed Evaluator. /// </summary> [Fact] public void TestTaskCloseOnLocalRuntime() @@ -61,11 +57,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU const int mapperMemory = 512; const int updateTaskMemory = 512; const int numTasks = 4; + const int numOfRetryInRecovery = 4; var testFolder = DefaultRuntimeFolder + TestId; - TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder); + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery, testFolder); string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120); var completedCount = GetMessageCount(lines, CompletedTaskMessage); - var failedCount = GetMessageCount(lines, FailTaskMessage); + var failedCount = GetMessageCount(lines, FailedTaskMessage); Assert.Equal(numTasks, completedCount + failedCount); CleanUp(testFolder); } @@ -83,12 +80,13 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU const int mapperMemory = 512; const int updateTaskMemory = 512; const int numTasks = 4; - TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + const int numOfRetryInRecovery = 4; + TestBroadCastAndReduce(true, numTasks, 
chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery); } /// <summary> /// This method overrides base class method and defines its own event handlers for driver. - /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios + /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios /// and verify the test result. /// Rest of the event handlers use those from IMRUDriver. In IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test. /// </summary> @@ -190,7 +188,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU { lock (_lock) { - Logger.Log(Level.Info, FailTaskMessage + value.Id); + Logger.Log(Level.Info, FailedTaskMessage + value.Id); CloseRunningTasks(); value.GetActiveContext().Value.Dispose(); } @@ -202,7 +200,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU /// <param name="value"></param> public void OnNext(IFailedEvaluator value) { - throw new Exception(FailEvaluatorMessage); + throw new Exception(FailedEvaluatorMessage); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs new file mode 100644 index 0000000..af02405 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; +using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail; +using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; +using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.Network; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Xunit; +using TraceLevel = System.Diagnostics.TraceLevel; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class TestFailMapperEvaluators : IMRUBrodcastReduceTestBase + { + protected const int NumberOfRetry = 3; + + /// <summary> + /// This test is to fail one evaluator and then try to resubmit. In the last retry, + /// there will be no failed evaluator and all tasks will be successfully completed. 
+ /// </summary> + [Fact] + public virtual void TestFailedMapperOnLocalRuntime() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 9; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + NumberOfRetry, + testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240); + var completedTaskCount = GetMessageCount(lines, CompletedTaskMessage); + var runningTaskCount = GetMessageCount(lines, RunningTaskMessage); + var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); + var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); + + // on each try each task should fail or complete or disappear with failed evaluator + // and on each try all tasks should start successfully + Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount); + Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount); + CleanUp(testFolder); + } + + /// <summary> + /// This test is for the normal scenarios of IMRUDriver and IMRUTasks on yarn + /// </summary> + [Fact(Skip = "Requires Yarn")] + public virtual void TestFailedMapperOnYarn() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + } + + /// <summary> + /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver. 
+ /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString()) + .Build(); + } + + /// <summary> + 
/// Mapper function configuration. Subclass can override it to have its own test function. + /// </summary> + /// <returns></returns> + protected override IConfiguration BuildMapperFunctionConfig() + { + var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<TestSenderMapFunction>.Class) + .Build(); + + var c2 = TangFactory.GetTang().NewConfigurationBuilder() + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") + .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .Build(); + + return Configurations.Merge(c1, c2, GetTcpConfiguration()); + } + + /// <summary> + /// Update function configuration. Subclass can override it to have its own test function. 
+ /// </summary> + /// <returns></returns> + protected override IConfiguration BuildUpdateFunctionConfig() + { + var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule + .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction, + GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class) + .Build(); + + return Configurations.Merge(c, GetTcpConfiguration()); + } + + /// <summary> + /// Override default setting for retry policy + /// </summary> + /// <returns></returns> + private IConfiguration GetTcpConfiguration() + { + return TcpClientConfigurationModule.ConfigurationModule + .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200") + .Set(TcpClientConfigurationModule.SleepTime, "1000") + .Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs new file mode 100644 index 0000000..6fecb2c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail; +using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; +using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators + { + /// <summary> + /// This test is to throw exceptions in two tasks. In the first try, there is task app failure, + /// and no retries will be done. 
+ /// </summary> + [Fact(Skip = "Times out at high timeout for RetryCountWaitingForRegistration; disabling until this parameter is configurable in test.")] + public override void TestFailedMapperOnLocalRuntime() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 9; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + NumberOfRetry, + testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360); + var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask"); + var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); + var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); + + // on each try each task should fail or complete or disappear with failed evaluator + // not all tasks will start successfully, so not checking this + Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount); + CleanUp(testFolder); + } + + /// <summary> + /// Mapper function configuration. Subclass can override it to have its own test function. 
+ /// </summary> + /// <returns></returns> + protected override IConfiguration BuildMapperFunctionConfig() + { + var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<TestSenderMapFunction>.Class) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(c) + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") + .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskInitialization.ToString()) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs new file mode 100644 index 0000000..dc998fc --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.IMRU.API; +using TaskIdsToFail = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail; +using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; +using TestSenderMapFunction = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class TestFailMapperTasks : TestFailMapperEvaluators + { + /// <summary> + /// This test is to throw exceptions in two tasks. In the first try, there is task app failure, + /// and no retries will be done. 
+ /// </summary> + [Fact] + public override void TestFailedMapperOnLocalRuntime() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 9; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + NumberOfRetry, + testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240); + var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask"); + var runningTaskCount = GetMessageCount(lines, RunningTaskMessage); + var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); + var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); + + // on each try each task should fail or complete + // there should be no failed evaluators + // and on each try all tasks should start successfully + Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount); + Assert.Equal(0, failedEvaluatorCount); + Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount); + CleanUp(testFolder); + } + + /// <summary> + /// Mapper function configuration. Subclass can override it to have its own test function.
+ /// </summary> + /// <returns></returns> + protected override IConfiguration BuildMapperFunctionConfig() + { + var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<TestSenderMapFunction>.Class) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(c) + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-") + .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-") + .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskExecution.ToString()) + .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString()) + .Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs new file mode 100644 index 0000000..cf16e25 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; +using FailureType = Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; +using TraceLevel = System.Diagnostics.TraceLevel; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class TestFailUpdateEvaluator : IMRUBrodcastReduceTestBase + { + private const int NumberOfRetry = 3; + + /// <summary> + /// This test is to fail update evaluator and then try to resubmit. We don't recover from update evaluator failure. 
+ /// </summary> + [Fact] + public virtual void TestFailedUpdateOnLocalRuntime() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 9; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + NumberOfRetry, + testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240); + var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask"); + var runningTaskCount = GetMessageCount(lines, RunningTaskMessage); + var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); + var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); + + // there should be one try with each task either completing or disappearing with failed evaluator + // no task failures + // and on this try all tasks should start successfully + Assert.Equal(numTasks, completedTaskCount + failedEvaluatorCount); + Assert.Equal(0, failedTaskCount); + Assert.Equal(numTasks, runningTaskCount); + CleanUp(testFolder); + } + + /// <summary> + /// This test is for the normal scenarios of IMRUDriver and IMRUTasks on yarn + /// </summary> + [Fact(Skip = "Requires Yarn")] + public virtual void TestFailedUpdateOnYarn() + { + int chunkSize = 2; + int dims = 100; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + } + + /// <summary> + /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver. 
+ /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString()) + .Build(); + } + + /// <summary> + 
/// Update function configuration. Subclass can override it to have its own test function. + /// </summary> + /// <returns></returns> + protected override IConfiguration BuildUpdateFunctionConfig() + { + var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule + .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction, + GenericType<TestUpdateFunction>.Class) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(c) + .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString()) + .Build(); + } + + internal sealed class TestUpdateFunction : IUpdateFunction<int[], int[], int[]> + { + private int _iterations; + private readonly int _maxIters; + private readonly int _dim; + private readonly int[] _intArr; + private readonly int _workers; + private readonly string _taskId; + private int _failureType; + + [Inject] + private TestUpdateFunction( + [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters, + [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim, + [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + [Parameter(typeof(FailureType))] int failureType) + { + _maxIters = maxIters; + _iterations = 0; + _dim = dim; + _intArr = new int[_dim]; + _workers = numWorkers; + _taskId = taskId; + _failureType = failureType; + Logger.Log(Level.Info, "TestUpdateFunction: TaskId: {0}", _taskId); + Logger.Log(Level.Info, "Failure type: {0} failure", FailureType.IsEvaluatorFailure(_failureType) ?
"evaluator" : "task"); + } + + /// <summary> + /// Update function + /// </summary> + /// <param name="input">integer array</param> + /// <returns>The same integer array</returns> + UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] input) + { + if (input[0] != (_iterations + 1) * _workers) + { + Exceptions.Throw(new Exception("Expected input to update functon not same as actual input"), Logger); + } + + _iterations++; + Logger.Log(Level.Info, "Received value {0} in iteration {1}", input[0], _iterations); + MakeException(); + + if (_iterations < _maxIters) + { + for (int i = 0; i < _dim; i++) + { + _intArr[i] = _iterations + 1; + } + + return UpdateResult<int[], int[]>.AnotherRound(_intArr); + } + + return UpdateResult<int[], int[]>.Done(input); + } + + /// <summary> + /// Initialize function. Sends integer array with value 1 to all mappers + /// </summary> + /// <returns>Map input</returns> + UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize() + { + for (int i = 0; i < _dim; i++) + { + _intArr[i] = _iterations + 1; + } + + return UpdateResult<int[], int[]>.AnotherRound(_intArr); + } + + private void MakeException() + { + if (_iterations == 10 && !_taskId.EndsWith("-" + NumberOfRetry)) + { + Logger.Log(Level.Warning, "Simulating {0} failure for taskId {1}", + FailureType.IsEvaluatorFailure(_failureType) ? 
"evaluator" : "task", + _taskId); + if (FailureType.IsEvaluatorFailure(_failureType)) + { + // simulate evaluator failure + Environment.Exit(1); + } + else + { + // simulate task failure + throw new ArgumentNullException("Simulating task failure"); + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 8856fca..d043ab1 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -124,6 +124,10 @@ under the License. <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" /> <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" /> <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" /> + <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" /> + <Compile Include="Functional\IMRU\TestFailUpdateEvaluator.cs" /> + <Compile Include="Functional\IMRU\TestFailMapperTasks.cs" /> + <Compile Include="Functional\IMRU\TestFailMapperEvaluators.cs" /> <Compile Include="Functional\IMRU\TestTaskExceptions.cs" /> <Compile Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" /> <Compile Include="Functional\Messaging\TestMessageEventManager.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs index 4a8a048..572b245 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs @@ -162,8 +162,7 @@ namespace 
Org.Apache.REEF.Wake.Remote.Impl } catch (Exception e) { - Logger.Log(Level.Warning, "In Read function unable to read the message."); - Exceptions.CaughtAndThrow(e, Level.Error, Logger); + Logger.Log(Level.Warning, "In StreamingLink::Read function unable to read the message {0}.", e.GetType()); throw; } } @@ -186,8 +185,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl } catch (Exception e) { - Logger.Log(Level.Warning, "In ReadAsync function unable to read the message."); - Exceptions.CaughtAndThrow(e, Level.Error, Logger); + Logger.Log(Level.Warning, "In StreamingLink::ReadAsync function unable to read the message, {0}.", e.GetType()); throw; } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs index 0ec4c8a..cca8abd 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs @@ -19,6 +19,7 @@ using System; using System.Net; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Utilities.AsyncUtils; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.StreamingCodec; @@ -68,7 +69,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl : this(remoteEndpoint, streamingCodec, clientFactory) { _observer = observer; - Task.Factory.StartNew(() => ResponseLoop(), TaskCreationOptions.LongRunning); + try + { + Task.Factory.StartNew(() => ResponseLoop(), TaskCreationOptions.LongRunning); + } + catch (Exception e) + { + Logger.Log(Level.Warning, "StreamingTransportClient get exception from ResponseLoop: {0}.", e.GetType()); + throw e; + } } /// <summary> @@ -111,16 +120,24 @@ namespace 
Org.Apache.REEF.Wake.Remote.Impl /// </summary> private async Task ResponseLoop() { - while (!_cancellationSource.IsCancellationRequested) + try { - T message = await _link.ReadAsync(_cancellationSource.Token); - if (message == null) + while (!_cancellationSource.IsCancellationRequested) { - break; - } + T message = await _link.ReadAsync(_cancellationSource.Token); + if (message == null) + { + break; + } - TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link); - _observer.OnNext(transportEvent); + TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link); + _observer.OnNext(transportEvent); + } + } + catch (Exception e) + { + Logger.Log(Level.Warning, "StreamingTransportClient get exception in ResponseLoop: {0}.", e.GetType()); + throw e; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs index 706fac4..6a9ea1a 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs @@ -162,7 +162,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false); ProcessClient(client).LogAndIgnoreExceptionIfAny( - LOGGER, "Task Exception observed processing client in StreamingTransportServer.", Level.Warning); + LOGGER, + "StreamingTransportServer observed Task Exception during client processing.", + Level.Warning); } } catch (InvalidOperationException) @@ -173,6 +175,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { LOGGER.Log(Level.Info, "StreamingTransportServer has been closed."); } + catch (Exception e) + { + LOGGER.Log(Level.Warning, "StreamingTransportServer got exception: 
{0}.", e.GetType()); + throw e; + } } /// <summary> @@ -181,24 +188,32 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="client">The connected client</param> private async Task ProcessClient(TcpClient client) { - // Keep reading messages from client until they disconnect or timeout CancellationToken token = _cancellationSource.Token; - using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec)) + try { - while (!token.IsCancellationRequested) + // Keep reading messages from client until they disconnect or timeout + using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec)) { - T message = await link.ReadAsync(token); - - if (message == null) + while (!token.IsCancellationRequested) { - break; - } + T message = await link.ReadAsync(token); - TransportEvent<T> transportEvent = new TransportEvent<T>(message, link); - _remoteObserver.OnNext(transportEvent); + if (message == null) + { + break; + } + + TransportEvent<T> transportEvent = new TransportEvent<T>(message, link); + _remoteObserver.OnNext(transportEvent); + } + LOGGER.Log(Level.Error, + "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested); } - LOGGER.Log(Level.Error, - "ProcessClient close the Link. 
IsCancellationRequested: " + token.IsCancellationRequested); + } + catch (Exception e) + { + LOGGER.Log(Level.Warning, "StreamingTransportServer get exception in ProcessClient: {0}, IsCancellationRequested {1}.", e.GetType(), token.IsCancellationRequested); + throw e; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs index 934983f..75087d5 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs @@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Wake.Remote.Parameters { - [NamedParameter("Number of retries for connecting to endpoint", defaultValue: "20")] + [NamedParameter("Number of retries for connecting to endpoint", defaultValue: "200")] public sealed class ConnectionRetryCount : Name<int> { }
