[REEF-178]  introduce pipelining in our REEF Group Communication operators

Support pipeline in group communication

JIRA: REEF-178. (https://issues.apache.org/jira/browse/REEF-178)

This Closes #139


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c02c80da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c02c80da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c02c80da

Branch: refs/heads/master
Commit: c02c80dacfa7b9b16529958961f637f06dd73e20
Parents: b9bb7b1
Author: Julia Wang <[email protected]>
Authored: Wed Apr 8 11:13:09 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Wed Apr 8 13:24:43 2015 -0700

----------------------------------------------------------------------
 .../Group/Driver/ICommunicationGroupDriver.cs   |  27 ++
 .../Driver/Impl/CommunicationGroupDriver.cs     |  91 +++++-
 .../Operators/Impl/BroadcastOperatorSpec.cs     |  25 +-
 .../Group/Operators/Impl/BroadcastReceiver.cs   |  50 ++-
 .../Group/Operators/Impl/BroadcastSender.cs     |  39 ++-
 .../Operators/Impl/PipelinedReduceFunction.cs   |  60 ++++
 .../Group/Operators/Impl/ReduceOperatorSpec.cs  |  27 ++
 .../Group/Operators/Impl/ReduceReceiver.cs      |  48 ++-
 .../Group/Operators/Impl/ReduceSender.cs        |  80 +++--
 .../Group/Operators/Impl/ScatterOperatorSpec.cs |  23 ++
 .../Group/Pipelining/IPipelineDataConverter.cs  |  58 ++++
 .../Impl/DefaultPipelineDataConverter.cs        |  74 +++++
 .../Group/Pipelining/PipelineMessage.cs         |  49 +++
 .../Group/Pipelining/PipelineMessageCodec.cs    |  70 ++++
 .../Group/Task/Impl/NodeStruct.cs               |  16 +
 .../Group/Task/Impl/OperatorTopology.cs         | 133 ++++++--
 .../Group/Topology/FlatTopology.cs              |  17 +-
 .../Group/Topology/TreeTopology.cs              |  15 +-
 .../Org.Apache.REEF.Network.csproj              |   5 +
 .../Functional/MPI/MpiTestConfig.cs             |  15 +
 .../Functional/MPI/MpiTestConstants.cs          |   2 +
 .../PipelinedBroadcastReduceDriver.cs           | 320 +++++++++++++++++++
 .../PipelinedBroadcastReduceTest.cs             |  98 ++++++
 .../PipelinedMasterTask.cs                      | 102 ++++++
 .../PipelinedSlaveTask.cs                       |  89 ++++++
 .../Org.Apache.REEF.Tests.csproj                |  10 +-
 26 files changed, 1428 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index 0fa4aae..2e11441 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -17,9 +17,11 @@
  * under the License.
  */
 
+using System;
 using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Wake.Remote;
@@ -46,6 +48,18 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
+        /// <param name="pipelineDataConverter">The converter used to split data 
into pipeline messages and to reassemble it</param>
+        /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
+        ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, TopologyTypes topologyType, 
IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : 
ICodec<TMessage>;
+
+        /// <summary>
+        /// Adds the Broadcast MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
+        /// <param name="operatorName">The name of the broadcast 
operator</param>
+        /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
+        /// <param name="topologyType">The topology type for the 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
         ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, TopologyTypes topologyType = 
TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>;
 
@@ -68,8 +82,21 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
         /// <param name="topologyType">The topology for the operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
+        ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, 
TopologyTypes topologyType, IPipelineDataConverter<TMessage> 
pipelineDataConverter) where TMessageCodec : ICodec<TMessage>;
+
+        /// <summary>
+        /// Adds the Reduce MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
+        /// <param name="operatorName">The name of the reduce operator</param>
+        /// <param name="masterTaskId">The master task id for the 
topology</param>
+        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
+        /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
         ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string 
operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, 
TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : 
ICodec<TMessage>;
 
+
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group with 
default IntCodec
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 6c07598..065c158 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -22,6 +22,8 @@ using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Formats;
@@ -97,6 +99,44 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
         /// <param name="topologyType">The topology type for the 
operator</param>
+        /// <param name="pipelineDataConverter">The converter used to split 
data into pipeline messages and to reassemble it</param>
+        /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
+        /// <returns></returns>
+        public ICommunicationGroupDriver AddBroadcast<TMessage, 
TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes 
topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where 
TMessageCodec : ICodec<TMessage>
+        {
+            if (_finalized)
+            {
+                throw new IllegalStateException("Can't add operators once the 
spec has been built.");
+            }
+
+            var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
+                masterTaskId,
+                pipelineDataConverter);
+
+            ITopology<TMessage, TMessageCodec> topology;
+            if (topologyType == TopologyTypes.Flat)
+            {
+                topology = new FlatTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec);
+            }
+            else
+            {
+                topology = new TreeTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec,
+                    _fanOut);
+            }
+
+            _topologies[operatorName] = topology;
+            _operatorSpecs[operatorName] = spec;
+
+            return this;
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
+        /// <param name="operatorName">The name of the broadcast 
operator</param>
+        /// <param name="masterTaskId">The master task id in broadcast 
operator</param>
+        /// <param name="topologyType">The topology type for the 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
         /// <returns></returns>
         public ICommunicationGroupDriver AddBroadcast<TMessage, 
TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes 
topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>
@@ -107,7 +147,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             }
 
             var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>(
-                masterTaskId);
+                masterTaskId,
+                new DefaultPipelineDataConverter<TMessage>());
 
             ITopology<TMessage, TMessageCodec> topology;
             if (topologyType == TopologyTypes.Flat)
@@ -136,7 +177,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public ICommunicationGroupDriver AddBroadcast(string operatorName, 
string masterTaskId,
             TopologyTypes topologyType = TopologyTypes.Flat)
         {
-            return AddBroadcast<int,IntCodec>(operatorName, masterTaskId, 
topologyType);
+            return AddBroadcast<int, IntCodec>(operatorName, masterTaskId, 
topologyType);
         }
 
         /// <summary>
@@ -153,6 +194,51 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             string operatorName,
             string masterTaskId,
             IReduceFunction<TMessage> reduceFunction,
+            TopologyTypes topologyType,
+            IPipelineDataConverter<TMessage> pipelineDataConverter) where 
TMessageCodec : ICodec<TMessage>
+        {
+            if (_finalized)
+            {
+                throw new IllegalStateException("Can't add operators once the 
spec has been built.");
+            }
+
+            var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
+                masterTaskId,
+                pipelineDataConverter,
+                reduceFunction);
+
+            ITopology<TMessage, TMessageCodec> topology;
+
+            if (topologyType == TopologyTypes.Flat)
+            {
+                topology = new FlatTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
+            }
+            else
+            {
+                topology = new TreeTopology<TMessage, 
TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec,
+                    _fanOut);
+            }
+
+            _topologies[operatorName] = topology;
+            _operatorSpecs[operatorName] = spec;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Adds the Reduce MPI operator to the communication group.
+        /// </summary>
+        /// <typeparam name="TMessage">The type of messages that operators 
will send</typeparam>
+        /// <typeparam name="TMessageCodec">The codec used for serializing 
messages</typeparam>
+        /// <param name="operatorName">The name of the reduce operator</param>
+        /// <param name="masterTaskId">The master task id for the 
topology</param>
+        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
+        /// <param name="topologyType">The topology for the operator</param>
+        /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
+        public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(
+            string operatorName,
+            string masterTaskId,
+            IReduceFunction<TMessage> reduceFunction,
             TopologyTypes topologyType = TopologyTypes.Flat) where 
TMessageCodec : ICodec<TMessage>
         {
             if (_finalized)
@@ -162,6 +248,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
 
             var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>(
                 masterTaskId,
+                new DefaultPipelineDataConverter<TMessage>(),
                 reduceFunction);
 
             ITopology<TMessage, TMessageCodec> topology;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
index 15a4374..cd8122a 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs
@@ -19,6 +19,8 @@
 
 using System;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
@@ -35,10 +37,31 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         public BroadcastOperatorSpec(string senderId)
         {
             SenderId = senderId;
             Codec = typeof(T2);
+            PipelineDataConverter = new DefaultPipelineDataConverter<T1>();
         }
 
         /// <summary>
+        /// Create a new BroadcastOperatorSpec.
+        /// </summary>
+        /// <param name="senderId">The identifier of the root sending 
Task.</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
+        public BroadcastOperatorSpec(
+            string senderId,
+            IPipelineDataConverter<T1> dataConverter)
+        {
+            SenderId = senderId;
+            Codec = typeof(T2);
+            PipelineDataConverter = dataConverter ?? new 
DefaultPipelineDataConverter<T1>();
+        }
+
+        /// <summary>
+        /// Returns the IPipelineDataConverter class type used to convert 
messages to pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T1> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
         /// Returns the identifier of the Task that will broadcast data to 
other Tasks.
         /// </summary>
         public string SenderId { get; private set; }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index d45fa24..b8b2a5a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -18,25 +18,27 @@
  */
 
 using System.Reactive;
+using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
-    /// MPI Operator used to receive broadcast messages.
+    /// MPI Operator used to receive broadcast messages in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The type of message being sent.</typeparam>
     public class BroadcastReceiver<T> : IBroadcastReceiver<T>
     {
-        private const int DefaultVersion = 1;
-
+        private const int PipelineVersion = 2;
         private readonly ICommunicationGroupNetworkObserver _networkHandler;
-        private readonly OperatorTopology<T> _topology;
-
+        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(BroadcastReceiver<T>));
         /// <summary>
         /// Creates a new BroadcastReceiver.
         /// </summary>
@@ -45,16 +47,19 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// operator belongs to</param>
         /// <param name="topology">The node's topology graph</param>
         /// <param name="networkHandler">The incoming message handler</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
         [Inject]
         public BroadcastReceiver(
             [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string 
operatorName,
             
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
-            OperatorTopology<T> topology, 
-            ICommunicationGroupNetworkObserver networkHandler)
+            OperatorTopology<PipelineMessage<T>> topology,
+            ICommunicationGroupNetworkObserver networkHandler,
+            IPipelineDataConverter<T> dataConverter)
         {
             OperatorName = operatorName;
             GroupName = groupName;
-            Version = DefaultVersion;
+            Version = PipelineVersion;
 
             _networkHandler = networkHandler;
             _topology = topology;
@@ -62,6 +67,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
             var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => 
_topology.OnNext(message));
             _networkHandler.Register(operatorName, msgHandler);
+
+            PipelineDataConverter = dataConverter;
         }
 
         /// <summary>
