Repository: reef
Updated Branches:
  refs/heads/master 50075c244 -> 798f2dbb0


[REEF-1408] Adding IMRU functional tests

* Create IMRU test bases class
* Add test subclass to test IMRUDriver and IMRUTasks
* Add test subclass to test IMRU task close event handler

JIRA:
  [REEF-1408](https://issues.apache.org/jira/browse/REEF-1408)

Pull request:
  This closes #1021


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/798f2dbb
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/798f2dbb
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/798f2dbb

Branch: refs/heads/master
Commit: 798f2dbb075107330c8be575182a28d8fa19e89f
Parents: 50075c2
Author: Julia Wang <[email protected]>
Authored: Thu Jun 2 00:06:15 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Wed Jun 8 16:52:33 2016 -0700

----------------------------------------------------------------------
 .../Properties/AssemblyInfo.cs                  |   7 +
 .../IMRU/IMRUBrodcastReduceTestBase.cs          | 312 +++++++++++++++++++
 .../IMRUBrodcastReduceWithoutIMRUClientTest.cs  |  91 ++++++
 .../Functional/IMRU/IMRUCloseTaskTest.cs        | 177 +++++++++++
 .../Functional/ReefFunctionalTest.cs            |  24 +-
 .../Org.Apache.REEF.Tests.csproj                |   7 +
 6 files changed, 616 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs
index 8d27883..d5690d7 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Properties/AssemblyInfo.cs
@@ -51,3 +51,10 @@ using System.Runtime.InteropServices;
 // [assembly: AssemblyVersion("1.0.*")]
 [assembly: AssemblyVersion("0.16.0.0")]
 [assembly: AssemblyFileVersion("0.16.0.0")]
+
+// Allow the tests project access to `internal` APIs
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ 
"00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9"
 +
+ 
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
+ 
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
+ 
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]

