Repository: incubator-reef
Updated Branches:
  refs/heads/master 7edb8570e -> 0292caf14


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
new file mode 100644
index 0000000..54a46a1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// Represents a node in the operator topology graph.
+    /// </summary>
+    /// <remarks>
+    /// A node keeps the group, operator and driver identifiers it was created
+    /// with, exposes its task identifier through <see cref="TaskId"/>, and holds
+    /// a reference to its parent together with the children added to it.
+    /// </remarks>
+    internal class TaskNode
+    {
+        private readonly string _groupName;
+        private readonly string _operatorName;
+        private readonly string _driverId;
+
+        private TaskNode _parent;
+        private readonly List<TaskNode> _children;
+
+        /// <summary>
+        /// Creates a node that has no parent and no children yet.
+        /// </summary>
+        /// <param name="groupName">Group name stored on the node.</param>
+        /// <param name="operatorName">Operator name stored on the node.</param>
+        /// <param name="taskId">Value returned by <see cref="TaskId"/>.</param>
+        /// <param name="driverId">Driver identifier stored on the node.</param>
+        public TaskNode(
+            string groupName,
+            string operatorName,
+            string taskId,
+            string driverId)
+        {
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _driverId = driverId;
+
+            // TaskId is an auto-property with its own hidden backing field, so it
+            // must be assigned here; otherwise it would always read back as null.
+            TaskId = taskId;
+
+            _children = new List<TaskNode>();
+        }
+
+        /// <summary>
+        /// Gets the task identifier passed to the constructor.
+        /// </summary>
+        public string TaskId { get; private set; }
+
+        /// <summary>
+        /// Appends <paramref name="child"/> to this node's list of children.
+        /// </summary>
+        /// <param name="child">The node to record as a child.</param>
+        public void AddChild(TaskNode child)
+        {
+            _children.Add(child);
+        }
+
+        /// <summary>
+        /// Replaces this node's parent reference.
+        /// </summary>
+        /// <param name="parent">The node to record as the parent.</param>
+        public void SetParent(TaskNode parent)
+        {
+            _parent = parent;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index b759c24..fb6e183 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -95,6 +95,49 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="Group\Codec\GcmMessageProto.cs" />
+    <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" />
+    <Compile Include="Group\Config\MpiConfigurationOptions.cs" />
+    <Compile Include="Group\Driver\ICommunicationGroupDriver.cs" />
+    <Compile Include="Group\Driver\IMpiDriver.cs" />
+    <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
+    <Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" />
+    <Compile Include="Group\Driver\Impl\MessageType.cs" />
+    <Compile Include="Group\Driver\Impl\MpiDriver.cs" />
+    <Compile Include="Group\Driver\Impl\TaskStarter.cs" />
+    <Compile Include="Group\Operators\IBroadcastReceiver.cs" />
+    <Compile Include="Group\Operators\IBroadcastSender.cs" />
+    <Compile Include="Group\Operators\IMpiOperator.cs" />
+    <Compile Include="Group\Operators\Impl\BroadcastOperatorSpec.cs" />
+    <Compile Include="Group\Operators\Impl\BroadcastReceiver.cs" />
+    <Compile Include="Group\Operators\Impl\BroadcastSender.cs" />
+    <Compile Include="Group\Operators\Impl\ReduceFunction.cs" />
+    <Compile Include="Group\Operators\Impl\ReduceOperatorSpec.cs" />
+    <Compile Include="Group\Operators\Impl\ReduceReceiver.cs" />
+    <Compile Include="Group\Operators\Impl\ReduceSender.cs" />
+    <Compile Include="Group\Operators\Impl\ScatterOperatorSpec.cs" />
+    <Compile Include="Group\Operators\Impl\ScatterReceiver.cs" />
+    <Compile Include="Group\Operators\Impl\ScatterSender.cs" />
+    <Compile Include="Group\Operators\Impl\Sender.cs" />
+    <Compile Include="Group\Operators\IOperatorSpec.cs" />
+    <Compile Include="Group\Operators\IReduceFunction.cs" />
+    <Compile Include="Group\Operators\IReduceReceiver.cs" />
+    <Compile Include="Group\Operators\IReduceSender.cs" />
+    <Compile Include="Group\Operators\IScatterReceiver.cs" />
+    <Compile Include="Group\Operators\IScatterSender.cs" />
+    <Compile Include="Group\Task\ICommunicationGroupClient.cs" />
+    <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" />
+    <Compile Include="Group\Task\IMpiClient.cs" />
+    <Compile Include="Group\Task\IMpiNetworkObserver.cs" />
+    <Compile Include="Group\Task\Impl\CommunicationGroupClient.cs" />
+    <Compile Include="Group\Task\Impl\CommunicationGroupNetworkObserver.cs" />
+    <Compile Include="Group\Task\Impl\MpiClient.cs" />
+    <Compile Include="Group\Task\Impl\MpiNetworkObserver.cs" />
+    <Compile Include="Group\Task\Impl\NodeStruct.cs" />
+    <Compile Include="Group\Task\Impl\OperatorTopology.cs" />
+    <Compile Include="Group\Topology\FlatTopology.cs" />
+    <Compile Include="Group\Topology\ITopology.cs" />
+    <Compile Include="Group\Topology\TaskNode.cs" />
     <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" />
     <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" />
     <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
new file mode 100644
index 0000000..8d00e69
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
+{
+    /// <summary>
+    /// Functional tests for the KMeans example. One test drives the legacy
+    /// KMeans tasks in-process through the file system; the other two submit
+    /// the KMeans driver handlers through <c>TestRun</c>. All three test
+    /// methods are currently marked <c>[Ignore]</c>.
+    /// </summary>
+    [TestClass]
+    public class TestKMeans : ReefFunctionalTest
+    {
+        private const int K = 3;
+        private const int Partitions = 2;
+        private const string SmallMouseDataFile = @"mouseData_small.csv";
+        private const string MouseDataFile = @"mouseData.csv";
+
+        // When true, TestSetup switches _dataFile to the small data file.
+        private bool _useSmallDataSet = false;
+        private string _dataFile = MouseDataFile;
+
+        /// <summary>
+        /// Selects the data file, then runs the base-class clean-up and
+        /// initialisation before every test.
+        /// </summary>
+        [TestInitialize()]
+        public void TestSetup()
+        {
+            if (_useSmallDataSet)
+            {
+                _dataFile = SmallMouseDataFile;
+            }
+
+            CleanUp();
+            Init();
+        }
+
+        /// <summary>
+        /// Runs the clean-up routine after every test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            Console.WriteLine("Post test check and clean up");
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Iterates KMeans in-process until the loss stops changing.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when an iteration produces a larger loss than the previous one.
+        /// </exception>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering with things directly run without reef")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Ignore]
+        [Timeout(180 * 1000)]
+        public void TestKMeansOnDirectRunViaFileSystem()
+        {
+            int iteration = 0;
+            string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
+            List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, executionDirectory);
+
+            // initialize all tasks
+            List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>();
+            List<DataVector> labeledData = new List<DataVector>();
+            for (int i = 0; i < Partitions; i++)
+            {
+                DataPartitionCache p = new DataPartitionCache(i, executionDirectory);
+                tasks.Add(new LegacyKMeansTask(p, K, executionDirectory));
+                labeledData.AddRange(p.DataVectors);
+            }
+
+            float loss = float.MaxValue;
+            while (true)
+            {
+                for (int i = 0; i < Partitions; i++)
+                {
+                    tasks[i].CallWithWritingToFileSystem(null);
+                }
+                List<DataVector> newCentroids = PartialMean.AggregateTrueMeansToFileSystem(Partitions, K, executionDirectory);
+                DataVector.WriteToCentroidFile(newCentroids, executionDirectory);
+                centroids = newCentroids;
+                float newLoss = LegacyKMeansTask.ComputeLossFunction(centroids, labeledData);
+                if (newLoss > loss)
+                {
+                    throw new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "The new loss {0} is larger than previous loss {1}, while loss function must be monotonically decreasing across iterations", newLoss, loss));
+                }
+                else if (newLoss.Equals(loss))
+                {
+                    // An exactly unchanged loss is the convergence criterion.
+                    Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "KMeans clustering has converged with a loss value of {0} at iteration {1} ", newLoss, iteration));
+                    break;
+                }
+                else
+                {
+                    loss = newLoss;
+                }
+                iteration++;
+            }
+        }
+
+        /// <summary>
+        /// Submits the KMeans driver on the local runtime and validates the run
+        /// with <c>Partitions + 1</c> as the expected count.
+        /// </summary>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering on reef local runtime with group communications")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Ignore]
+        [Timeout(180 * 1000)]
+        public void TestKMeansOnLocalRuntimeWithGroupCommunications()
+        {
+            IsOnLocalRuntiime = true;
+            TestRun(AssembliesToCopy(), DriverConfiguration());
+            ValidateSuccessForLocalRuntime(Partitions + 1);
+        }
+
+        /// <summary>
+        /// Submits the KMeans driver with <c>runOnYarn</c> set to true.
+        /// </summary>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering on reef YARN runtime - one box")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Timeout(180 * 1000)]
+        [Ignore]    // ignored by default
+        public void TestKMeansOnYarnOneBoxWithGroupCommunications()
+        {
+            TestRun(AssembliesToCopy(), DriverConfiguration(), runOnYarn: true);
+
+            // NOTE(review): the argument is a non-null string literal, so this
+            // assertion can never fail; the test only checks that TestRun returns.
+            Assert.IsNotNull("BreakPointChecker");
+        }
+
+        /// <summary>
+        /// Builds the driver configuration: binds the KMeans driver handlers,
+        /// passes the selected data file as a command-line argument and sets the
+        /// trace level to Info.
+        /// </summary>
+        /// <returns>The built driver configuration.</returns>
+        private IConfiguration DriverConfiguration()
+        {
+            return DriverBridgeConfiguration.ConfigurationModule
+                 .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class)
+                 .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class)
+                 .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<KMeansDriverHandlers>.Class)
+                 .Set(DriverBridgeConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class)
+                 .Set(DriverBridgeConfiguration.CommandLineArguments, "DataFile:" + _dataFile)
+                 .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                 .Build();
+        }
+
+        /// <summary>
+        /// Collects the simple names of the assemblies that define the types the
+        /// KMeans run refers to.
+        /// </summary>
+        /// <returns>The set of assembly names handed to <c>TestRun</c>.</returns>
+        private HashSet<string> AssembliesToCopy()
+        {
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(KMeansDriverHandlers).Assembly.GetName().Name);
+            appDlls.Add(typeof(LegacyKMeansTask).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+            return appDlls;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
new file mode 100644
index 0000000..7472d56
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Driver for the broadcast/reduce functional test. It builds one
+    /// communication group with an int broadcast operator and an int sum-reduce
+    /// operator, requests evaluators, and queues one master task plus slave
+    /// tasks on the contexts that become active.
+    /// </summary>
+    public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver));
+
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Creates the MPI driver, the communication group with its broadcast and
+        /// reduce operators, and the task starter, then generates the class
+        /// hierarchy for the assemblies the test uses.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request; also the group size.</param>
+        /// <param name="numIterations">Iteration count forwarded to every task configuration.</param>
+        /// <param name="confSerializer">Serializer handed to the MPI driver.</param>
+        [Inject]
+        public BroadcastReduceDriver(
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations,
+            AvroConfigurationSerializer confSerializer)
+        {
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+            _numIterations = numIterations;
+
+            _mpiDriver = new MpiDriver(
+                MpiTestConstants.DriverId,
+                MpiTestConstants.MasterTaskId,
+                confSerializer);
+
+            _commGroup = _mpiDriver.NewCommunicationGroup(
+                MpiTestConstants.GroupName,
+                numEvaluators)
+                    .AddBroadcast(
+                        MpiTestConstants.BroadcastOperatorName,
+                        new BroadcastOperatorSpec<int>(
+                            MpiTestConstants.MasterTaskId,
+                            new IntCodec()))
+                    .AddReduce(
+                        MpiTestConstants.ReduceOperatorName,
+                        new ReduceOperatorSpec<int>(
+                            MpiTestConstants.MasterTaskId,
+                            new IntCodec(),
+                            new SumFunction()))
+                    .Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        /// <summary>
+        /// Gets or sets the handler identifier; the constructor sets it to
+        /// "BroadcastStartHandler".
+        /// </summary>
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for <c>_numEvaluators</c> evaluators.
+        /// </summary>
+        /// <param name="evaluatorRequestor">Requestor used to submit the request.</param>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
+
+        /// <summary>
+        /// Submits the MPI driver's context and service configurations to a newly
+        /// allocated evaluator.
+        /// </summary>
+        /// <param name="allocatedEvaluator">The evaluator that was allocated.</param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
+            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
+            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+        }
+
+        /// <summary>
+        /// Queues the master task on the context the MPI driver identifies as the
+        /// master context, and a slave task (named after the context id) on every
+        /// other context.
+        /// </summary>
+        /// <param name="activeContext">The context that became active.</param>
+        public void OnNext(IActiveContext activeContext)
+        {
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                // Configure Master Task
+                RegisterAndQueueTask<MasterTask>(MpiTestConstants.MasterTaskId, activeContext);
+            }
+            else
+            {
+                // Configure Slave Task
+                RegisterAndQueueTask<SlaveTask>("SlaveTask-" + activeContext.Id, activeContext);
+            }
+        }
+
+        /// <summary>
+        /// Logs the failure so that a lost evaluator is visible in the driver log.
+        /// </summary>
+        /// <param name="value">The evaluator that failed.</param>
+        public void OnNext(IFailedEvaluator value)
+        {
+            LOGGER.Log(Level.Error, "An evaluator failed during the broadcast/reduce test");
+        }
+
+        /// <summary>
+        /// Logs the error reported to the observer.
+        /// </summary>
+        /// <param name="error">The reported exception.</param>
+        public void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Error, "BroadcastReduceDriver received an error: " + error);
+        }
+
+        /// <summary>
+        /// Intentionally does nothing.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Builds the partial task configuration shared by master and slave
+        /// tasks, registers the task id with the communication group and queues
+        /// the task on the given context.
+        /// </summary>
+        /// <typeparam name="TTask">Task implementation to bind.</typeparam>
+        /// <param name="taskId">Identifier for the task.</param>
+        /// <param name="activeContext">Context the task is queued on.</param>
+        private void RegisterAndQueueTask<TTask>(string taskId, IActiveContext activeContext) where TTask : ITask
+        {
+            IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(
+                TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, taskId)
+                    .Set(TaskConfiguration.Task, GenericType<TTask>.Class)
+                    .Build())
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    _numEvaluators.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    _numIterations.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            _commGroup.AddTask(taskId);
+            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+        }
+
+        /// <summary>
+        /// Generates the class hierarchy for the assemblies that define the
+        /// driver, task, test and network types used here.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>();
+            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up all received integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Returns the sum of <paramref name="elements"/>.
+            /// </summary>
+            /// <param name="elements">The values to add.</param>
+            /// <returns>The sum of all elements.</returns>
+            public int Reduce(IEnumerable<int> elements)
+            {
+                return elements.Sum();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
new file mode 100644
index 0000000..c74c923
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Functional test that runs <c>BroadcastReduceDriver</c> through the test
+    /// harness and validates the run for the configured number of tasks.
+    /// </summary>
+    [TestClass]
+    public class BroadcastReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Runs the clean-up routine before every test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs the clean-up routine after every test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Binds <c>BroadcastReduceDriver</c> as the handler for the driver
+        /// events, passes the iteration and evaluator counts as named
+        /// parameters, runs the job and validates it for four tasks.
+        /// </summary>
+        [TestMethod]
+        public void TestBroadcastAndReduce()
+        {
+            int numTasks = 4;
+
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // Simple names of the assemblies that define the types the job uses.
+            HashSet<string> appDlls = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(BroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            TestRun(appDlls, driverConfig, false, JavaLoggingSetting.VERBOSE);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
new file mode 100644
index 0000000..78f8840
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Master task of the broadcast/reduce functional test. On every iteration
+    /// it broadcasts the iteration number, receives the reduced sum and checks
+    /// it against the expected value.
+    /// </summary>
+    public class MasterTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
+
+        private readonly int _numIters;
+        private readonly int _numReduceSenders;
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastSender<int> _broadcastSender;
+        private readonly IReduceReceiver<int> _sumReducer;
+
+        /// <summary>
+        /// Looks up the test's communication group and obtains its broadcast
+        /// sender and reduce receiver.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to run.</param>
+        /// <param name="numEvaluators">Total evaluator count; one less is used as the number of reduce senders.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group.</param>
+        [Inject]
+        public MasterTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from master task");
+            _numIters = numIters;
+            _numReduceSenders = numEvaluators - 1;
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _broadcastSender = _commGroup.GetBroadcastSender<int>(MpiTestConstants.BroadcastOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the broadcast/reduce rounds for iterations 1 through the
+        /// configured iteration count.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when a reduced sum differs from the expected value.
+        /// </exception>
+        public byte[] Call(byte[] memento)
+        {
+            for (int i = 1; i <= _numIters; i++)
+            {
+                // Broadcast the iteration number; every reduce sender is expected
+                // to answer with the i-th triangle number (see the check below).
+                _broadcastSender.Send(i);
+
+                // Sum up all of the calculated triangle numbers
+                int sum = _sumReducer.Reduce();
+                _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
+
+                int expected = TriangleNumber(i) * _numReduceSenders;
+                if (sum != expected)
+                {
+                    throw new InvalidOperationException("Expected " + expected + " but got " + sum);
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + <paramref name="n"/>.
+        /// </summary>
+        /// <param name="n">Index of the triangle number.</param>
+        /// <returns>The n-th triangle number.</returns>
+        private static int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
new file mode 100644
index 0000000..20b9351
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Slave side of the broadcast/reduce functional test. On every iteration
+    /// it receives an integer n through the broadcast operator, computes the
+    /// nth triangle number, and sends it through the reduce operator.
+    /// </summary>
+    public class SlaveTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        // All of these are assigned once in the constructor and never rebound.
+        private readonly int _numIterations;
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastReceiver<int> _broadcastReceiver;
+        private readonly IReduceSender<int> _triangleNumberSender;
+
+        /// <summary>
+        /// Looks up the communication group and the broadcast/reduce operators
+        /// this task takes part in.
+        /// </summary>
+        /// <param name="numIters">Number of receive/send rounds to run in Call.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group.</param>
+        [Inject]
+        public SlaveTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            _numIterations = numIters;
+            _mpiClient = mpiClient;
+            _commGroup = _mpiClient.GetCommunicationGroup(
+                MpiTestConstants.GroupName);
+            _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(
+                MpiTestConstants.BroadcastOperatorName);
+            _triangleNumberSender = _commGroup.GetReduceSender<int>(
+                MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs _numIterations rounds of: receive n, compute TriangleNumber(n),
+        /// send the result through the reduce operator.
+        /// </summary>
+        /// <param name="memento">Not read by this task.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            // Iterations are numbered from 1 so that the logged iteration
+            // matches the master task, which also counts rounds from 1.
+            for (int i = 1; i <= _numIterations; i++)
+            {
+                // Receive n from Master Task
+                int n = _broadcastReceiver.Receive();
+                _logger.Log(
+                    Level.Info,
+                    "Calculating TriangleNumber({0}) on slave task...",
+                    n);
+
+                // Calculate the nth triangle number and send it through the
+                // reduce operator.
+                int triangleNum = TriangleNumber(n);
+                _logger.Log(
+                    Level.Info,
+                    "Sending sum: {0} on iteration {1}.",
+                    triangleNum,
+                    i);
+                _triangleNumberSender.Send(triangleNum);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client held by this task.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Computes the nth triangle number by summing the integers 1 through n.
+        /// </summary>
+        /// <param name="n">How many consecutive integers, starting at 1, to add up.</param>
+        /// <returns>The sum 1 + 2 + ... + n.</returns>
+        private static int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
new file mode 100644
index 0000000..9450e20
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI
+{
+    /// <summary>
+    /// Container for the named parameters used by the MPI functional tests.
+    /// </summary>
+    public class MpiTestConfig
+    {
+        /// <summary>
+        /// Named integer parameter: number of iterations of messages to send.
+        /// </summary>
+        [NamedParameter("Number of iterations of messages to send")]
+        public class NumIterations : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Named integer parameter: number of evaluators.
+        /// </summary>
+        [NamedParameter("Number of Evaluators")]
+        public class NumEvaluators : Name<int>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
new file mode 100644
index 0000000..5cff899
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Tests.Functional.MPI
+{
+    /// <summary>
+    /// String and numeric constants shared by the MPI functional test
+    /// drivers and tasks.
+    /// </summary>
+    internal class MpiTestConstants
+    {
+        // Id handed to the MpiDriver constructor by the test driver.
+        public const string DriverId = "BroadcastReduceDriver";
+        // Name under which the communication group is created and looked up.
+        public const string GroupName = "BroadcastReduceGroup";
+        // Operator names used when adding operators to the group and when
+        // tasks fetch their sender/receiver for that operator.
+        public const string BroadcastOperatorName = "Broadcast";
+        public const string ReduceOperatorName = "Reduce";
+        public const string ScatterOperatorName = "Scatter";
+        // Task identifier of the master task.
+        public const string MasterTaskId = "MasterTask";
+        // Prefix of a slave task identifier; the driver appends the context number.
+        public const string SlaveTaskId = "SlaveTask-";
+        // NOTE(review): presumably the default iteration count for the
+        // broadcast/reduce test; its use is not visible here, so confirm.
+        public const int NumIterations = 10;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
new file mode 100644
index 0000000..07168c2
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
+{
+    /// <summary>
+    /// Master side of the scatter/reduce functional test. Scatters the
+    /// integers 1 through 100 to the slave tasks in a fixed order and logs
+    /// the sum received through the reduce operator.
+    /// </summary>
+    public class MasterTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
+
+        // All of these are assigned once in the constructor and never rebound.
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IScatterSender<int> _scatterSender;
+        private readonly IReduceReceiver<int> _sumReducer;
+
+        /// <summary>
+        /// Looks up the communication group and the scatter/reduce operators
+        /// this task takes part in.
+        /// </summary>
+        /// <param name="mpiClient">Client used to obtain the communication group.</param>
+        [Inject]
+        public MasterTask(IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from master task");
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(
+                MpiTestConstants.GroupName);
+            _scatterSender = _commGroup.GetScatterSender<int>(
+                MpiTestConstants.ScatterOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int>(
+                MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Scatters 1..100 using the order from GetScatterOrder, then reduces
+        /// and logs the resulting sum.
+        /// </summary>
+        /// <param name="memento">Not read by this task.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            List<int> data = Enumerable.Range(1, 100).ToList();
+            List<string> order = GetScatterOrder();
+            _scatterSender.Send(data, order);
+
+            // NOTE(review): the reduced sum is only logged, never compared
+            // against an expected value.
+            int sum = _sumReducer.Reduce();
+            _logger.Log(Level.Info, "Received sum: {0}", sum);
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client held by this task.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns the slave task ids, highest number first, in which the
+        /// scattered data is handed out.
+        /// </summary>
+        /// <returns>The ids "SlaveTask-4" down to "SlaveTask-1".</returns>
+        private List<string> GetScatterOrder()
+        {
+            // Built from the shared prefix so the ids cannot drift from the
+            // prefix the driver uses when it names the slave tasks.
+            return new List<string>
+            {
+                MpiTestConstants.SlaveTaskId + 4,
+                MpiTestConstants.SlaveTaskId + 3,
+                MpiTestConstants.SlaveTaskId + 2,
+                MpiTestConstants.SlaveTaskId + 1
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
new file mode 100644
index 0000000..18ff22b
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
+{
+    /// <summary>
+    /// Driver for the scatter/reduce functional test. It requests evaluators,
+    /// submits the MPI context and service configuration to each allocated
+    /// evaluator, and queues one master task plus slave tasks on the active
+    /// contexts.
+    /// </summary>
+    public class ScatterReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, 
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ScatterReduceDriver));
+
+        private int _numEvaluators;
+
+        private IMpiDriver _mpiDriver;
+        private ICommunicationGroupDriver _commGroup;
+        private TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Builds the MPI driver, a communication group with one scatter and
+        /// one reduce operator (both specs are given the master task id), and
+        /// the task starter, then generates the class hierarchy.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request; also passed to the communication group and the task starter.</param>
+        /// <param name="confSerializer">Serializer handed to the MpiDriver.</param>
+        [Inject]
+        public ScatterReduceDriver(
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            AvroConfigurationSerializer confSerializer)
+        {
+            // NOTE(review): "BroadcastStartHandler" looks copied from the
+            // broadcast test driver; confirm whether this name matters.
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+
+            _mpiDriver = new MpiDriver(
+                MpiTestConstants.DriverId,
+                MpiTestConstants.MasterTaskId,
+                confSerializer);
+
+            _commGroup = _mpiDriver.NewCommunicationGroup(
+                MpiTestConstants.GroupName, 
+                numEvaluators)
+                    .AddScatter(
+                        MpiTestConstants.ScatterOperatorName,
+                        new ScatterOperatorSpec<int>(
+                            MpiTestConstants.MasterTaskId,
+                            new IntCodec()))
+                    .AddReduce(
+                        MpiTestConstants.ReduceOperatorName,
+                        new ReduceOperatorSpec<int>(
+                            MpiTestConstants.MasterTaskId,
+                            new IntCodec(), 
+                            new SumFunction()))
+                    .Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        /// <summary>
+        /// Identifier of this handler; set in the constructor.
+        /// </summary>
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for _numEvaluators evaluators.
+        /// </summary>
+        /// <param name="evaluatorRequestor">Requestor the request is submitted to.</param>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            // NOTE(review): 512 and 2 are presumably memory (MB) and core
+            // count; confirm against the EvaluatorRequest constructor. The
+            // "BroadcastEvaluator" id also looks copied from the broadcast test.
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
+
+        /// <summary>
+        /// Submits the context and service configurations obtained from the
+        /// MPI driver to the newly allocated evaluator.
+        /// </summary>
+        /// <param name="allocatedEvaluator">Evaluator that was just allocated.</param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
+            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
+            allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
+        }
+
+        /// <summary>
+        /// Builds a partial task configuration for the context, registers the
+        /// task id with the communication group, and queues the task. The
+        /// context the MPI driver reports as the master context gets
+        /// MasterTask; every other context gets a SlaveTask whose id is the
+        /// slave prefix followed by the context number.
+        /// </summary>
+        /// <param name="activeContext">Context that just became active.</param>
+        public void OnNext(IActiveContext activeContext)
+        {
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                // Configure Master Task
+                IConfiguration partialTaskConf = 
TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, 
MpiTestConstants.MasterTaskId)
+                    .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
+                    .Build();
+
+                _commGroup.AddTask(MpiTestConstants.MasterTaskId);
+                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+            }
+            else
+            {
+                // Configure Slave Task
+                string slaveTaskId = MpiTestConstants.SlaveTaskId +
+                    _mpiDriver.GetContextNum(activeContext);
+
+                IConfiguration partialTaskConf = 
TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, slaveTaskId)
+                    .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
+                    .Build();
+
+                _commGroup.AddTask(slaveTaskId);
+                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+            }
+        }
+
+        /// <summary>
+        /// No handling is implemented for failed evaluators.
+        /// </summary>
+        public void OnNext(IFailedEvaluator value)
+        {
+        }
+
+        /// <summary>
+        /// No handling is implemented for observer errors.
+        /// </summary>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// No handling is implemented for observer completion.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Collects the assembly names of the driver, task, test, name
+        /// client, and network service types and passes them to
+        /// ClrHandlerHelper.GenerateClassHierarchy.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>();
+            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up all of the elements it is given.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Returns the sum of the given elements.
+            /// </summary>
+            public int Reduce(IEnumerable<int> elements)
+            {
+                return elements.Sum();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
new file mode 100644
index 0000000..1a0eef5
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
+{
+    /// <summary>
+    /// Functional test that runs ScatterReduceDriver and validates that the
+    /// run succeeded.
+    /// </summary>
+    [TestClass]
+    public class ScatterReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Calls CleanUp before each test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Calls CleanUp after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Registers ScatterReduceDriver for the driver-started, evaluator and
+        /// context events, binds NumEvaluators to numTasks, runs the test,
+        /// and validates success for numTasks tasks.
+        /// </summary>
+        [TestMethod]
+        public void TestScatterAndReduce()
+        {
+            // NOTE(review): presumably one master plus the four slave tasks
+            // named in the master's scatter order; confirm before changing.
+            int numTasks = 5;
+
+            IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+                    
+            // Names of the assemblies passed to TestRun.
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, driverConfig, false, JavaLoggingSetting.VERBOSE);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/SlaveTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/SlaveTask.cs
new file mode 100644
index 0000000..327b991
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/SlaveTask.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
+{
+    /// <summary>
+    /// Slave side of the scatter/reduce functional test: receives its share
+    /// of the scattered integers, adds them up, and sends the sum through
+    /// the reduce operator.
+    /// </summary>
+    public class SlaveTask : ITask
+    {
+        private static Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        private IMpiClient _mpiClient;
+        private ICommunicationGroupClient _commGroup;
+        private IScatterReceiver<int> _scatterReceiver;
+        private IReduceSender<int> _sumSender;
+
+        /// <summary>
+        /// Stores the client and resolves the communication group together
+        /// with the scatter receiver and reduce sender used by Call.
+        /// </summary>
+        /// <param name="mpiClient">Client used to obtain the communication group.</param>
+        [Inject]
+        public SlaveTask(IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            _mpiClient = mpiClient;
+
+            ICommunicationGroupClient group =
+                _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _commGroup = group;
+
+            _scatterReceiver = group.GetScatterReceiver<int>(
+                MpiTestConstants.ScatterOperatorName);
+            _sumSender = group.GetReduceSender<int>(
+                MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Receives one scattered list, logs it, and sends back its sum.
+        /// </summary>
+        /// <param name="memento">Not read by this task.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            List<int> received = _scatterReceiver.Receive();
+            string joined = string.Join(" ", received);
+            _logger.Log(Level.Info, "Received data: {0}", joined);
+
+            int partialSum = received.Sum();
+            _logger.Log(Level.Info, "Sending back sum: {0}", partialSum);
+            _sumSender.Send(partialSum);
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client held by this task.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index f8b4385..45c39c5 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -126,6 +126,17 @@ under the License.
     <Compile Include="Functional\Messaging\MessageDriver.cs" />
     <Compile Include="Functional\Messaging\MessageTask.cs" />
     <Compile Include="Functional\Messaging\TestTaskMessage.cs" />
+    <Compile Include="Functional\ML\KMeans\TestKMeans.cs" />
+    <Compile 
Include="Functional\MPI\BroadcastReduceTest\BroadcastReduceDriver.cs" />
+    <Compile 
Include="Functional\MPI\BroadcastReduceTest\BroadcastReduceTest.cs" />
+    <Compile Include="Functional\MPI\BroadcastReduceTest\MasterTask.cs" />
+    <Compile Include="Functional\MPI\BroadcastReduceTest\SlaveTask.cs" />
+    <Compile Include="Functional\MPI\MpiTestConfig.cs" />
+    <Compile Include="Functional\MPI\MpiTestConstants.cs" />
+    <Compile Include="Functional\MPI\ScatterReduceTest\MasterTask.cs" />
+    <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceDriver.cs" 
/>
+    <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceTest.cs" />
+    <Compile Include="Functional\MPI\ScatterReduceTest\SlaveTask.cs" />
     <Compile Include="Functional\ReefFunctionalTest.cs" />
     <Compile Include="Network\BlockingCollectionExtensionTests.cs" />
     <Compile Include="Network\NameServerTests.cs" />

Reply via email to