@@ -80,17 +87,34 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         public int Version { get; private set; }
 
         /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T> PipelineDataConverter { get; private 
set; }
+
+
+        /// <summary>
         /// Receive a message from parent BroadcastSender.
         /// </summary>
         /// <returns>The incoming message</returns>
         public T Receive()
         {
-            var data = _topology.ReceiveFromParent();
-            if (_topology.HasChildren())
+            PipelineMessage<T> message;
+            var messageList = new List<PipelineMessage<T>>();
+
+            do
             {
-                _topology.SendToChildren(data, MessageType.Data);
-            }
-            return data;
+                message = _topology.ReceiveFromParent();
+
+                if (_topology.HasChildren())
+                {
+                    _topology.SendToChildren(message, MessageType.Data);
+                }
+
+                messageList.Add(message);
+            } while (!message.IsLast);
+
+            return PipelineDataConverter.FullMessage(messageList);
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index f88b093..dc0142b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -24,20 +24,21 @@ using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
-    /// MPI Operator used to send messages to child Tasks.
+    /// MPI Operator used to send messages to child Tasks in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
     public class BroadcastSender<T> : IBroadcastSender<T>
     {
-        private const int DefaultVersion = 1;
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(BroadcastSender<T>));
+        private const int PipelineVersion = 2;
+        private readonly OperatorTopology<PipelineMessage<T>> _topology;
 
-        private readonly ICommunicationGroupNetworkObserver _networkHandler;
-        private readonly OperatorTopology<T> _topology;
-            
         /// <summary>
         /// Creates a new BroadcastSender to send messages to other Tasks.
         /// </summary>
@@ -46,23 +47,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// belongs to</param>
         /// <param name="topology">The node's topology graph</param>
         /// <param name="networkHandler">The incoming message handler</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
         [Inject]
         public BroadcastSender(
             [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string 
operatorName,
             
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
-            OperatorTopology<T> topology, 
-            ICommunicationGroupNetworkObserver networkHandler)
+            OperatorTopology<PipelineMessage<T>> topology,
+            ICommunicationGroupNetworkObserver networkHandler,
+            IPipelineDataConverter<T> dataConverter)
         {
             OperatorName = operatorName;
             GroupName = groupName;
-            Version = DefaultVersion;
+            Version = PipelineVersion;
 
-            _networkHandler = networkHandler;
             _topology = topology;
             _topology.Initialize();
 
             var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => 
_topology.OnNext(message));
-            _networkHandler.Register(operatorName, msgHandler);
+            networkHandler.Register(operatorName, msgHandler);
+
+            PipelineDataConverter = dataConverter;
         }
 
         /// <summary>