http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
new file mode 100644
index 0000000..16d927b
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.PartitionedData.Random;
+using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    /// <summary>
+    /// IMRU test base class that defines basic configurations for IMRU driver 
that can be shared by subclasses. 
+    /// </summary>
+    public abstract class IMRUBrodcastReduceTestBase : ReefFunctionalTest
+    {
+        protected static readonly Logger Logger = 
Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase));
+        private const string IMRUJobName = "IMRUBroadcastReduce";
+
+        /// <summary>
+        /// Abstract method for subclass to override it to provide 
configurations for driver handlers 
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected abstract IConfiguration 
DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, 
TPartitionType>();
+
+        /// <summary>
+        /// This method provides a default way to call TestRun. 
+        /// It gets driver configurations from base class, including the 
DriverEventHandlerConfigurations defined by subclass,
+        /// then calls TestRun for running the test.
+        /// Subclass can override it if they have different parameters for the 
test
+        /// </summary>
+        /// <param name="runOnYarn"></param>
+        /// <param name="numTasks"></param>
+        /// <param name="chunkSize"></param>
+        /// <param name="dims"></param>
+        /// <param name="iterations"></param>
+        /// <param name="mapperMemory"></param>
+        /// <param name="updateTaskMemory"></param>
+        /// <param name="testFolder"></param>
+        protected void TestBroadCastAndReduce(bool runOnYarn,
+            int numTasks,
+            int chunkSize,
+            int dims,
+            int iterations,
+            int mapperMemory,
+            int updateTaskMemory,
+            string testFolder = DefaultRuntimeFolder)
+        {
+            string runPlatform = runOnYarn ? "yarn" : "local";
+            TestRun(DriverConfiguration<int[], int[], int[], Stream>(
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory),
+                DriverEventHandlerConfigurations<int[], int[], int[], 
Stream>()),
+                typeof(BroadcastReduceDriver),
+                numTasks,
+                "BroadcastReduceDriver",
+                runPlatform,
+                testFolder);
+        }
+
+        /// <summary>
+        /// Build driver configuration
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <param name="jobDefinition"></param>
+        /// <param name="driverHandlerConfig"></param>
+        /// <returns></returns>
+        private IConfiguration DriverConfiguration<TMapInput, TMapOutput, 
TResult, TPartitionType>(
+            IMRUJobDefinition jobDefinition,
+            IConfiguration driverHandlerConfig)
+        {
+            string driverId = string.Format("IMRU-{0}-Driver", 
jobDefinition.JobName);
+            IConfiguration overallPerMapConfig = null;
+            var configurationSerializer = new AvroConfigurationSerializer();
+
+            try
+            {
+                overallPerMapConfig = 
Configurations.Merge(jobDefinition.PerMapConfigGeneratorConfig.ToArray());
+            }
+            catch (Exception e)
+            {
+                Exceptions.Throw(e, "Issues in merging PerMapCOnfigGenerator 
configurations", Logger);
+            }
+
+            var imruDriverConfiguration = 
TangFactory.GetTang().NewConfigurationBuilder(new[]
+            {
+                driverHandlerConfig,
+                CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, 
TResult, TPartitionType>(jobDefinition.NumberOfMappers + 1,
+                    driverId),
+                jobDefinition.PartitionedDatasetConfiguration,
+                overallPerMapConfig
+            })
+                .BindNamedParameter(typeof(SerializedMapConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
+                .BindNamedParameter(typeof(SerializedUpdateConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.UpdateFunctionConfiguration))
+                
.BindNamedParameter(typeof(SerializedMapInputCodecConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.MapInputCodecConfiguration))
+                
.BindNamedParameter(typeof(SerializedMapInputPipelineDataConverterConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.MapInputPipelineDataConverterConfiguration))
+                
.BindNamedParameter(typeof(SerializedUpdateFunctionCodecsConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.UpdateFunctionCodecsConfiguration))
+                
.BindNamedParameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
+                .BindNamedParameter(typeof(SerializedReduceConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+                
.BindNamedParameter(typeof(SerializedResultHandlerConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.ResultHandlerConfiguration))
+                .BindNamedParameter(typeof(MemoryPerMapper),
+                    
jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MemoryForUpdateTask),
+                    
jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(CoresPerMapper),
+                    
jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(CoresForUpdateTask),
+                    
jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(InvokeGC),
+                    
jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
+                .Build();
+            return imruDriverConfiguration;
+        }
+
+        /// <summary>
+        /// Create group communication configuration
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <param name="numberOfTasks"></param>
+        /// <param name="driverId"></param>
+        /// <returns></returns>
+        private IConfiguration 
CreateGroupCommunicationConfiguration<TMapInput, TMapOutput, TResult, 
TPartitionType>(
+            int numberOfTasks,
+            string driverId)
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId)
+                
.BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(IMRUConstants.UpdateTaskName)
+                
.BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(IMRUConstants.CommunicationGroupName)
+                
.BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(IMRUConstants.TreeFanout.ToString(CultureInfo.InvariantCulture))
+                
.BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(numberOfTasks.ToString(CultureInfo.InvariantCulture))
+                .BindImplementation(GenericType<IGroupCommDriver>.Class, 
GenericType<GroupCommDriver>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Create IMRU Job Definition with IMRU required configurations
+        /// </summary>
+        /// <param name="numberofMappers"></param>
+        /// <param name="chunkSize"></param>
+        /// <param name="numIterations"></param>
+        /// <param name="dim"></param>
+        /// <param name="mapperMemory"></param>
+        /// <param name="updateTaskMemory"></param>
+        /// <returns></returns>
+        protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int 
numberofMappers,
+            int chunkSize,
+            int numIterations,
+            int dim,
+            int mapperMemory,
+            int updateTaskMemory)
+        {
+            var updateFunctionConfig =
+                
TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
+                        numIterations.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
+                        dim.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
+                        numberofMappers.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+
+            return new IMRUJobDefinitionBuilder()
+                .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
+                .SetUpdateFunctionConfiguration(updateFunctionConfig)
+                .SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
+                
.SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig())
+                .SetReduceFunctionConfiguration(BuildReduceFunctionConfig())
+                
.SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+                
.SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+                
.SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers))
+                .SetJobName(IMRUJobName)
+                .SetNumberOfMappers(numberofMappers)
+                .SetMapperMemory(mapperMemory)
+                .SetUpdateTaskMemory(updateTaskMemory)
+                .Build();
+        }
+
+        /// <summary>
+        ///  Data Converter Configuration. Subclass can override it to have 
its own test Data Converter.
+        /// </summary>
+        /// <param name="chunkSize"></param>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildDataConverterConfig(int 
chunkSize)
+        {
+            return TangFactory.GetTang()
+                
.NewConfigurationBuilder(IMRUPipelineDataConverterConfiguration<int[]>.ConfigurationModule
+                    
.Set(IMRUPipelineDataConverterConfiguration<int[]>.MapInputPiplelineDataConverter,
+                        GenericType<PipelineIntDataConverter>.Class).Build())
+                
.BindNamedParameter(typeof(BroadcastReduceConfiguration.ChunkSize),
+                    chunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        /// <summary>
+        /// Mapper function configuration. Subclass can override it to have 
its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildMapperFunctionConfig()
+        {
+            return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    
GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Update function configuration. Subclass can override it to have 
its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildUpdateFunctionConfig()
+        {
+            return IMRUUpdateConfiguration<int[], int[], 
int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
+                    
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Partition dataset configuration. Subclass can override it to have 
its own test dataset config
+        /// </summary>
+        /// <param name="numberofMappers"></param>
+        /// <returns></returns>
+        protected virtual IConfiguration 
BuildPartitionedDatasetConfiguration(int numberofMappers)
+        {
+            return RandomInputDataConfiguration.ConfigurationModule.Set(
+                RandomInputDataConfiguration.NumberOfPartitions,
+                numberofMappers.ToString()).Build();
+        }
+
+        /// <summary>
+        /// Map Input Codec configuration. Subclass can override it to have 
its own test Codec.
+        /// </summary>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildMapInputCodecConfig()
+        {
+            return IMRUCodecConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Update function Codec configuration. Subclass can override it to 
have its own test Codec.
+        /// </summary>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildUpdateFunctionCodecsConfig()
+        {
+            return IMRUCodecConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Reduce function configuration. Subclass can override it to have 
its own test function.
+        /// </summary>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildReduceFunctionConfig()
+        {
+            return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
+                .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
+                    GenericType<IntArraySumReduceFunction>.Class)
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
new file mode 100644
index 0000000..c00f44d
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class IMRUBrodcastReduceWithoutIMRUClientTest : 
IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// This test is for the normal scenarios of IMRUDriver and IMRUTasks 
on local runtime
+        /// </summary>
+        [Fact]
+        public void TestWithHandlersInIMRUDriverOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, testFolder);
+            ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is for the normal scenarios of IMRUDriver and IMRUTasks 
on yarn
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public void TestWithHandlersInIMRUDriverOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 10;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory);
+        }
+
+        /// <summary>
+        /// This method defines event handlers for driver. As default, it uses 
all the handlers defined in IMRUDriver.
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected override IConfiguration 
DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, 
TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, 
TraceLevel.Info.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
new file mode 100644
index 0000000..2c766f2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    /// <summary>
+    /// This is to test close event handler in IMRU tasks
+    /// The test provide IRunningTask, IFailedTask and ICOmpletedTask handlers 
so that to trigger close events and handle the 
+    /// failed tasks and completed tasks
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
+    {
+        private const string CompletedTaskMessage = "CompletedTaskMessage";
+        private const string FailTaskMessage = "FailTaskMessage";
+
+        /// <summary>
+        /// This test is for running in local runtime
+        /// It sends close event for all the running tasks.
+        /// It first informs the Call method to stop.
+        /// If Call method is running properly, it will respect to this flag 
and will return properly, that will end up ICompletedTask event.
+        ////If Call method is hung some where and cannot be returned, the 
close handler will throw exception, that would cause IFailedTask event.
+        /// As we are testing IMRU Task not a test task, the behavior is not 
deterministic. It can be CompletedTask or FailedTask
+        /// No matter how the task is closed, the total number of completed 
task and failed task should be equal to the 
+        /// total number of the tasks.
+        /// </summary>
+        [Fact]
+        public void TestTaskCloseOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 50;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
+            int failedCount = GetMessageCount(lines, FailTaskMessage);
+            int completedCount = GetMessageCount(lines, CompletedTaskMessage);
+            Assert.Equal(numTasks, failedCount + completedCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Same testing for running on YARN
+        /// It sends close event for all the running tasks.
+        /// It first informs the Call method to stop.
+        /// If Call method is running properly, it will respect to this flag 
and will return properly, that will end up ICompletedTask event.
+        ////If Call method is hung some where and cannot be returned, the 
close handler will throw exception, that would cause IFailedTask event.
+        /// As we are testing IMRU Task not a test task, the behavior is not 
deterministic. It can be CompletedTask or FailedTask
+        /// No matter how the task is closed, the total number of completed 
task and failed task should be equal to the 
+        /// total number of the tasks.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public void TestTaskCloseOnLocalRuntimeOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 50;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory);
+        }
+
+        /// <summary>
+        /// This method overrides base class method and defines its own event 
handlers for driver. 
+        /// It uses its own RunningTaskHandler, FailedTaskHandler and 
CompletedTaskHandler so that to simulate the test scenarios 
+        /// and verify the test result. 
+        /// Rest of the event handlers use those from IMRUDriver. In 
IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected override IConfiguration 
DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, 
TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<TestHandlers>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<TestHandlers>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<TestHandlers>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, 
TraceLevel.Info.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Test handlers
+        /// </summary>
+        internal sealed class TestHandlers : IObserver<IRunningTask>, 
IObserver<IFailedTask>, IObserver<ICompletedTask>
+        {
+            [Inject]
+            private TestHandlers()
+            {
+            }
+
+            /// <summary>
+            /// Log the task id and dispose the context
+            /// </summary>
+            public void OnNext(IRunningTask value)
+            {
+                Logger.Log(Level.Info, "Received running task, closing it" + 
value.Id);
+                
value.Dispose(ByteUtilities.StringToByteArrays(TaskManager.CloseTaskByDriver));
+            }
+
+            /// <summary>
+            /// Validate the event and dispose the context
+            /// </summary>
+            /// <param name="value"></param>
+            public void OnNext(IFailedTask value)
+            {
+                Logger.Log(Level.Info, FailTaskMessage + value.Id);
+                var failedExeption = 
ByteUtilities.ByteArraysToString(value.Data.Value);
+                Assert.Contains(TaskManager.TaskKilledByDriver, 
failedExeption);
+                value.GetActiveContext().Value.Dispose();
+            }
+
+            /// <summary>
+            /// Log the task id and dispose the context
+            /// </summary>
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, CompletedTaskMessage + value.Id);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 90487e0..d71f20d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -28,6 +28,7 @@ using Microsoft.WindowsAzure.Storage.Blob;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Local;
 using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities;
@@ -40,6 +41,8 @@ namespace Org.Apache.REEF.Tests.Functional
 {
     public class ReefFunctionalTest : IDisposable
     {
+        private readonly static Logger Logger = 
Logger.GetLogger(typeof(ReefFunctionalTest));
+
         protected const string DriverStdout = "driver.stdout";
         protected const string DriverStderr = "driver.stderr";
         protected const string EvaluatorStdout = "evaluator.stdout";
@@ -51,8 +54,9 @@ namespace Org.Apache.REEF.Tests.Functional
         private const string Local = "local";
         private const string YARN = "yarn";
         private const int SleepTime = 1000;
+        private const string PortRangeStart = "8900";
+        private const string PortRangeCount = "1000";
 
-        private readonly static Logger Logger = 
Logger.GetLogger(typeof(ReefFunctionalTest));
         private const string StorageAccountKeyEnvironmentVariable = 
"REEFTestStorageAccountKey";
         private const string StorageAccountNameEnvironmentVariable = 
"REEFTestStorageAccountName";
         private bool _testSuccess = false;
@@ -171,6 +175,17 @@ namespace Org.Apache.REEF.Tests.Functional
         }
 
         /// <summary>
+        /// Get message counts from lines given
+        /// </summary>
+        /// <param name="lines"></param>
+        /// <param name="message"></param>
+        /// <returns></returns>
+        protected int GetMessageCount(string[] lines, string message)
+        {
+            return lines.Where(s => s.Contains(message)).ToArray().Length;
+        }
+
+        /// <summary>
         /// See <see cref="ValidateMessageSuccessfullyLogged"/> for detail. 
This function is <see cref="ValidateMessageSuccessfullyLogged"/>
         /// for the driver log.
         /// </summary>
@@ -351,7 +366,12 @@ namespace Org.Apache.REEF.Tests.Functional
                         .Set(LocalRuntimeClientConfiguration.RuntimeFolder, 
dir)
                         .Build();
                 case YARN:
-                    return YARNClientConfiguration.ConfigurationModule.Build();
+                    var yarnClientConfig = 
YARNClientConfiguration.ConfigurationModule.Build();
+                    var tcpPortConfig = 
TcpPortConfigurationModule.ConfigurationModule
+                       .Set(TcpPortConfigurationModule.PortRangeStart, 
PortRangeStart)
+                       .Set(TcpPortConfigurationModule.PortRangeCount, 
PortRangeCount)
+                       .Build();
+                    return Configurations.Merge(yarnClientConfig, 
tcpPortConfig);
                 default:
                     throw new Exception("Unknown runtime: " + runOnYarn);
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/798f2dbb/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index ea695d4..1c6949a 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -101,6 +101,9 @@ under the License.
     <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" />
+    <Compile 
Include="Functional\IMRU\IMRUBrodcastReduceWithoutIMRUClientTest.cs" />
+    <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
+    <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
     <Compile Include="Functional\IMRU\TestTaskExceptions.cs" />
     <Compile 
Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" />
@@ -188,6 +191,10 @@ under the License.
       <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project>
       <Name>Org.Apache.REEF.IMRU</Name>
     </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+      <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project>
+      <Name>Org.Apache.REEF.IO</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />

Reply via email to