Repository: reef Updated Branches: refs/heads/master 50075c244 -> 798f2dbb0
[REEF-1408] Adding IMRU functional tests * Create IMRU test bases class * Add test subclass to test IMRUDriver and IMRUTasks * Add test subclass to test IMRU task close event handler JIRA: [REEF-1408](https://issues.apache.org/jira/browse/REEF-1408) Pull request: This closes #1021 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/798f2dbb Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/798f2dbb Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/798f2dbb Branch: refs/heads/master Commit: 798f2dbb075107330c8be575182a28d8fa19e89f Parents: 50075c2 Author: Julia Wang <[email protected]> Authored: Thu Jun 2 00:06:15 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Wed Jun 8 16:52:33 2016 -0700 ---------------------------------------------------------------------- .../Properties/AssemblyInfo.cs | 7 + .../IMRU/IMRUBrodcastReduceTestBase.cs | 312 +++++++++++++++++++ .../IMRUBrodcastReduceWithoutIMRUClientTest.cs | 91 ++++++ .../Functional/IMRU/IMRUCloseTaskTest.cs | 177 +++++++++++ .../Functional/ReefFunctionalTest.cs | 24 +- .../Org.Apache.REEF.Tests.csproj | 7 + 6 files changed, 616 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs index 8d27883..d5690d7 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs @@ -51,3 +51,10 @@ using System.Runtime.InteropServices; // [assembly: AssemblyVersion("1.0.*")] [assembly: AssemblyVersion("0.16.0.0")] [assembly: AssemblyFileVersion("0.16.0.0")] + +// Allow 
the tests project access to `internal` APIs +[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs new file mode 100644 index 0000000..16d927b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Globalization; +using System.IO; +using System.Linq; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.IO.PartitionedData.Random; +using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + /// <summary> + /// IMRU test base class that defines basic configurations for IMRU driver that can be shared by subclasses. + /// </summary> + public abstract class IMRUBrodcastReduceTestBase : ReefFunctionalTest + { + protected static readonly Logger Logger = Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase)); + private const string IMRUJobName = "IMRUBroadcastReduce"; + + /// <summary> + /// Abstract method for subclass to override it to provide configurations for driver handlers + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected abstract IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>(); + + /// <summary> + /// This method provides a default way to call TestRun. 
+ /// It gets driver configurations from base class, including the DriverEventHandlerConfigurations defined by subclass, + /// then calls TestRun for running the test. + /// Subclass can override it if they have different parameters for the test + /// </summary> + /// <param name="runOnYarn"></param> + /// <param name="numTasks"></param> + /// <param name="chunkSize"></param> + /// <param name="dims"></param> + /// <param name="iterations"></param> + /// <param name="mapperMemory"></param> + /// <param name="updateTaskMemory"></param> + /// <param name="testFolder"></param> + protected void TestBroadCastAndReduce(bool runOnYarn, + int numTasks, + int chunkSize, + int dims, + int iterations, + int mapperMemory, + int updateTaskMemory, + string testFolder = DefaultRuntimeFolder) + { + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(DriverConfiguration<int[], int[], int[], Stream>( + CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory), + DriverEventHandlerConfigurations<int[], int[], int[], Stream>()), + typeof(BroadcastReduceDriver), + numTasks, + "BroadcastReduceDriver", + runPlatform, + testFolder); + } + + /// <summary> + /// Build driver configuration + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <param name="jobDefinition"></param> + /// <param name="driverHandlerConfig"></param> + /// <returns></returns> + private IConfiguration DriverConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>( + IMRUJobDefinition jobDefinition, + IConfiguration driverHandlerConfig) + { + string driverId = string.Format("IMRU-{0}-Driver", jobDefinition.JobName); + IConfiguration overallPerMapConfig = null; + var configurationSerializer = new AvroConfigurationSerializer(); + + try + { + overallPerMapConfig = 
Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray()); + } + catch (Exception e) + { + Exceptions.Throw(e, "Issues in merging PerMapCOnfigGenerator configurations", Logger); + } + + var imruDriverConfiguration = TangFactory.GetTang().NewConfigurationBuilder(new[] + { + driverHandlerConfig, + CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>(jobDefinition.NumberOfMappers + 1, + driverId), + jobDefinition.PartitionedDatasetConfiguration, + overallPerMapConfig + }) + .BindNamedParameter(typeof(SerializedMapConfiguration), + configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration)) + .BindNamedParameter(typeof(SerializedUpdateConfiguration), + configurationSerializer.ToString(jobDefinition.UpdateFunctionConfiguration)) + .BindNamedParameter(typeof(SerializedMapInputCodecConfiguration), + configurationSerializer.ToString(jobDefinition.MapInputCodecConfiguration)) + .BindNamedParameter(typeof(SerializedMapInputPipelineDataConverterConfiguration), + configurationSerializer.ToString(jobDefinition.MapInputPipelineDataConverterConfiguration)) + .BindNamedParameter(typeof(SerializedUpdateFunctionCodecsConfiguration), + configurationSerializer.ToString(jobDefinition.UpdateFunctionCodecsConfiguration)) + .BindNamedParameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration), + configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration)) + .BindNamedParameter(typeof(SerializedReduceConfiguration), + configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration)) + .BindNamedParameter(typeof(SerializedResultHandlerConfiguration), + configurationSerializer.ToString(jobDefinition.ResultHandlerConfiguration)) + .BindNamedParameter(typeof(MemoryPerMapper), + jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(MemoryForUpdateTask), + jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture)) + 
.BindNamedParameter(typeof(CoresPerMapper), + jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(CoresForUpdateTask), + jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(InvokeGC), + jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture)) + .Build(); + return imruDriverConfiguration; + } + + /// <summary> + /// Create group communication configuration + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <param name="numberOfTasks"></param> + /// <param name="driverId"></param> + /// <returns></returns> + private IConfiguration CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>( + int numberOfTasks, + string driverId) + { + return TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId) + .BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(IMRUConstants.UpdateTaskName) + .BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(IMRUConstants.CommunicationGroupName) + .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(IMRUConstants.TreeFanout.ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(numberOfTasks.ToString(CultureInfo.InvariantCulture)) + .BindImplementation(GenericType<IGroupCommDriver>.Class, GenericType<GroupCommDriver>.Class) + .Build(); + } + + /// <summary> + /// Create IMRU Job Definition with IMRU required configurations + /// </summary> + /// <param name="numberofMappers"></param> + /// <param name="chunkSize"></param> + /// <param name="numIterations"></param> + /// <param name="dim"></param> + /// <param name="mapperMemory"></param> + /// <param name="updateTaskMemory"></param> + /// 
<returns></returns> + protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers, + int chunkSize, + int numIterations, + int dim, + int mapperMemory, + int updateTaskMemory) + { + var updateFunctionConfig = + TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig()) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations), + numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions), + dim.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers), + numberofMappers.ToString(CultureInfo.InvariantCulture)) + .Build(); + + return new IMRUJobDefinitionBuilder() + .SetMapFunctionConfiguration(BuildMapperFunctionConfig()) + .SetUpdateFunctionConfiguration(updateFunctionConfig) + .SetMapInputCodecConfiguration(BuildMapInputCodecConfig()) + .SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig()) + .SetReduceFunctionConfiguration(BuildReduceFunctionConfig()) + .SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize)) + .SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize)) + .SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers)) + .SetJobName(IMRUJobName) + .SetNumberOfMappers(numberofMappers) + .SetMapperMemory(mapperMemory) + .SetUpdateTaskMemory(updateTaskMemory) + .Build(); + } + + /// <summary> + /// Data Converter Configuration. Subclass can override it to have its own test Data Converter. 
+ /// </summary> + /// <param name="chunkSize"></param> + /// <returns></returns> + protected virtual IConfiguration BuildDataConverterConfig(int chunkSize) + { + return TangFactory.GetTang() + .NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule + .Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter, + GenericType<PipelineIntDataConverter>.Class).Build()) + .BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize), + chunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + } + + /// <summary> + /// Mapper function configuration. Subclass can override it to have its own test function. + /// </summary> + /// <returns></returns> + protected virtual IConfiguration BuildMapperFunctionConfig() + { + return IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<BroadcastReceiverReduceSenderMapFunction>.Class) + .Build(); + } + + /// <summary> + /// Update function configuration. Subclass can override it to have its own test function. + /// </summary> + /// <returns></returns> + protected virtual IConfiguration BuildUpdateFunctionConfig() + { + return IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule + .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction, + GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class) + .Build(); + } + + /// <summary> + /// Partition dataset configuration. Subclass can override it to have its own test dataset config + /// </summary> + /// <param name="numberofMappers"></param> + /// <returns></returns> + protected virtual IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers) + { + return RandomInputDataConfiguration.ConfigurationModule.Set( + RandomInputDataConfiguration.NumberOfPartitions, + numberofMappers.ToString()).Build(); + } + + /// <summary> + /// Map Input Codec configuration. Subclass can override it to have its own test Codec. 
+ /// </summary> + /// <returns></returns> + protected virtual IConfiguration BuildMapInputCodecConfig() + { + return IMRUCodecConfiguration<int[]>.ConfigurationModule + .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) + .Build(); + } + + /// <summary> + /// Update function Codec configuration. Subclass can override it to have its own test Codec. + /// </summary> + /// <returns></returns> + protected virtual IConfiguration BuildUpdateFunctionCodecsConfig() + { + return IMRUCodecConfiguration<int[]>.ConfigurationModule + .Set(IMRUCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) + .Build(); + } + + /// <summary> + /// Reduce function configuration. Subclass can override it to have its own test function. + /// </summary> + /// <returns></returns> + protected virtual IConfiguration BuildReduceFunctionConfig() + { + return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule + .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction, + GenericType<IntArraySumReduceFunction>.Class) + .Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs new file mode 100644 index 0000000..c00f44d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Xunit; +using TraceLevel = System.Diagnostics.TraceLevel; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class IMRUBrodcastReduceWithoutIMRUClientTest : IMRUBrodcastReduceTestBase + { + /// <summary> + /// This test is for the normal scenarios of IMRUDriver and IMRUTasks on local runtime + /// </summary> + [Fact] + public void TestWithHandlersInIMRUDriverOnLocalRuntime() + { + int chunkSize = 2; + int dims = 10; + int iterations = 10; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder); + ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder); + CleanUp(testFolder); + } + + /// <summary> + /// This test is for the normal scenarios of IMRUDriver and IMRUTasks on yarn + /// </summary> + [Fact(Skip = "Requires Yarn")] + public void TestWithHandlersInIMRUDriverOnYarn() + { + int chunkSize = 2; + int dims = 10; + int iterations = 10; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + } + + /// 
<summary> + /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) + .Build(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs new file mode 
100644 index 0000000..2c766f2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; +using TraceLevel = System.Diagnostics.TraceLevel; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + /// <summary> + /// This tests the close event handler in IMRU tasks. + /// The test provides IRunningTask, IFailedTask and ICompletedTask handlers in order to trigger close events and handle the + /// failed tasks and completed tasks. + /// </summary> + [Collection("FunctionalTests")] + public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase + { + private const string CompletedTaskMessage = "CompletedTaskMessage"; + private const string FailTaskMessage = "FailTaskMessage"; + + /// <summary> + /// This test is for running in the local runtime. + /// It sends a close event to all the running tasks.
+ /// It first informs the Call method to stop. + /// If the Call method is running properly, it will respect this flag and return normally, which results in an ICompletedTask event. + /// If the Call method hangs somewhere and cannot return, the close handler will throw an exception, which results in an IFailedTask event. + /// As we are testing an IMRU task, not a test task, the behavior is not deterministic: a task can end as CompletedTask or FailedTask. + /// No matter how the task is closed, the total number of completed tasks and failed tasks should be equal to the + /// total number of tasks. + /// </summary> + [Fact] + public void TestTaskCloseOnLocalRuntime() + { + int chunkSize = 2; + int dims = 50; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder); + int failedCount = GetMessageCount(lines, FailTaskMessage); + int completedCount = GetMessageCount(lines, CompletedTaskMessage); + Assert.Equal(numTasks, failedCount + completedCount); + CleanUp(testFolder); + } + + /// <summary> + /// Same test for running on YARN. + /// It sends a close event to all the running tasks. + /// It first informs the Call method to stop. + /// If the Call method is running properly, it will respect this flag and return normally, which results in an ICompletedTask event. + /// If the Call method hangs somewhere and cannot return, the close handler will throw an exception, which results in an IFailedTask event. + /// As we are testing an IMRU task, not a test task, the behavior is not deterministic: a task can end as CompletedTask or FailedTask. + /// No matter how the task is closed, the total number of completed tasks and failed tasks should be equal to the + /// total number of tasks.
+ /// </summary> + [Fact(Skip = "Requires Yarn")] + public void TestTaskCloseOnLocalRuntimeOnYarn() + { + int chunkSize = 2; + int dims = 50; + int iterations = 200; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); + } + + /// <summary> + /// This method overrides the base class method and defines its own event handlers for the driver. + /// It uses its own running-task, failed-task and completed-task handlers (TestHandlers) in order to simulate the test scenarios + /// and verify the test result. + /// The rest of the event handlers are those from IMRUDriver. The IActiveContext handler in IMRUDriver binds the IMRU tasks for the test. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<TestHandlers>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) +
.Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<TestHandlers>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<TestHandlers>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) + .Build(); + } + + /// <summary> + /// Test handlers for running, failed and completed task events + /// </summary> + internal sealed class TestHandlers : IObserver<IRunningTask>, IObserver<IFailedTask>, IObserver<ICompletedTask> + { + [Inject] + private TestHandlers() + { + } + + /// <summary> + /// Log the task id and close the running task by disposing it with the CloseTaskByDriver message + /// </summary> + public void OnNext(IRunningTask value) + { + Logger.Log(Level.Info, "Received running task, closing it" + value.Id); + value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver)); + } + + /// <summary> + /// Log the task id, verify that the failure data contains the TaskKilledByDriver message, and dispose the active context + /// </summary> + /// <param name="value"></param> + public void OnNext(IFailedTask value) + { + Logger.Log(Level.Info, FailTaskMessage + value.Id); + var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value); + Assert.Contains(TaskManager.TaskKilledByDriver, failedExeption); + value.GetActiveContext().Value.Dispose(); + } + + /// <summary> + /// Log the task id and dispose the active context + /// </summary> + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, CompletedTaskMessage + value.Id); + value.ActiveContext.Dispose(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index 90487e0..d71f20d 100644 ---
a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -28,6 +28,7 @@ using Microsoft.WindowsAzure.Storage.Blob; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Local; using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; @@ -40,6 +41,8 @@ namespace Org.Apache.REEF.Tests.Functional { public class ReefFunctionalTest : IDisposable { + private readonly static Logger Logger = Logger.GetLogger(typeof(ReefFunctionalTest)); + protected const string DriverStdout = "driver.stdout"; protected const string DriverStderr = "driver.stderr"; protected const string EvaluatorStdout = "evaluator.stdout"; @@ -51,8 +54,9 @@ namespace Org.Apache.REEF.Tests.Functional private const string Local = "local"; private const string YARN = "yarn"; private const int SleepTime = 1000; + private const string PortRangeStart = "8900"; + private const string PortRangeCount = "1000"; - private readonly static Logger Logger = Logger.GetLogger(typeof(ReefFunctionalTest)); private const string StorageAccountKeyEnvironmentVariable = "REEFTestStorageAccountKey"; private const string StorageAccountNameEnvironmentVariable = "REEFTestStorageAccountName"; private bool _testSuccess = false; @@ -171,6 +175,17 @@ namespace Org.Apache.REEF.Tests.Functional } /// <summary> + /// Get message counts from lines given + /// </summary> + /// <param name="lines"></param> + /// <param name="message"></param> + /// <returns></returns> + protected int GetMessageCount(string[] lines, string message) + { + return lines.Where(s => s.Contains(message)).ToArray().Length; + } + + /// <summary> /// See <see cref="ValidateMessageSuccessfullyLogged"/> for detail. This function is <see cref="ValidateMessageSuccessfullyLogged"/> /// for the driver log. 
/// </summary> @@ -351,7 +366,12 @@ namespace Org.Apache.REEF.Tests.Functional .Set(LocalRuntimeClientConfiguration.RuntimeFolder, dir) .Build(); case YARN: - return YARNClientConfiguration.ConfigurationModule.Build(); + var yarnClientConfig = YARNClientConfiguration.ConfigurationModule.Build(); + var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule + .Set(TcpPortConfigurationModule.PortRangeStart, PortRangeStart) + .Set(TcpPortConfigurationModule.PortRangeCount, PortRangeCount) + .Build(); + return Configurations.Merge(yarnClientConfig, tcpPortConfig); default: throw new Exception("Unknown runtime: " + runOnYarn); } http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index ea695d4..1c6949a 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -101,6 +101,9 @@ under the License. <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" /> <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" /> + <Compile Include="Functional\IMRU\IMRUBrodcastReduceWithoutIMRUClientTest.cs" /> + <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" /> + <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" /> <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" /> <Compile Include="Functional\IMRU\TestTaskExceptions.cs" /> <Compile Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" /> @@ -188,6 +191,10 @@ under the License. 
<Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project> <Name>Org.Apache.REEF.IMRU</Name> </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj"> + <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project> + <Name>Org.Apache.REEF.IO</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