@@ -81,17 +86,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         public int Version { get; private set; }
 
         /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
         /// Send the data to all BroadcastReceivers.
         /// </summary>
         /// <param name="data">The data to send.</param>
         public void Send(T data)
         {
             if (data == null)
             {
-                throw new ArgumentNullException("data");    
+                throw new ArgumentNullException("data");
             }
 
-            _topology.SendToChildren(data, MessageType.Data);
+            // Convert only after the null check so the converter never receives null.
+            var messageList = PipelineDataConverter.PipelineMessage(data);
+            foreach (var message in messageList)
+            {
+                _topology.SendToChildren(message, MessageType.Data);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs
new file mode 100644
index 0000000..cd2ae5e
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Pipelining;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Adapts a reduce function over T so that it can reduce PipelineMessage&lt;T&gt; values.
+    /// </summary>
+    /// <typeparam name="T">The message type.</typeparam>
+    public class PipelinedReduceFunction<T> : 
IReduceFunction<PipelineMessage<T>>
+    {
+        /// <summary>The base reduce function that operates on the actual message type T.</summary>
+        private readonly IReduceFunction<T> _baseReduceFunc;
+
+        /// <summary>Creates a pipelined reduce function that delegates to the given one.</summary>
+        /// <param name="baseReduceFunc">The reduce function applied to the Data of the messages.</param>
+        public PipelinedReduceFunction(IReduceFunction<T> baseReduceFunc)
+        {
+            _baseReduceFunc = baseReduceFunc;
+        }
+        
+        /// <summary>
+        /// Reduce the IEnumerable of pipeline messages into one pipeline message.
+        /// </summary>
+        /// <param name="elements">The pipeline messages to reduce</param>
+        /// <returns>A message holding the base reduction of every element's Data</returns>
+        public PipelineMessage<T> Reduce(IEnumerable<PipelineMessage<T>> 
elements)
+        {
+            var messageList = new List<T>();
+            var isLast = false;
+
+            foreach (var message in elements)
+            {
+                messageList.Add(message.Data);
+                isLast = message.IsLast; // the last element enumerated decides; stays false if empty
+            }
+
+            return new PipelineMessage<T>(_baseReduceFunc.Reduce(messageList), 
isLast);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
index f72cea5..bf60841 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs
@@ -19,6 +19,8 @@
 
 using System;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
@@ -41,9 +43,34 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             ReceiverId = receiverId;
             Codec = typeof(T2);
             ReduceFunction = reduceFunction;
+            PipelineDataConverter = new DefaultPipelineDataConverter<T1>();
         }
 
         /// <summary>
+        /// Creates a new ReduceOperatorSpec.
+        /// </summary>
+        /// <param name="receiverId">The identifier of the task that
+        /// will receive and reduce incoming messages.</param>
+        /// <param name="reduceFunction">The class used to aggregate all 
messages.</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
+        public ReduceOperatorSpec(
+            string receiverId,
+            IPipelineDataConverter<T1> dataConverter,
+            IReduceFunction<T1> reduceFunction)
+        {
+            ReceiverId = receiverId;
+            Codec = typeof(T2);
+            ReduceFunction = reduceFunction;
+            PipelineDataConverter = dataConverter ?? new 
DefaultPipelineDataConverter<T1>();
+        }
+
+        /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T1> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
         /// Returns the identifier for the task that receives and reduces
         /// incoming messages.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index 20d4ff7..70ed1ae 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -18,24 +18,27 @@
  */
 
 using System.Reactive;
+using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
-    /// MPI operator used to receive and reduce messages.
+    /// MPI operator used to receive and reduce messages in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
     public class ReduceReceiver<T> : IReduceReceiver<T>
     {
-        private const int DefaultVersion = 1;
-
-        private readonly ICommunicationGroupNetworkObserver _networkHandler;
-        private readonly OperatorTopology<T> _topology;
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(ReduceReceiver<T>));
+        private const int PipelineVersion = 2;
+        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
 
         /// <summary>
         /// Creates a new ReduceReceiver.
@@ -45,25 +48,30 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="topology">The task's operator topology graph</param>
         /// <param name="networkHandler">Handles incoming messages from other 
tasks</param>
         /// <param name="reduceFunction">The class used to aggregate all 
incoming messages</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
         [Inject]
         public ReduceReceiver(
-            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string 
operatorName,
-            
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
-            OperatorTopology<T> topology, 
+            [Parameter(typeof (MpiConfigurationOptions.OperatorName))] string 
operatorName,
+            [Parameter(typeof 
(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<PipelineMessage<T>> topology,
             ICommunicationGroupNetworkObserver networkHandler,
-            IReduceFunction<T> reduceFunction)
+            IReduceFunction<T> reduceFunction,
+            IPipelineDataConverter<T> dataConverter)
         {
             OperatorName = operatorName;
             GroupName = groupName;
-            Version = DefaultVersion;
+            Version = PipelineVersion;
             ReduceFunction = reduceFunction;
 
-            _networkHandler = networkHandler;
+            _pipelinedReduceFunc = new 
PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
             _topology.Initialize();
 
             var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => 
_topology.OnNext(message));
-            _networkHandler.Register(operatorName, msgHandler);
+            networkHandler.Register(operatorName, msgHandler);
+
+            PipelineDataConverter = dataConverter;
         }
 
         /// <summary>
@@ -87,13 +95,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         public IReduceFunction<T> ReduceFunction { get; private set; }
 
         /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
         /// Receives messages sent by all ReduceSenders and aggregates them
         /// using the specified IReduceFunction.
         /// </summary>
         /// <returns>The single aggregated data</returns>
         public T Reduce()
         {
-            return _topology.ReceiveFromChildren(ReduceFunction);
+            PipelineMessage<T> message;
+            var messageList = new List<PipelineMessage<T>>();
+
+            do
+            {
+                message = _topology.ReceiveFromChildren(_pipelinedReduceFunc);
+                messageList.Add(message);
+            } while (!message.IsLast);
+
+            return PipelineDataConverter.FullMessage(messageList);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index d21983a..4d73e04 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -18,27 +18,28 @@
  */
 
 using System;
-using System.Collections.Generic;
 using System.Reactive;
+using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
     /// <summary>
-    /// MPI Operator used to send messages to be reduced by the ReduceReceiver.
+    /// MPI Operator used to send messages to be reduced by the ReduceReceiver 
in pipelined fashion.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
     public class ReduceSender<T> : IReduceSender<T>
     {
-        private const int DefaultVersion = 1;
-
-        private readonly ICommunicationGroupNetworkObserver _networkHandler;
-        private readonly OperatorTopology<T> _topology;
-        private readonly IReduceFunction<T> _reduceFunction;
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(ReduceSender<T>));
+        private const int PipelineVersion = 2;
+        private readonly OperatorTopology<PipelineMessage<T>> _topology;
+        private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc;
 
         /// <summary>
         /// Creates a new ReduceSender.
@@ -47,24 +48,32 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="groupName">The name of the reduce operator's 
CommunicationGroup</param>
         /// <param name="topology">The Task's operator topology graph</param>
         /// <param name="networkHandler">The handler used to handle incoming 
messages</param>
+        /// <param name="reduceFunction">The function used to reduce the 
incoming messages</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
         [Inject]
         public ReduceSender(
             [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string 
operatorName,
             
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
-            OperatorTopology<T> topology, 
+            OperatorTopology<PipelineMessage<T>> topology,
             ICommunicationGroupNetworkObserver networkHandler,
-            IReduceFunction<T> reduceFunction)
+            IReduceFunction<T> reduceFunction,
+            IPipelineDataConverter<T> dataConverter)
         {
             OperatorName = operatorName;
             GroupName = groupName;
-            Version = DefaultVersion;
-            _reduceFunction = reduceFunction;
-            _networkHandler = networkHandler;
+            ReduceFunction = reduceFunction;
+
+            Version = PipelineVersion;
+
+            _pipelinedReduceFunc = new 
PipelinedReduceFunction<T>(ReduceFunction);
             _topology = topology;
             _topology.Initialize();
 
             var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => 
_topology.OnNext(message));
-            _networkHandler.Register(operatorName, msgHandler);
+            networkHandler.Register(operatorName, msgHandler);
+
+            PipelineDataConverter = dataConverter;
         }
 
         /// <summary>
@@ -85,33 +94,46 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Get reduced data from children, reduce with the data given, then 
sends reduced data to parent
         /// </summary>
+        public IReduceFunction<T> ReduceFunction { get; private set; }
+
+        /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
+        /// Sends the data to the operator's ReduceReceiver to be aggregated.
+        /// </summary>
         /// <param name="data">The data to send</param>
         public void Send(T data)
         {
+            var messageList = PipelineDataConverter.PipelineMessage(data);
+
             if (data == null)
             {
-                throw new ArgumentNullException("data");    
+                throw new ArgumentNullException("data");
             }
 
-            //middle notes
-            if (_topology.HasChildren())
+            foreach (var message in messageList)
             {
-                var reducedValueOfChildren = 
_topology.ReceiveFromChildren(_reduceFunction);
+                if (_topology.HasChildren())
+                {
+                    var reducedValueOfChildren = 
_topology.ReceiveFromChildren(_pipelinedReduceFunc);
+
+                    var mergeddData = new List<PipelineMessage<T>> {message};
 
-                var mergeddData = new List<T>();
-                mergeddData.Add(data);
-                if (reducedValueOfChildren != null)
+                    if (reducedValueOfChildren != null)
+                    {
+                        mergeddData.Add(reducedValueOfChildren);
+                    }
+
+                    var reducedValue = 
_pipelinedReduceFunc.Reduce(mergeddData);
+                    _topology.SendToParent(reducedValue, MessageType.Data);
+                }
+                else
                 {
-                    mergeddData.Add(reducedValueOfChildren);
+                    _topology.SendToParent(message, MessageType.Data);
                 }
-                T reducedValue = _reduceFunction.Reduce(mergeddData);
-
-                _topology.SendToParent(reducedValue, MessageType.Data);
-            }
-            else
-            {
-                //leaf node
-                _topology.SendToParent(data, MessageType.Data);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
index 158a6c5..5961615 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -19,6 +19,7 @@
 
 using System;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Pipelining;
 
 namespace Org.Apache.REEF.Network.Group.Operators.Impl
 {
@@ -41,6 +42,28 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         }
 
         /// <summary>
+        /// Creates a new ScatterOperatorSpec.
+        /// </summary>
+        /// <param name="senderId">The identifier of the task that will
+        /// be sending messages</param>
+        /// <param name="dataConverter">The converter used to convert original
+        /// message to pipelined ones and vice versa.</param>
+        public ScatterOperatorSpec(
+            string senderId,
+            IPipelineDataConverter<T1> dataConverter)
+        {
+            SenderId = senderId;
+            Codec = typeof(T2);
+            PipelineDataConverter = dataConverter;
+        }
+
+        /// <summary>
+        /// Returns the IPipelineDataConverter used to convert messages to 
pipeline form and vice-versa
+        /// </summary>
+        public IPipelineDataConverter<T1> PipelineDataConverter { get; private 
set; }
+
+        /// <summary>
         /// Returns the identifier for the task that splits and scatters a list
         /// of messages to other tasks.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
new file mode 100644
index 0000000..fa0d072
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining
+{
+
+    /// <summary>
+    /// User-specified class that converts the message to be communicated into
+    /// pipelining-amenable data and vice versa.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    //[DefaultImplementation(typeof(DefaultPipelineDataConverter<>))]
+    public interface IPipelineDataConverter<T>
+    {
+        /// <summary>
+        /// Converts the original message to be communicated into a list of 
pipelined messages.
+        /// Each element of the list is communicated as a single logical unit.
+        /// </summary>
+        /// <param name="message">The original message</param>
+        /// <returns>The list of pipelined messages</returns>
+        List<PipelineMessage<T>> PipelineMessage(T message);
+
+        /// <summary>
+        /// Constructs the full final message from the vector of communicated 
pipelined messages
+        /// </summary>
+        /// <param name="pipelineMessage">The list of received 
pipelined messages</param>
+        /// <returns>The full constructed message</returns>
+        T FullMessage(List<PipelineMessage<T>> pipelineMessage);
+
+        /// <summary>
+        /// Constructs the configuration of the class. Basically the arguments 
of the class like chunksize
+        /// </summary>
+        /// <returns>The configuration for this data converter class</returns>
+        IConfiguration GetConfiguration();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
new file mode 100644
index 0000000..5601cb7
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Annotations;
+
+
+namespace Org.Apache.REEF.Network.Group.Pipelining.Impl
+{
+
+    /// <summary>
+    /// Default IPipelineDataConverter implementation
+    /// This basically is a non-pipelined implementation that just packs the 
whole message in one single PipelineMessage
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class DefaultPipelineDataConverter<T> : IPipelineDataConverter<T>
+    {
+        [Inject]
+        public DefaultPipelineDataConverter()
+        {
+        }
+
+        /// <summary>
+        /// Converts the original message to be communicated in to a single 
pipelined message
+        /// </summary>
+        /// <param name="message">The original message</param>
+        /// <returns>The list of pipelined messages with only one 
element</returns>
+        public List<PipelineMessage<T>> PipelineMessage(T message)
+        {
+            var messageList = new List<PipelineMessage<T>>();
+            messageList.Add(new PipelineMessage<T>(message, true));
+            return messageList;
+        }
+
+        /// <summary>
+        /// Constructs the full final message from the communicated pipelined 
message
+        /// </summary>
+        /// <param name="pipelineMessage">The list of received 
pipelined messages.
+        /// It is assumed to have exactly one element</param>
+        /// <returns>The full constructed message</returns>
+        public T FullMessage(List<PipelineMessage<T>> pipelineMessage)
+        {
+            if (pipelineMessage.Count != 1)
+            {
+                throw new System.Exception("Number of pipelined messages not 
equal to 1 in default IPipelineDataConverter implementation");
+            }
+
+            return pipelineMessage[0].Data;
+        }
+
+        public IConfiguration GetConfiguration()
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder().Build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs
new file mode 100644
index 0000000..196e0eb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Pipelining
+{
+    /// <summary>
+    /// the message for pipelined communication
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class PipelineMessage<T>
+    {
+        /// <summary>
+        /// Create new PipelineMessage.
+        /// </summary>
+        /// <param name="data">The actual message data</param>
+        /// <param name="isLast">Whether this is last pipeline message</param>
+        public PipelineMessage(T data, bool isLast)
+        {
+            Data = data;
+            IsLast = isLast;
+        }
+
+        /// <summary>
+        /// Returns the actual message
+        /// </summary>
+        public T Data { get; private set; }
+        
+        /// <summary>
+        /// Returns whether this is the last pipelined message
+        /// </summary>
+        public bool IsLast { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs
new file mode 100644
index 0000000..1049914
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining
+{
+    /// <summary>
+    /// The codec for PipelineMessage
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class PipelineMessageCodec<T> : ICodec<PipelineMessage<T>>
+    {
+        /// <summary>
+        /// Creates new PipelineMessageCodec
+        /// </summary>
+        /// <param name="baseCodec">The codec for actual message in 
PipelineMessage</param>
+        [Inject]
+        public PipelineMessageCodec(ICodec<T> baseCodec)
+        {
+            BaseCodec = baseCodec;
+        }
+
+        /// <summary>Encodes the given object into a Byte Array</summary>
+        /// <param name="obj"></param>
+        /// <returns>a byte[] representation of the object</returns>
+        public byte[] Encode(PipelineMessage<T> obj)
+        {
+            var baseCoding = BaseCodec.Encode(obj.Data);
+            var result = new byte[baseCoding.Length + sizeof(bool)];
+            Buffer.BlockCopy(baseCoding, 0, result, 0, baseCoding.Length);
+            Buffer.BlockCopy(BitConverter.GetBytes(obj.IsLast), 0, result, 
baseCoding.Length, sizeof(bool));
+            return result;
+        }
+
+        /// <summary>Decodes the given byte array into a PipelineMessage 
object</summary>
+        /// <param name="data"></param>
+        /// <returns>the decoded PipelineMessage object</returns>
+        public PipelineMessage<T> Decode(byte[] data)
+        {
+            var message = BaseCodec.Decode(data.Take(data.Length - 
sizeof(bool)).ToArray());
+            var isLast = BitConverter.ToBoolean(data, data.Length - 
sizeof(bool));
+            return new PipelineMessage<T>(message, isLast);
+        }
+                
+        /// <summary>
+        /// Codec for actual message T
+        /// </summary>
+        public ICodec<T> BaseCodec { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index 2766f4f..013f76b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -62,5 +62,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             _messageQueue.Add(gcm);
         }
+
+        /// <summary>
+        /// Tells whether there is a message in queue or not.
+        /// </summary>
+        /// <returns>True if queue is non empty, false otherwise.</returns>
+        public bool HasMessage()
+        {
+            if (_messageQueue.Count != 0)
+            {
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 07d8376..2e98d3d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -29,7 +29,6 @@ using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.Utilities;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Utilities.Logging;
@@ -48,7 +47,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private const int DefaultTimeout = 50000;
         private const int RetryCount = 10;
 
-        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(OperatorTopology<>));
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(OperatorTopology<>));
 
         private readonly string _groupName;
         private readonly string _operatorName;
@@ -64,6 +63,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly INameClient _nameClient;
         private readonly Sender _sender;
         private readonly BlockingCollection<NodeStruct> _nodesWithData;
+        private readonly Object _thisLock = new Object();
 
         /// <summary>
         /// Creates a new OperatorTopology object.
@@ -113,9 +113,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 _parent = new NodeStruct(rootId);
                 _idToNodeMap[rootId] = _parent;
             }
-            foreach (string childId in childIds)
+            foreach (var childId in childIds)
             {
-                NodeStruct node = new NodeStruct(childId);
+                var node = new NodeStruct(childId);
                 _children.Add(node);
                 _idToNodeMap[childId] = node;
             }
@@ -128,7 +128,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         public void Initialize()
         {
-            using (LOGGER.LogFunction("OperatorTopology::Initialize"))
+            using (Logger.LogFunction("OperatorTopology::Initialize"))
             {
                 if (_parent != null)
                 {
@@ -137,7 +137,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
                 if (_children.Count > 0)
                 {
-                    foreach (NodeStruct child in _children)
+                    foreach (var child in _children)
                     {
                         WaitForTaskRegistration(child.Identifier, _retryCount);
                     }
@@ -161,14 +161,17 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("Message must have a source");
             }
 
-            NodeStruct sourceNode = FindNode(gcm.Source);
+            var sourceNode = FindNode(gcm.Source);
             if (sourceNode == null)
             {
                 throw new IllegalStateException("Received message from invalid 
task id: " + gcm.Source);
             }
 
-            _nodesWithData.Add(sourceNode);
-            sourceNode.AddData(gcm);
+            lock (_thisLock)
+            {
+                _nodesWithData.Add(sourceNode);
+                sourceNode.AddData(gcm);
+            }
         }
 
         /// <summary>
@@ -198,7 +201,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentNullException("message");
             }
 
-            foreach (NodeStruct child in _children)
+            foreach (var child in _children)
             {
                 SendToNode(message, MessageType.Data, child); 
             }
@@ -221,7 +224,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 return;
             }
 
-            int count = (int) Math.Ceiling(((double) messages.Count) / 
_children.Count);
+            var count = (int) Math.Ceiling(((double) messages.Count) / 
_children.Count);
             ScatterHelper(messages, _children, count);
         }
 
@@ -286,7 +289,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <returns>The parent Task's message</returns>
         public T ReceiveFromParent()
         {
-            byte[][] data = ReceiveFromNode(_parent, true);
+            byte[][] data = ReceiveFromNode(_parent);
             if (data == null || data.Length != 1)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -301,7 +304,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <returns>The parent Task's list of messages</returns>
         public List<T> ReceiveListFromParent()
         {
-            byte[][] data = ReceiveFromNode(_parent, true);
+            byte[][] data = ReceiveFromNode(_parent);
             if (data == null || data.Length == 0)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -328,15 +331,19 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             while (childrenToReceiveFrom.Count > 0)
             {
-                NodeStruct childWithData = GetNodeWithData();
-                byte[][] data = ReceiveFromNode(childWithData, false);
-                if (data == null || data.Length != 1)
+                var childrenWithData = GetNodeWithData(childrenToReceiveFrom);
+
+                foreach (var child in childrenWithData)
                 {
-                    throw new InvalidOperationException("Received invalid data 
from child with id: " + childWithData.Identifier);
-                }
+                    byte[][] data = ReceiveFromNode(child);
+                    if (data == null || data.Length != 1)
+                    {
+                        throw new InvalidOperationException("Received invalid 
data from child with id: " + child.Identifier);
+                    }
 
-                receivedData.Add(_codec.Decode(data[0]));
-                childrenToReceiveFrom.Remove(childWithData.Identifier);
+                    receivedData.Add(_codec.Decode(data[0]));
+                    childrenToReceiveFrom.Remove(child.Identifier);
+                }
             }
 
             return reduceFunction.Reduce(receivedData);
@@ -356,6 +363,71 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         }
 
         /// <summary>
+        /// Get a set of nodes containing an incoming message and belonging to 
candidate set of nodes.
+        /// </summary>
+        ///<param name="nodeSetIdentifier">Candidate set of nodes from which 
data is to be received</param>
+        /// <returns>A Vector of NodeStruct with incoming data.</returns>
+        private IEnumerable<NodeStruct> GetNodeWithData(IEnumerable<string> 
nodeSetIdentifier)
+        {
+            CancellationTokenSource timeoutSource = new 
CancellationTokenSource(_timeout);
+            List<NodeStruct> nodesSubsetWithData = new List<NodeStruct>();
+
+            try
+            {
+                lock (_thisLock)
+                {
+                    foreach (var identifier in nodeSetIdentifier)
+                    {
+                        if (!_idToNodeMap.ContainsKey(identifier))
+                        {
+                            throw new Exception("Trying to get data from the 
node not present in the node map");
+                        }
+
+                        if (_idToNodeMap[identifier].HasMessage())
+                        {
+                            nodesSubsetWithData.Add(_idToNodeMap[identifier]);
+                        }
+                    }
+
+                    if (nodesSubsetWithData.Count > 0)
+                    {
+                        return nodesSubsetWithData;
+                    }
+
+                    while (_nodesWithData.Count != 0)
+                    {
+                        _nodesWithData.Take();
+                    }
+                }
+
+                var potentialNode = _nodesWithData.Take();
+
+                while (!nodeSetIdentifier.Contains(potentialNode.Identifier))
+                {
+                    potentialNode = _nodesWithData.Take();
+                }
+
+                return new NodeStruct[] { potentialNode };
+
+            }
+            catch (OperationCanceledException)
+            {
+                Logger.Log(Level.Error, "No data to read from child");
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                Logger.Log(Level.Error, "No data to read from child");
+                throw;
+            }
+            catch (InvalidOperationException)
+            {
+                Logger.Log(Level.Error, "No data to read from child");
+                throw;
+            }
+        }
+
+        /// <summary>
         /// Get a node containing an incoming message.
         /// </summary>
         /// <returns>A NodeStruct with incoming data.</returns>
@@ -369,17 +441,17 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             }
             catch (OperationCanceledException)
             {
-                LOGGER.Log(Level.Error, "No data to read from child");
+                Logger.Log(Level.Error, "No data to read from child");
                 throw;
             }
             catch (ObjectDisposedException)
             {
-                LOGGER.Log(Level.Error, "No data to read from child");
+                Logger.Log(Level.Error, "No data to read from child");
                 throw;
             }
             catch (InvalidOperationException)
             {
-                LOGGER.Log(Level.Error, "No data to read from child");
+                Logger.Log(Level.Error, "No data to read from child");
                 throw;
             }
         }
@@ -444,17 +516,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Removes the NodeStruct from the nodesWithData queue if requested.
         /// </summary>
         /// <param name="node">The node to receive from</param>
-        /// <param name="removeFromQueue">Whether or not to remove the 
NodeStruct
-        /// from the nodesWithData queue</param>
         /// <returns>The byte array message from the node</returns>
-        private byte[][] ReceiveFromNode(NodeStruct node, bool removeFromQueue)
+        private byte[][] ReceiveFromNode(NodeStruct node)
         {
             byte[][] data = node.GetData();
-            if (removeFromQueue)
-            {
-                _nodesWithData.Take(node);
-            }
-
             return data;
         }
 
@@ -479,16 +544,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             for (int i = 0; i < retries; i++)
             {
-                if (_nameClient.Lookup(identifier) != null)
+                System.Net.IPEndPoint endPoint;
+                if ((endPoint = _nameClient.Lookup(identifier)) != null)
                 {
                     return;
                 }
 
                 Thread.Sleep(500);
-                LOGGER.Log(Level.Verbose, "Retry {0}: retrying lookup for 
node: {1}", i + 1, identifier);
             }
 
             throw new IllegalStateException("Failed to initialize operator 
topology for node: " + identifier);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index 83990d7..909dc4c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Pipelining;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
@@ -90,7 +91,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
             if (taskId.Equals(_rootId))
             {
-                foreach (string tId in _nodes.Keys)
+                foreach (var tId in _nodes.Keys)
                 {
                     if (!tId.Equals(_rootId))
                     {
@@ -103,7 +104,12 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
             if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T1, T2>;
+                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, 
T2>;
+
+                
confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration());
+                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
broadcastSpec.PipelineDataConverter.GetType())
+                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
+
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
                     
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
@@ -115,8 +121,11 @@ namespace Org.Apache.REEF.Network.Group.Topology
             }
             else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T1, T2>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType());
+                var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
+                
confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration());
+                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
reduceSpec.PipelineDataConverter.GetType())
+                .BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType())
+                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
                 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
index bc324cf..1ee459e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Pipelining;
 
 namespace Org.Apache.REEF.Network.Group.Topology
 {
@@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Network.Group.Topology
         private TaskNode _logicalRoot;
         private TaskNode _prev;
 
-        private int _fanOut;
+        private readonly int _fanOut;
 
         /// <summary>
         /// Creates a new TreeTopology.
@@ -120,7 +121,10 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
             if (OperatorSpec is BroadcastOperatorSpec<T1, T2>)
             {
-                BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T1, T2>;
+                var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, 
T2>;
+                
confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration());
+                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
broadcastSpec.PipelineDataConverter.GetType())
+                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
                 if (taskId.Equals(broadcastSpec.SenderId))
                 {
                     
confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, 
GenericType<BroadcastSender<T1>>.Class);
@@ -132,8 +136,11 @@ namespace Org.Apache.REEF.Network.Group.Topology
             }
             else if (OperatorSpec is ReduceOperatorSpec<T1, T2>)
             {
-                ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as 
ReduceOperatorSpec<T1, T2>;
-                confBuilder.BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType());
+                var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>;
+                
confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration());
+                
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), 
reduceSpec.PipelineDataConverter.GetType())
+                .BindImplementation(typeof(IReduceFunction<T1>), 
reduceSpec.ReduceFunction.GetType())
+                
.BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, 
GenericType<PipelineMessageCodec<T1>>.Class);
 
                 if (taskId.Equals(reduceSpec.ReceiverId))
                 {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 7cab887..58e41a8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -66,6 +66,7 @@ under the License.
     <Compile Include="Group\Operators\Impl\BroadcastOperatorSpec.cs" />
     <Compile Include="Group\Operators\Impl\BroadcastReceiver.cs" />
     <Compile Include="Group\Operators\Impl\BroadcastSender.cs" />
+    <Compile Include="Group\Operators\Impl\PipelinedReduceFunction.cs" />
     <Compile Include="Group\Operators\Impl\ReduceFunction.cs" />
     <Compile Include="Group\Operators\Impl\ReduceOperatorSpec.cs" />
     <Compile Include="Group\Operators\Impl\ReduceReceiver.cs" />
@@ -80,6 +81,10 @@ under the License.
     <Compile Include="Group\Operators\IReduceSender.cs" />
     <Compile Include="Group\Operators\IScatterReceiver.cs" />
     <Compile Include="Group\Operators\IScatterSender.cs" />
+    <Compile Include="Group\Pipelining\Impl\DefaultPipelineDataConverter.cs" />
+    <Compile Include="Group\Pipelining\IPipelineDataConverter.cs" />
+    <Compile Include="Group\Pipelining\PipelineMessage.cs" />
+    <Compile Include="Group\Pipelining\PipelineMessageCodec.cs" />
     <Compile Include="Group\Task\ICommunicationGroupClient.cs" />
     <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" />
     <Compile Include="Group\Task\IMpiClient.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
index 6d93cab..a3989a0 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
@@ -37,5 +37,20 @@ namespace Org.Apache.REEF.Tests.Functional.MPI
         public class FanOut : Name<int>
         {
         }
+
+        [NamedParameter("integer id of the evaluator")]
+        public class EvaluatorId : Name<string>
+        {
+        }
+
+        [NamedParameter("Size of the array")]
+        public class ArraySize : Name<int>
+        {
+        }
+
+        [NamedParameter("Chunk size for pipelining")]
+        public class ChunkSize : Name<int>
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
index 403990f..668add3 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
@@ -30,5 +30,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI
         public const string SlaveTaskId = "SlaveTask-";
         public const int NumIterations = 10;
         public const int FanOut = 2;
+        public const int ChunkSize = 2;
+        public const int ArrayLength = 6;
     }
 }

Reply via email to