[REEF-178] Introduce pipelining in the REEF Group Communication operators. Support pipelining in group communication.
JIRA: REEF-178. (https://issues.apache.org/jira/browse/REEF-178) This Closes #139 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c02c80da Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c02c80da Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c02c80da Branch: refs/heads/master Commit: c02c80dacfa7b9b16529958961f637f06dd73e20 Parents: b9bb7b1 Author: Julia Wang <[email protected]> Authored: Wed Apr 8 11:13:09 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Wed Apr 8 13:24:43 2015 -0700 ---------------------------------------------------------------------- .../Group/Driver/ICommunicationGroupDriver.cs | 27 ++ .../Driver/Impl/CommunicationGroupDriver.cs | 91 +++++- .../Operators/Impl/BroadcastOperatorSpec.cs | 25 +- .../Group/Operators/Impl/BroadcastReceiver.cs | 50 ++- .../Group/Operators/Impl/BroadcastSender.cs | 39 ++- .../Operators/Impl/PipelinedReduceFunction.cs | 60 ++++ .../Group/Operators/Impl/ReduceOperatorSpec.cs | 27 ++ .../Group/Operators/Impl/ReduceReceiver.cs | 48 ++- .../Group/Operators/Impl/ReduceSender.cs | 80 +++-- .../Group/Operators/Impl/ScatterOperatorSpec.cs | 23 ++ .../Group/Pipelining/IPipelineDataConverter.cs | 58 ++++ .../Impl/DefaultPipelineDataConverter.cs | 74 +++++ .../Group/Pipelining/PipelineMessage.cs | 49 +++ .../Group/Pipelining/PipelineMessageCodec.cs | 70 ++++ .../Group/Task/Impl/NodeStruct.cs | 16 + .../Group/Task/Impl/OperatorTopology.cs | 133 ++++++-- .../Group/Topology/FlatTopology.cs | 17 +- .../Group/Topology/TreeTopology.cs | 15 +- .../Org.Apache.REEF.Network.csproj | 5 + .../Functional/MPI/MpiTestConfig.cs | 15 + .../Functional/MPI/MpiTestConstants.cs | 2 + .../PipelinedBroadcastReduceDriver.cs | 320 +++++++++++++++++++ .../PipelinedBroadcastReduceTest.cs | 98 ++++++ .../PipelinedMasterTask.cs | 102 ++++++ .../PipelinedSlaveTask.cs | 89 ++++++ .../Org.Apache.REEF.Tests.csproj | 10 +- 26 
files changed, 1428 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs index 0fa4aae..2e11441 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs @@ -17,9 +17,11 @@ * under the License. */ +using System; using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Wake.Remote; @@ -46,6 +48,18 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <param name="operatorName">The name of the broadcast operator</param> /// <param name="masterTaskId">The master task id in broadcast operator</param> /// <param name="topologyType">The topology type for the operator</param> + /// <param name="pipelineDataConverter">The class used to convert data back and forth to pipelined one</param> + /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> + ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>; + + /// <summary> + /// Adds the Broadcast MPI operator to the communication group. 
+ /// </summary> + /// <typeparam name="TMessage">The type of messages that operators will send</typeparam> + /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam> + /// <param name="operatorName">The name of the broadcast operator</param> + /// <param name="masterTaskId">The master task id in broadcast operator</param> + /// <param name="topologyType">The topology type for the operator</param> /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>; @@ -68,8 +82,21 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <param name="reduceFunction">The class used to aggregate all messages.</param> /// <param name="topologyType">The topology for the operator</param> /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns> + ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage>; + + /// <summary> + /// Adds the Reduce MPI operator to the communication group. 
+ /// </summary> + /// <typeparam name="TMessage">The type of messages that operators will send</typeparam> + /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam> + /// <param name="operatorName">The name of the reduce operator</param> + /// <param name="masterTaskId">The master task id for the typology</param> + /// <param name="reduceFunction">The class used to aggregate all messages.</param> + /// <param name="topologyType">The topology for the operator</param> + /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns> ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>(string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage>; + /// <summary> /// Adds the Reduce MPI operator to the communication group with default IntCodec /// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs index 6c07598..065c158 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs @@ -22,6 +22,8 @@ using System.Reflection; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.Group.Pipelining.Impl; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; @@ -97,6 +99,44 @@ namespace 
Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="operatorName">The name of the broadcast operator</param> /// <param name="masterTaskId">The master task id in broadcast operator</param> /// <param name="topologyType">The topology type for the operator</param> + /// <param name="pipelineDataConverter">The class type used to convert data back and forth to pipelined one</param> + /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> + /// <returns></returns> + public ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType, IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage> + { + if (_finalized) + { + throw new IllegalStateException("Can't add operators once the spec has been built."); + } + + var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>( + masterTaskId, + pipelineDataConverter); + + ITopology<TMessage, TMessageCodec> topology; + if (topologyType == TopologyTypes.Flat) + { + topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec); + } + else + { + topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.SenderId, _driverId, spec, + _fanOut); + } + + _topologies[operatorName] = topology; + _operatorSpecs[operatorName] = spec; + + return this; + } + + /// <summary> + /// </summary> + /// <typeparam name="TMessage">The type of messages that operators will send</typeparam> + /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam> + /// <param name="operatorName">The name of the broadcast operator</param> + /// <param name="masterTaskId">The master task id in broadcast operator</param> + /// <param name="topologyType">The topology type for the operator</param> /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> /// 
<returns></returns> public ICommunicationGroupDriver AddBroadcast<TMessage, TMessageCodec>(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage> @@ -107,7 +147,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl } var spec = new BroadcastOperatorSpec<TMessage, TMessageCodec>( - masterTaskId); + masterTaskId, + new DefaultPipelineDataConverter<TMessage>()); ITopology<TMessage, TMessageCodec> topology; if (topologyType == TopologyTypes.Flat) @@ -136,7 +177,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public ICommunicationGroupDriver AddBroadcast(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) { - return AddBroadcast<int,IntCodec>(operatorName, masterTaskId, topologyType); + return AddBroadcast<int, IntCodec>(operatorName, masterTaskId, topologyType); } /// <summary> @@ -153,6 +194,51 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl string operatorName, string masterTaskId, IReduceFunction<TMessage> reduceFunction, + TopologyTypes topologyType, + IPipelineDataConverter<TMessage> pipelineDataConverter) where TMessageCodec : ICodec<TMessage> + { + if (_finalized) + { + throw new IllegalStateException("Can't add operators once the spec has been built."); + } + + var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>( + masterTaskId, + pipelineDataConverter, + reduceFunction); + + ITopology<TMessage, TMessageCodec> topology; + + if (topologyType == TopologyTypes.Flat) + { + topology = new FlatTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec); + } + else + { + topology = new TreeTopology<TMessage, TMessageCodec>(operatorName, _groupName, spec.ReceiverId, _driverId, spec, + _fanOut); + } + + _topologies[operatorName] = topology; + _operatorSpecs[operatorName] = spec; + + return this; + } + + /// <summary> + /// Adds the Reduce MPI operator to the communication group. 
+ /// </summary> + /// <typeparam name="TMessage">The type of messages that operators will send</typeparam> + /// <typeparam name="TMessageCodec">The codec used for serializing messages</typeparam> + /// <param name="operatorName">The name of the reduce operator</param> + /// <param name="masterTaskId">The master task id for the typology</param> + /// <param name="reduceFunction">The class used to aggregate all messages.</param> + /// <param name="topologyType">The topology for the operator</param> + /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns> + public ICommunicationGroupDriver AddReduce<TMessage, TMessageCodec>( + string operatorName, + string masterTaskId, + IReduceFunction<TMessage> reduceFunction, TopologyTypes topologyType = TopologyTypes.Flat) where TMessageCodec : ICodec<TMessage> { if (_finalized) @@ -162,6 +248,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl var spec = new ReduceOperatorSpec<TMessage, TMessageCodec>( masterTaskId, + new DefaultPipelineDataConverter<TMessage>(), reduceFunction); ITopology<TMessage, TMessageCodec> topology; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs index 15a4374..cd8122a 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs @@ -19,6 +19,8 @@ using System; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.Group.Pipelining.Impl; namespace Org.Apache.REEF.Network.Group.Operators.Impl { @@ -35,10 +37,31 @@ namespace 
Org.Apache.REEF.Network.Group.Operators.Impl public BroadcastOperatorSpec(string senderId) { SenderId = senderId; - Codec = typeof(T2); + Codec = typeof(T2); + PipelineDataConverter = new DefaultPipelineDataConverter<T1>(); } /// <summary> + /// Create a new BroadcastOperatorSpec. + /// </summary> + /// <param name="senderId">The identifier of the root sending Task.</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> + public BroadcastOperatorSpec( + string senderId, + IPipelineDataConverter<T1> dataConverter) + { + SenderId = senderId; + Codec = typeof(T2);; + PipelineDataConverter = dataConverter ?? new DefaultPipelineDataConverter<T1>(); + } + + /// <summary> + /// Returns the IPipelineDataConverter class type used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T1> PipelineDataConverter { get; private set; } + + /// <summary> /// Returns the identifier of the Task that will broadcast data to other Tasks. 
/// </summary> public string SenderId { get; private set; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs index d45fa24..b8b2a5a 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -18,25 +18,27 @@ */ using System.Reactive; +using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Operators.Impl { /// <summary> - /// MPI Operator used to receive broadcast messages. + /// MPI Operator used to receive broadcast messages in pipelined fashion. /// </summary> /// <typeparam name="T">The type of message being sent.</typeparam> public class BroadcastReceiver<T> : IBroadcastReceiver<T> { - private const int DefaultVersion = 1; - + private const int PipelineVersion = 2; private readonly ICommunicationGroupNetworkObserver _networkHandler; - private readonly OperatorTopology<T> _topology; - + private readonly OperatorTopology<PipelineMessage<T>> _topology; + private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastReceiver<T>)); /// <summary> /// Creates a new BroadcastReceiver. 
/// </summary> @@ -45,16 +47,19 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// operator belongs to</param> /// <param name="topology">The node's topology graph</param> /// <param name="networkHandler">The incoming message handler</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> [Inject] public BroadcastReceiver( [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, - OperatorTopology<T> topology, - ICommunicationGroupNetworkObserver networkHandler) + OperatorTopology<PipelineMessage<T>> topology, + ICommunicationGroupNetworkObserver networkHandler, + IPipelineDataConverter<T> dataConverter) { OperatorName = operatorName; GroupName = groupName; - Version = DefaultVersion; + Version = PipelineVersion; _networkHandler = networkHandler; _topology = topology; @@ -62,6 +67,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message)); _networkHandler.Register(operatorName, msgHandler); + + PipelineDataConverter = dataConverter; } /// <summary> @@ -80,17 +87,34 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl public int Version { get; private set; } /// <summary> + /// Returns the IPipelineDataConvert used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T> PipelineDataConverter { get; private set; } + + + /// <summary> /// Receive a message from parent BroadcastSender. 
/// </summary> /// <returns>The incoming message</returns> public T Receive() { - var data = _topology.ReceiveFromParent(); - if (_topology.HasChildren()) + PipelineMessage<T> message; + var messageList = new List<PipelineMessage<T>>(); + + do { - _topology.SendToChildren(data, MessageType.Data); - } - return data; + message = _topology.ReceiveFromParent(); + + if (_topology.HasChildren()) + { + _topology.SendToChildren(message, MessageType.Data); + } + + messageList.Add(message); + } while (!message.IsLast); + + return PipelineDataConverter.FullMessage(messageList); } + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs index f88b093..dc0142b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs @@ -24,20 +24,21 @@ using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Operators.Impl { /// <summary> - /// MPI Operator used to send messages to child Tasks. + /// MPI Operator used to send messages to child Tasks in pipelined fashion. 
/// </summary> /// <typeparam name="T">The message type</typeparam> public class BroadcastSender<T> : IBroadcastSender<T> { - private const int DefaultVersion = 1; + private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastSender<T>)); + private const int PipelineVersion = 2; + private readonly OperatorTopology<PipelineMessage<T>> _topology; - private readonly ICommunicationGroupNetworkObserver _networkHandler; - private readonly OperatorTopology<T> _topology; - /// <summary> /// Creates a new BroadcastSender to send messages to other Tasks. /// </summary> @@ -46,23 +47,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// belongs to</param> /// <param name="topology">The node's topology graph</param> /// <param name="networkHandler">The incoming message handler</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> [Inject] public BroadcastSender( [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, - OperatorTopology<T> topology, - ICommunicationGroupNetworkObserver networkHandler) + OperatorTopology<PipelineMessage<T>> topology, + ICommunicationGroupNetworkObserver networkHandler, + IPipelineDataConverter<T> dataConverter) { OperatorName = operatorName; GroupName = groupName; - Version = DefaultVersion; + Version = PipelineVersion; - _networkHandler = networkHandler; _topology = topology; _topology.Initialize(); var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message)); - _networkHandler.Register(operatorName, msgHandler); + networkHandler.Register(operatorName, msgHandler); + + PipelineDataConverter = dataConverter; } /// <summary> @@ -81,17 +86,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl public int Version { get; private set; } /// <summary> + /// Returns the IPipelineDataConvert used to 
convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T> PipelineDataConverter { get; private set; } + + /// <summary> /// Send the data to all BroadcastReceivers. /// </summary> /// <param name="data">The data to send.</param> public void Send(T data) { + var messageList = PipelineDataConverter.PipelineMessage(data); + if (data == null) { - throw new ArgumentNullException("data"); + throw new ArgumentNullException("data"); } - _topology.SendToChildren(data, MessageType.Data); + foreach (var message in messageList) + { + _topology.SendToChildren(message, MessageType.Data); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs new file mode 100644 index 0000000..cd2ae5e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/PipelinedReduceFunction.cs @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.Network.Group.Pipelining; + +namespace Org.Apache.REEF.Network.Group.Operators.Impl +{ + /// <summary> + /// The class used to aggregate pipelined messages sent by ReduceSenders. + /// </summary> + /// <typeparam name="T">The message type.</typeparam> + public class PipelinedReduceFunction<T> : IReduceFunction<PipelineMessage<T>> + { + /// <summary> + /// The base reduce function class that operates on actual message type T. + /// </summary> + /// <typeparam name="T">The message type.</typeparam> + private readonly IReduceFunction<T> _baseReduceFunc; + public PipelinedReduceFunction(IReduceFunction<T> baseReduceFunc) + { + _baseReduceFunc = baseReduceFunc; + } + + /// <summary> + /// Reduce the IEnumerable of pipeline messages into one pipeline message. + /// </summary> + /// <param name="elements">The pipeline messages to reduce</param> + /// <returns>The reduced pipeline message</returns> + public PipelineMessage<T> Reduce(IEnumerable<PipelineMessage<T>> elements) + { + var messageList = new List<T>(); + var isLast = false; + + foreach (var message in elements) + { + messageList.Add(message.Data); + isLast = message.IsLast; + } + + return new PipelineMessage<T>(_baseReduceFunc.Reduce(messageList), isLast); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs index f72cea5..bf60841 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceOperatorSpec.cs @@ -19,6 +19,8 @@ 
using System; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Pipelining.Impl; +using Org.Apache.REEF.Network.Group.Pipelining; namespace Org.Apache.REEF.Network.Group.Operators.Impl { @@ -41,9 +43,34 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl ReceiverId = receiverId; Codec = typeof(T2); ReduceFunction = reduceFunction; + PipelineDataConverter = new DefaultPipelineDataConverter<T1>(); } /// <summary> + /// Creates a new ReduceOperatorSpec. + /// </summary> + /// <param name="receiverId">The identifier of the task that + /// will receive and reduce incoming messages.</param> + /// <param name="reduceFunction">The class used to aggregate all messages.</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> + public ReduceOperatorSpec( + string receiverId, + IPipelineDataConverter<T1> dataConverter, + IReduceFunction<T1> reduceFunction) + { + ReceiverId = receiverId; + Codec = typeof(T2); + ReduceFunction = reduceFunction; + PipelineDataConverter = dataConverter ?? new DefaultPipelineDataConverter<T1>(); + } + + /// <summary> + /// Returns the IPipelineDataConvert used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T1> PipelineDataConverter { get; private set; } + + /// <summary> /// Returns the identifier for the task that receives and reduces /// incoming messages. 
/// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs index 20d4ff7..70ed1ae 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs @@ -18,24 +18,27 @@ */ using System.Reactive; +using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Operators.Impl { /// <summary> - /// MPI operator used to receive and reduce messages. + /// MPI operator used to receive and reduce messages in pipelined fashion. /// </summary> /// <typeparam name="T">The message type</typeparam> public class ReduceReceiver<T> : IReduceReceiver<T> { - private const int DefaultVersion = 1; - - private readonly ICommunicationGroupNetworkObserver _networkHandler; - private readonly OperatorTopology<T> _topology; + private static readonly Logger Logger = Logger.GetLogger(typeof (ReduceReceiver<T>)); + private const int PipelineVersion = 2; + private readonly OperatorTopology<PipelineMessage<T>> _topology; + private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc; /// <summary> /// Creates a new ReduceReceiver. 
@@ -45,25 +48,30 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="topology">The task's operator topology graph</param> /// <param name="networkHandler">Handles incoming messages from other tasks</param> /// <param name="reduceFunction">The class used to aggregate all incoming messages</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> [Inject] public ReduceReceiver( - [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, - [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, - OperatorTopology<T> topology, + [Parameter(typeof (MpiConfigurationOptions.OperatorName))] string operatorName, + [Parameter(typeof (MpiConfigurationOptions.CommunicationGroupName))] string groupName, + OperatorTopology<PipelineMessage<T>> topology, ICommunicationGroupNetworkObserver networkHandler, - IReduceFunction<T> reduceFunction) + IReduceFunction<T> reduceFunction, + IPipelineDataConverter<T> dataConverter) { OperatorName = operatorName; GroupName = groupName; - Version = DefaultVersion; + Version = PipelineVersion; ReduceFunction = reduceFunction; - _networkHandler = networkHandler; + _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction); _topology = topology; _topology.Initialize(); var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message)); - _networkHandler.Register(operatorName, msgHandler); + networkHandler.Register(operatorName, msgHandler); + + PipelineDataConverter = dataConverter; } /// <summary> @@ -87,13 +95,27 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl public IReduceFunction<T> ReduceFunction { get; private set; } /// <summary> + /// Returns the IPipelineDataConvert used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T> PipelineDataConverter { get; private set; } + + /// <summary> /// 
Receives messages sent by all ReduceSenders and aggregates them /// using the specified IReduceFunction. /// </summary> /// <returns>The single aggregated data</returns> public T Reduce() { - return _topology.ReceiveFromChildren(ReduceFunction); + PipelineMessage<T> message; + var messageList = new List<PipelineMessage<T>>(); + + do + { + message = _topology.ReceiveFromChildren(_pipelinedReduceFunc); + messageList.Add(message); + } while (!message.IsLast); + + return PipelineDataConverter.FullMessage(messageList); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs index d21983a..4d73e04 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs @@ -18,27 +18,28 @@ */ using System; -using System.Collections.Generic; using System.Reactive; +using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Operators.Impl { /// <summary> - /// MPI Operator used to send messages to be reduced by the ReduceReceiver. + /// MPI Operator used to send messages to be reduced by the ReduceReceiver in pipelined fashion. 
/// </summary> /// <typeparam name="T">The message type</typeparam> public class ReduceSender<T> : IReduceSender<T> { - private const int DefaultVersion = 1; - - private readonly ICommunicationGroupNetworkObserver _networkHandler; - private readonly OperatorTopology<T> _topology; - private readonly IReduceFunction<T> _reduceFunction; + private static readonly Logger Logger = Logger.GetLogger(typeof (ReduceSender<T>)); + private const int PipelineVersion = 2; + private readonly OperatorTopology<PipelineMessage<T>> _topology; + private readonly PipelinedReduceFunction<T> _pipelinedReduceFunc; /// <summary> /// Creates a new ReduceSender. @@ -47,24 +48,32 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="groupName">The name of the reduce operator's CommunicationGroup</param> /// <param name="topology">The Task's operator topology graph</param> /// <param name="networkHandler">The handler used to handle incoming messages</param> + /// <param name="reduceFunction">The function used to reduce the incoming messages</param> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> [Inject] public ReduceSender( [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, - OperatorTopology<T> topology, + OperatorTopology<PipelineMessage<T>> topology, ICommunicationGroupNetworkObserver networkHandler, - IReduceFunction<T> reduceFunction) + IReduceFunction<T> reduceFunction, + IPipelineDataConverter<T> dataConverter) { OperatorName = operatorName; GroupName = groupName; - Version = DefaultVersion; - _reduceFunction = reduceFunction; - _networkHandler = networkHandler; + ReduceFunction = reduceFunction; + + Version = PipelineVersion; + + _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction); _topology = topology; _topology.Initialize(); var msgHandler = 
Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message)); - _networkHandler.Register(operatorName, msgHandler); + networkHandler.Register(operatorName, msgHandler); + + PipelineDataConverter = dataConverter; } /// <summary> @@ -85,33 +94,46 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> - /// Get reduced data from children, reduce with the data given, then sends reduced data to parent + /// Returns the IReduceFunction used to reduce the incoming messages /// </summary> + public IReduceFunction<T> ReduceFunction { get; private set; } + + /// <summary> + /// Returns the IPipelineDataConverter used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T> PipelineDataConverter { get; private set; } + + /// <summary> + /// Sends the data towards the operator's ReduceReceiver: reduces it with the data received from children (if any), then sends the result to the parent. + /// </summary> /// <param name="data">The data to send</param> public void Send(T data) { if (data == null) { - throw new ArgumentNullException("data"); + throw new ArgumentNullException("data"); } - //middle notes - if (_topology.HasChildren()) + var messageList = PipelineDataConverter.PipelineMessage(data); + + foreach (var message in messageList) { - var reducedValueOfChildren = _topology.ReceiveFromChildren(_reduceFunction); + if (_topology.HasChildren()) + { + var reducedValueOfChildren = _topology.ReceiveFromChildren(_pipelinedReduceFunc); + + var mergedData = new List<PipelineMessage<T>> {message}; - var mergeddData = new List<T>(); - mergeddData.Add(data); - if (reducedValueOfChildren != null) + if (reducedValueOfChildren != null) + { + mergedData.Add(reducedValueOfChildren); + } + + var reducedValue = _pipelinedReduceFunc.Reduce(mergedData); + _topology.SendToParent(reducedValue, MessageType.Data); + } + else { - mergeddData.Add(reducedValueOfChildren); + _topology.SendToParent(message, MessageType.Data); } - T reducedValue = _reduceFunction.Reduce(mergeddData); - - _topology.SendToParent(reducedValue, MessageType.Data); - } - else - { - //leaf node - 
_topology.SendToParent(data, MessageType.Data); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs index 158a6c5..5961615 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs @@ -19,6 +19,7 @@ using System; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Pipelining; namespace Org.Apache.REEF.Network.Group.Operators.Impl { @@ -41,6 +42,28 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl } /// <summary> + /// Creates a new ScatterOperatorSpec. + /// </summary> + /// <param name="senderId">The identifier of the task that will + /// be sending messages</param> + /// <remarks>The codec type is taken from the T2 type parameter.</remarks> + /// <param name="dataConverter">The converter used to convert original + /// message to pipelined ones and vice versa.</param> + public ScatterOperatorSpec( + string senderId, + IPipelineDataConverter<T1> dataConverter) + { + SenderId = senderId; + Codec = typeof(T2); + PipelineDataConverter = dataConverter; + } + + /// <summary> + /// Returns the IPipelineDataConverter used to convert messages to pipeline form and vice-versa + /// </summary> + public IPipelineDataConverter<T1> PipelineDataConverter { get; private set; } + + /// <summary> /// Returns the identifier for the task that splits and scatters a list /// of messages to other tasks. 
/// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs new file mode 100644 index 0000000..fa0d072 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/IPipelineDataConverter.cs @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Collections.Generic; +using Org.Apache.REEF.Network.Group.Pipelining.Impl; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.Network.Group.Pipelining +{ + + /// <summary> + /// User specified class to convert the message to be communicated in to pipelining + /// amenable data and vice-versa + /// </summary> + /// <typeparam name="T">The message type</typeparam> + //[DefaultImplementation(typeof(DefaultPipelineDataConverter<>))] + public interface IPipelineDataConverter<T> + { + /// <summary> + /// Converts the original message to be communicated in to a vector of pipelined messages. + /// Each element of vector is communicated as a single logical unit. + /// </summary> + /// <param name="message">The original message</param> + /// <returns>The list of pipelined messages</returns> + List<PipelineMessage<T>> PipelineMessage(T message); + + /// <summary> + /// Constructs the full final message from the vector of communicated pipelined messages + /// </summary> + /// <param name="pipelineMessage">The enumerator over received pipelined messages</param> + /// <returns>The full constructed message</returns> + T FullMessage(List<PipelineMessage<T>> pipelineMessage); + + /// <summary> + /// Constructs the configuration of the class. 
Basically the arguments of the class like chunksize + /// </summary> + /// <returns>The configuration for this data converter class</returns> + IConfiguration GetConfiguration(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs new file mode 100644 index 0000000..5601cb7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/DefaultPipelineDataConverter.cs @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Annotations; + + +namespace Org.Apache.REEF.Network.Group.Pipelining.Impl +{ + + /// <summary> + /// Default IPipelineDataConverter implementation + /// This basically is a non-pipelined implementation that just packs the whole message in one single PipelineMessage + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class DefaultPipelineDataConverter<T> : IPipelineDataConverter<T> + { + [Inject] + public DefaultPipelineDataConverter() + { + } + + /// <summary> + /// Converts the original message to be communicated in to a single pipelined message + /// </summary> + /// <param name="message">The original message</param> + /// <returns>The list of pipelined messages with only one element</returns> + public List<PipelineMessage<T>> PipelineMessage(T message) + { + var messageList = new List<PipelineMessage<T>>(); + messageList.Add(new PipelineMessage<T>(message, true)); + return messageList; + } + + /// <summary> + /// Constructs the full final message from the communicated pipelined message + /// </summary> + /// <param name="pipelineMessage">The enumerator over received pipelined messages + /// It is assumed to have only one element</param> + /// <returns>The full constructed message</returns> + public T FullMessage(List<PipelineMessage<T>> pipelineMessage) + { + if (pipelineMessage.Count != 1) + { + throw new System.Exception("Number of pipelined messages not equal to 1 in default IPipelineDataConverter implementation"); + } + + return pipelineMessage[0].Data; + } + + public IConfiguration GetConfiguration() + { + return TangFactory.GetTang().NewConfigurationBuilder().Build(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs new file mode 100644 index 0000000..196e0eb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessage.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Network.Group.Pipelining +{ + /// <summary> + /// the message for pipelined communication + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class PipelineMessage<T> + { + /// <summary> + /// Create new PipelineMessage. 
+ /// </summary> + /// <param name="data">The actual byte data</param> + /// <param name="isLast">Whether this is last pipeline message</param> + public PipelineMessage(T data, bool isLast) + { + Data = data; + IsLast = isLast; + } + + /// <summary> + /// Returns the actual message + /// </summary> + public T Data { get; private set; } + + /// <summary> + /// Returns whether this is the last pipelined message + /// </summary> + public bool IsLast { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs new file mode 100644 index 0000000..1049914 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/PipelineMessageCodec.cs @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; +using System; +using System.Linq; + +namespace Org.Apache.REEF.Network.Group.Pipelining +{ + /// <summary> + /// The codec for PipelineMessage + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class PipelineMessageCodec<T> : ICodec<PipelineMessage<T>> + { + /// <summary> + /// Creates new PipelineMessageCodec + /// </summary> + /// <param name="baseCodec">The codec for actual message in PipelineMessage</param> + [Inject] + public PipelineMessageCodec(ICodec<T> baseCodec) + { + BaseCodec = baseCodec; + } + + /// <summary>Encodes the given object into a Byte Array</summary> + /// <param name="obj"></param> + /// <returns>a byte[] representation of the object</returns> + public byte[] Encode(PipelineMessage<T> obj) + { + var baseCoding = BaseCodec.Encode(obj.Data); + var result = new byte[baseCoding.Length + sizeof(bool)]; + Buffer.BlockCopy(baseCoding, 0, result, 0, baseCoding.Length); + Buffer.BlockCopy(BitConverter.GetBytes(obj.IsLast), 0, result, baseCoding.Length, sizeof(bool)); + return result; + } + + /// <summary>Decodes the given byte array into a PipelineMessage object</summary> + /// <param name="data"></param> + /// <returns>the decoded PipelineMessage object</returns> + public PipelineMessage<T> Decode(byte[] data) + { + var message = BaseCodec.Decode(data.Take(data.Length - sizeof(bool)).ToArray()); + var isLast = BitConverter.ToBoolean(data, data.Length - sizeof(bool)); + return new PipelineMessage<T>(message, isLast); + } + + /// <summary> + /// Codec for actual message T + /// </summary> + public ICodec<T> BaseCodec { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs index 2766f4f..013f76b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs @@ -62,5 +62,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { _messageQueue.Add(gcm); } + + /// <summary> + /// Tells whether there is a message in queue or not. + /// </summary> + /// <returns>True if queue is non empty, false otherwise.</returns> + public bool HasMessage() + { + if (_messageQueue.Count != 0) + { + return true; + } + else + { + return false; + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index 07d8376..2e98d3d 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -29,7 +29,6 @@ using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Network.Utilities; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Utilities.Logging; @@ -48,7 +47,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private const int DefaultTimeout = 50000; private const int RetryCount = 10; - private static readonly Logger LOGGER = Logger.GetLogger(typeof(OperatorTopology<>)); + private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>)); private readonly string _groupName; private readonly string _operatorName; @@ -64,6 +63,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private 
readonly INameClient _nameClient; private readonly Sender _sender; private readonly BlockingCollection<NodeStruct> _nodesWithData; + private readonly Object _thisLock = new Object(); /// <summary> /// Creates a new OperatorTopology object. @@ -113,9 +113,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _parent = new NodeStruct(rootId); _idToNodeMap[rootId] = _parent; } - foreach (string childId in childIds) + foreach (var childId in childIds) { - NodeStruct node = new NodeStruct(childId); + var node = new NodeStruct(childId); _children.Add(node); _idToNodeMap[childId] = node; } @@ -128,7 +128,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> public void Initialize() { - using (LOGGER.LogFunction("OperatorTopology::Initialize")) + using (Logger.LogFunction("OperatorTopology::Initialize")) { if (_parent != null) { @@ -137,7 +137,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl if (_children.Count > 0) { - foreach (NodeStruct child in _children) + foreach (var child in _children) { WaitForTaskRegistration(child.Identifier, _retryCount); } @@ -161,14 +161,17 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentException("Message must have a source"); } - NodeStruct sourceNode = FindNode(gcm.Source); + var sourceNode = FindNode(gcm.Source); if (sourceNode == null) { throw new IllegalStateException("Received message from invalid task id: " + gcm.Source); } - _nodesWithData.Add(sourceNode); - sourceNode.AddData(gcm); + lock (_thisLock) + { + _nodesWithData.Add(sourceNode); + sourceNode.AddData(gcm); + } } /// <summary> @@ -198,7 +201,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentNullException("message"); } - foreach (NodeStruct child in _children) + foreach (var child in _children) { SendToNode(message, MessageType.Data, child); } @@ -221,7 +224,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl return; } - int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); + var 
count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); ScatterHelper(messages, _children, count); } @@ -286,7 +289,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <returns>The parent Task's message</returns> public T ReceiveFromParent() { - byte[][] data = ReceiveFromNode(_parent, true); + byte[][] data = ReceiveFromNode(_parent); if (data == null || data.Length != 1) { throw new InvalidOperationException("Cannot receive data from parent node"); @@ -301,7 +304,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <returns>The parent Task's list of messages</returns> public List<T> ReceiveListFromParent() { - byte[][] data = ReceiveFromNode(_parent, true); + byte[][] data = ReceiveFromNode(_parent); if (data == null || data.Length == 0) { throw new InvalidOperationException("Cannot receive data from parent node"); @@ -328,15 +331,19 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl while (childrenToReceiveFrom.Count > 0) { - NodeStruct childWithData = GetNodeWithData(); - byte[][] data = ReceiveFromNode(childWithData, false); - if (data == null || data.Length != 1) + var childrenWithData = GetNodeWithData(childrenToReceiveFrom); + + foreach (var child in childrenWithData) { - throw new InvalidOperationException("Received invalid data from child with id: " + childWithData.Identifier); - } + byte[][] data = ReceiveFromNode(child); + if (data == null || data.Length != 1) + { + throw new InvalidOperationException("Received invalid data from child with id: " + child.Identifier); + } - receivedData.Add(_codec.Decode(data[0])); - childrenToReceiveFrom.Remove(childWithData.Identifier); + receivedData.Add(_codec.Decode(data[0])); + childrenToReceiveFrom.Remove(child.Identifier); + } } return reduceFunction.Reduce(receivedData); @@ -356,6 +363,71 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } /// <summary> + /// Get a set of nodes containing an incoming message and belonging to candidate set of nodes. 
+ ///<param name="nodeSetIdentifier">Candidate set of nodes from which data is to be received</param> + /// <returns>A Vector of NodeStruct with incoming data.</returns> + private IEnumerable<NodeStruct> GetNodeWithData(IEnumerable<string> nodeSetIdentifier) + { + CancellationTokenSource timeoutSource = new CancellationTokenSource(_timeout); + List<NodeStruct> nodesSubsetWithData = new List<NodeStruct>(); + + try + { + lock (_thisLock) + { + foreach (var identifier in nodeSetIdentifier) + { + if (!_idToNodeMap.ContainsKey(identifier)) + { + throw new Exception("Trying to get data from the node not present in the node map"); + } + + if (_idToNodeMap[identifier].HasMessage()) + { + nodesSubsetWithData.Add(_idToNodeMap[identifier]); + } + } + + if (nodesSubsetWithData.Count > 0) + { + return nodesSubsetWithData; + } + + while (_nodesWithData.Count != 0) + { + _nodesWithData.Take(); + } + } + + var potentialNode = _nodesWithData.Take(timeoutSource.Token); + + while (!nodeSetIdentifier.Contains(potentialNode.Identifier)) + { + potentialNode = _nodesWithData.Take(timeoutSource.Token); + } + + return new NodeStruct[] { potentialNode }; + + } + catch (OperationCanceledException) + { + Logger.Log(Level.Error, "No data to read from child"); + throw; + } + catch (ObjectDisposedException) + { + Logger.Log(Level.Error, "No data to read from child"); + throw; + } + catch (InvalidOperationException) + { + Logger.Log(Level.Error, "No data to read from child"); + throw; + } + } + + /// <summary> /// Get a node containing an incoming message. 
/// </summary> /// <returns>A NodeStruct with incoming data.</returns> @@ -369,17 +441,17 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } catch (OperationCanceledException) { - LOGGER.Log(Level.Error, "No data to read from child"); + Logger.Log(Level.Error, "No data to read from child"); throw; } catch (ObjectDisposedException) { - LOGGER.Log(Level.Error, "No data to read from child"); + Logger.Log(Level.Error, "No data to read from child"); throw; } catch (InvalidOperationException) { - LOGGER.Log(Level.Error, "No data to read from child"); + Logger.Log(Level.Error, "No data to read from child"); throw; } } @@ -444,17 +516,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Removes the NodeStruct from the nodesWithData queue if requested. /// </summary> /// <param name="node">The node to receive from</param> - /// <param name="removeFromQueue">Whether or not to remove the NodeStruct - /// from the nodesWithData queue</param> /// <returns>The byte array message from the node</returns> - private byte[][] ReceiveFromNode(NodeStruct node, bool removeFromQueue) + private byte[][] ReceiveFromNode(NodeStruct node) { byte[][] data = node.GetData(); - if (removeFromQueue) - { - _nodesWithData.Take(node); - } - return data; } @@ -479,16 +544,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { for (int i = 0; i < retries; i++) { - if (_nameClient.Lookup(identifier) != null) + System.Net.IPEndPoint endPoint; + if ((endPoint = _nameClient.Lookup(identifier)) != null) { return; } Thread.Sleep(500); - LOGGER.Log(Level.Verbose, "Retry {0}: retrying lookup for node: {1}", i + 1, identifier); } throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs index 83990d7..909dc4c 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Pipelining; namespace Org.Apache.REEF.Network.Group.Topology { @@ -90,7 +91,7 @@ namespace Org.Apache.REEF.Network.Group.Topology if (taskId.Equals(_rootId)) { - foreach (string tId in _nodes.Keys) + foreach (var tId in _nodes.Keys) { if (!tId.Equals(_rootId)) { @@ -103,7 +104,12 @@ namespace Org.Apache.REEF.Network.Group.Topology if (OperatorSpec is BroadcastOperatorSpec<T1, T2>) { - BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>; + var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>; + + confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration()); + confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), broadcastSpec.PipelineDataConverter.GetType()) + .BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, GenericType<PipelineMessageCodec<T1>>.Class); + if (taskId.Equals(broadcastSpec.SenderId)) { confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastSender<T1>>.Class); @@ -115,8 +121,11 @@ namespace Org.Apache.REEF.Network.Group.Topology } else if (OperatorSpec is ReduceOperatorSpec<T1, T2>) { - ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>; - confBuilder.BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType()); + var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>; + confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration()); + 
confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), reduceSpec.PipelineDataConverter.GetType()) + .BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType()) + .BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, GenericType<PipelineMessageCodec<T1>>.Class); if (taskId.Equals(reduceSpec.ReceiverId)) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs index bc324cf..1ee459e 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Pipelining; namespace Org.Apache.REEF.Network.Group.Topology { @@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Network.Group.Topology private TaskNode _logicalRoot; private TaskNode _prev; - private int _fanOut; + private readonly int _fanOut; /// <summary> /// Creates a new TreeTopology. 
@@ -120,7 +121,10 @@ namespace Org.Apache.REEF.Network.Group.Topology if (OperatorSpec is BroadcastOperatorSpec<T1, T2>) { - BroadcastOperatorSpec<T1, T2> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>; + var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T1, T2>; + confBuilder.AddConfiguration(broadcastSpec.PipelineDataConverter.GetConfiguration()); + confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), broadcastSpec.PipelineDataConverter.GetType()) + .BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, GenericType<PipelineMessageCodec<T1>>.Class); if (taskId.Equals(broadcastSpec.SenderId)) { confBuilder.BindImplementation(GenericType<IMpiOperator<T1>>.Class, GenericType<BroadcastSender<T1>>.Class); @@ -132,8 +136,11 @@ namespace Org.Apache.REEF.Network.Group.Topology } else if (OperatorSpec is ReduceOperatorSpec<T1, T2>) { - ReduceOperatorSpec<T1, T2> reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>; - confBuilder.BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType()); + var reduceSpec = OperatorSpec as ReduceOperatorSpec<T1, T2>; + confBuilder.AddConfiguration(reduceSpec.PipelineDataConverter.GetConfiguration()); + confBuilder.BindImplementation(typeof(IPipelineDataConverter<T1>), reduceSpec.PipelineDataConverter.GetType()) + .BindImplementation(typeof(IReduceFunction<T1>), reduceSpec.ReduceFunction.GetType()) + .BindImplementation(GenericType<ICodec<PipelineMessage<T1>>>.Class, GenericType<PipelineMessageCodec<T1>>.Class); if (taskId.Equals(reduceSpec.ReceiverId)) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 7cab887..58e41a8 100644 --- 
a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -66,6 +66,7 @@ under the License. <Compile Include="Group\Operators\Impl\BroadcastOperatorSpec.cs" /> <Compile Include="Group\Operators\Impl\BroadcastReceiver.cs" /> <Compile Include="Group\Operators\Impl\BroadcastSender.cs" /> + <Compile Include="Group\Operators\Impl\PipelinedReduceFunction.cs" /> <Compile Include="Group\Operators\Impl\ReduceFunction.cs" /> <Compile Include="Group\Operators\Impl\ReduceOperatorSpec.cs" /> <Compile Include="Group\Operators\Impl\ReduceReceiver.cs" /> @@ -80,6 +81,10 @@ under the License. <Compile Include="Group\Operators\IReduceSender.cs" /> <Compile Include="Group\Operators\IScatterReceiver.cs" /> <Compile Include="Group\Operators\IScatterSender.cs" /> + <Compile Include="Group\Pipelining\Impl\DefaultPipelineDataConverter.cs" /> + <Compile Include="Group\Pipelining\IPipelineDataConverter.cs" /> + <Compile Include="Group\Pipelining\PipelineMessage.cs" /> + <Compile Include="Group\Pipelining\PipelineMessageCodec.cs" /> <Compile Include="Group\Task\ICommunicationGroupClient.cs" /> <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" /> <Compile Include="Group\Task\IMpiClient.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs index 6d93cab..a3989a0 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs @@ -37,5 +37,20 @@ namespace Org.Apache.REEF.Tests.Functional.MPI public class FanOut : Name<int> { } + + [NamedParameter("integer id of the evaluator")] + public class EvaluatorId : Name<string> + { + } + + 
[NamedParameter("Size of the array")] + public class ArraySize : Name<int> + { + } + + [NamedParameter("Chunk size for pipelining")] + public class ChunkSize : Name<int> + { + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs index 403990f..668add3 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs @@ -30,5 +30,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI public const string SlaveTaskId = "SlaveTask-"; public const int NumIterations = 10; public const int FanOut = 2; + public const int ChunkSize = 2; + public const int ArrayLength = 6; } }
