// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
// new file mode 100644
// index 0000000..423e916
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
// @@ -0,0 +1,71 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Network.Group.Config
{
    /// <summary>
    /// Groups the named parameters used to configure MPI group communication.
    /// Each nested class is an empty marker type: it only carries a
    /// <c>NamedParameter</c> description and the value type of the parameter.
    /// </summary>
    public class MpiConfigurationOptions
    {
        // ---- Single-valued (string) parameters ----

        /// <summary>
        /// Name of the communication group.
        /// </summary>
        [NamedParameter("Name of the communication group")]
        public class CommunicationGroupName : Name<string>
        {
        }

        /// <summary>
        /// Name of the MPI operator.
        /// </summary>
        [NamedParameter("Name of the MPI operator")]
        public class OperatorName : Name<string>
        {
        }

        /// <summary>
        /// Identifier of the driver.
        /// </summary>
        [NamedParameter("Driver identifier")]
        public class DriverId : Name<string>
        {
        }

        /// <summary>
        /// Identifier of the master task.
        /// </summary>
        [NamedParameter("Master task identifier")]
        public class MasterTaskId : Name<string>
        {
        }

        // ---- Set-valued parameters holding serialized configurations ----

        /// <summary>
        /// Set of serialized communication group configurations.
        /// </summary>
        [NamedParameter("Serialized communication group configuration")]
        public class SerializedGroupConfigs : Name<ISet<string>>
        {
        }

        /// <summary>
        /// Set of serialized operator configurations.
        /// </summary>
        [NamedParameter("Serialized operator configuration")]
        public class SerializedOperatorConfigs : Name<ISet<string>>
        {
        }

        // ---- Operator topology parameters ----

        /// <summary>
        /// Id of the root task in an operator topology.
        /// </summary>
        [NamedParameter("Id of root task in operator topology")]
        public class TopologyRootTaskId : Name<string>
        {
        }

        /// <summary>
        /// Ids of the child tasks in an operator topology.
        /// </summary>
        [NamedParameter("Ids of child tasks in operator topology")]
        public class TopologyChildTaskIds : Name<ISet<string>>
        {
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
// new file mode 100644
// index 0000000..a2a249d
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
// @@ -0,0 +1,89 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Tang.Interface;
using System.Collections.Generic;

namespace Org.Apache.REEF.Network.Group.Driver
{
    /// <summary>
    /// Driver-side configuration of the MPI operators of one Communication Group.
    /// Every operator registered with the same Communication Group runs on the
    /// same set of tasks.
    /// </summary>
    public interface ICommunicationGroupDriver
    {
        /// <summary>
        /// Gets the ids of the tasks that belong to this Communication Group.
        /// </summary>
        List<string> TaskIds { get; }

        /// <summary>
        /// Registers a Broadcast MPI operator with the communication group.
        /// </summary>
        /// <typeparam name="T">The type of messages the operator will send</typeparam>
        /// <param name="operatorName">The name of the broadcast operator</param>
        /// <param name="spec">The specification that defines the Broadcast operator</param>
        /// <returns>This same driver, now holding the Broadcast operator info</returns>
        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> spec);

        /// <summary>
        /// Registers a Reduce MPI operator with the communication group.
        /// </summary>
        /// <typeparam name="T">The type of messages the operator will send</typeparam>
        /// <param name="operatorName">The name of the reduce operator</param>
        /// <param name="spec">The specification that defines the Reduce operator</param>
        /// <returns>This same driver, now holding the Reduce operator info</returns>
        ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec);

        /// <summary>
        /// Registers a Scatter MPI operator with the communication group.
        /// </summary>
        /// <typeparam name="T">The type of messages the operator will send</typeparam>
        /// <param name="operatorName">The name of the scatter operator</param>
        /// <param name="spec">The specification that defines the Scatter operator</param>
        /// <returns>This same driver, now holding the Scatter operator info</returns>
        ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec);

        /// <summary>
        /// Finalizes the CommunicationGroupDriver.
        /// Once finalized, no further operators may be added to the group.
        /// </summary>
        /// <returns>This same, now finalized, CommunicationGroupDriver</returns>
        ICommunicationGroupDriver Build();

        /// <summary>
        /// Adds a task to the communication group.
        /// Build() must have been called on the CommunicationGroupDriver before
        /// tasks are added.
        /// </summary>
        /// <param name="taskId">The id of the task to add</param>
        void AddTask(string taskId);

        /// <summary>
        /// Gets the Task Configuration for this communication group.
        /// Call this only after every task has been added to the
        /// CommunicationGroupDriver.
        /// </summary>
        /// <param name="taskId">The id of a task that belongs to this Communication Group</param>
        /// <returns>The Task Configuration for this communication group</returns>
        IConfiguration GetGroupTaskConfiguration(string taskId);
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
// new file mode 100644
// index 0000000..422d63e
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IMpiDriver.cs
// @@ -0,0 +1,93 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Tang.Interface;

namespace Org.Apache.REEF.Network.Group.Driver
{
    /// <summary>
    /// Creates Communication Groups for MPI Operators and manages the
    /// configuration of MPI tasks and services.
    /// </summary>
    public interface IMpiDriver
    {
        /// <summary>
        /// Gets the identifier of the master task.
        /// </summary>
        string MasterTaskId { get; }

        /// <summary>
        /// Creates a new CommunicationGroup with the given name and number of
        /// tasks/operators.
        /// </summary>
        /// <param name="groupName">The new group name</param>
        /// <param name="numTasks">The number of tasks/operators in the group.</param>
        /// <returns>The new Communication Group</returns>
        ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks);

        /// <summary>
        /// Generates a context configuration carrying a unique identifier.
        /// </summary>
        /// <returns>The configured context configuration</returns>
        IConfiguration GetContextConfiguration();

        /// <summary>
        /// Gets the service configuration required for running MPI on Reef tasks.
        /// </summary>
        /// <returns>The service configuration for the Reef tasks</returns>
        IConfiguration GetServiceConfiguration();

        /// <summary>
        /// Tells whether the given active context can be used to run the Master Task.
        /// </summary>
        /// <param name="activeContext">The active context to check</param>
        /// <returns>True if the active context can run the Master task,
        /// otherwise false.</returns>
        bool IsMasterTaskContext(IActiveContext activeContext);

        /// <summary>
        /// Tells whether the given context configuration is the one used to
        /// configure the Master Task.
        /// </summary>
        /// <param name="contextConfiguration">The context configuration to check</param>
        /// <returns>True if the context configuration is used to configure the Master
        /// Task, otherwise false.</returns>
        bool IsMasterContextConfiguration(IConfiguration contextConfiguration);

        /// <summary>
        /// Gets the context number associated with the Active Context id.
        /// </summary>
        /// <param name="activeContext">The active context to check</param>
        /// <returns>The context number associated with the active context id</returns>
        int GetContextNum(IActiveContext activeContext);

        /// <summary>
        /// Gets the configuration for a particular task.
        ///
        /// A task may belong to many Communication Groups, so each group is
        /// serialized into the configuration as a SerializedGroupConfig.
        ///
        /// The caller must merge its own part of the task configuration
        /// (task id, task class) with the MPI task configuration returned here.
        /// </summary>
        /// <param name="taskId">The id of the task Configuration to generate</param>
        /// <returns>The MPI task configuration with communication group and
        /// operator configuration set.</returns>
        IConfiguration GetMpiTaskConfiguration(string taskId);
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
// new file mode 100644
// index 0000000..d29c504
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
// @@ -0,0 +1,260 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/

using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Network.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using System.Collections.Generic;
using System.Reflection;
using Org.Apache.REEF.Tang.Implementations.Tang;

namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
    /// <summary>
    /// Used to configure MPI operators in Reef driver.
    /// All operators in the same Communication Group run on the
    /// same set of tasks.
    /// </summary>
    public class CommunicationGroupDriver : ICommunicationGroupDriver
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupDriver));

        private readonly string _groupName;
        private readonly string _driverId;
        private readonly int _numTasks;
        private int _tasksAdded;
        private bool _finalized;

        private readonly AvroConfigurationSerializer _confSerializer;

        // Guards _tasksAdded, TaskIds and the per-task state of the topologies.
        private readonly object _topologyLock;

        // Both maps are keyed by operator name. Values are stored as object because
        // the specs and topologies are generic over the message type T, and T is
        // chosen per Add* call.
        private readonly Dictionary<string, object> _operatorSpecs;
        private readonly Dictionary<string, object> _topologies;

        /// <summary>
        /// Create a new CommunicationGroupDriver.
        /// </summary>
        /// <param name="groupName">The communication group name</param>
        /// <param name="driverId">Identifier of the Reef driver</param>
        /// <param name="numTasks">The number of tasks each operator will use</param>
        /// <param name="confSerializer">Used to serialize task configuration</param>
        public CommunicationGroupDriver(
            string groupName,
            string driverId,
            int numTasks,
            AvroConfigurationSerializer confSerializer)
        {
            _confSerializer = confSerializer;
            _groupName = groupName;
            _driverId = driverId;
            _numTasks = numTasks;
            _tasksAdded = 0;
            _finalized = false;

            _topologyLock = new object();

            _operatorSpecs = new Dictionary<string, object>();
            _topologies = new Dictionary<string, object>();
            TaskIds = new List<string>();
        }

        /// <summary>
        /// Returns the list of task ids that belong to this Communication Group
        /// </summary>
        public List<string> TaskIds { get; private set; }

        /// <summary>
        /// Adds the Broadcast MPI operator to the communication group.
        /// The operator's flat topology is rooted at the spec's sender.
        /// </summary>
        /// <typeparam name="T">The type of messages that operators will send</typeparam>
        /// <param name="operatorName">The name of the broadcast operator</param>
        /// <param name="spec">The specification that defines the Broadcast operator</param>
        /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns>
        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
        public ICommunicationGroupDriver AddBroadcast<T>(
            string operatorName,
            BroadcastOperatorSpec<T> spec)
        {
            EnsureNotFinalized();

            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
            return RegisterOperator(operatorName, topology, spec);
        }

        /// <summary>
        /// Adds the Reduce MPI operator to the communication group.
        /// The operator's flat topology is rooted at the spec's receiver.
        /// </summary>
        /// <typeparam name="T">The type of messages that operators will send</typeparam>
        /// <param name="operatorName">The name of the reduce operator</param>
        /// <param name="spec">The specification that defines the Reduce operator</param>
        /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns>
        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
        public ICommunicationGroupDriver AddReduce<T>(
            string operatorName,
            ReduceOperatorSpec<T> spec)
        {
            EnsureNotFinalized();

            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec);
            return RegisterOperator(operatorName, topology, spec);
        }

        /// <summary>
        /// Adds the Scatter MPI operator to the communication group.
        /// The operator's flat topology is rooted at the spec's sender.
        /// </summary>
        /// <typeparam name="T">The type of messages that operators will send</typeparam>
        /// <param name="operatorName">The name of the scatter operator</param>
        /// <param name="spec">The specification that defines the Scatter operator</param>
        /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns>
        /// <exception cref="IllegalStateException">If Build() has already been called</exception>
        public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec)
        {
            EnsureNotFinalized();

            ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec);
            return RegisterOperator(operatorName, topology, spec);
        }

        /// <summary>
        /// Finalizes the CommunicationGroupDriver.
        /// After the CommunicationGroupDriver has been finalized, no more operators may
        /// be added to the group.
        /// </summary>
        /// <returns>The same finalized CommunicationGroupDriver</returns>
        public ICommunicationGroupDriver Build()
        {
            _finalized = true;
            return this;
        }

        /// <summary>
        /// Add a task to the communication group.
        /// The CommunicationGroupDriver must have called Build() before adding tasks to the group.
        /// </summary>
        /// <param name="taskId">The id of the task to add</param>
        /// <exception cref="IllegalStateException">
        /// If Build() has not been called yet, or if the group already holds the
        /// number of tasks it was created for.
        /// </exception>
        public void AddTask(string taskId)
        {
            if (!_finalized)
            {
                throw new IllegalStateException("CommunicationGroupDriver must call Build() before adding tasks to the group.");
            }

            lock (_topologyLock)
            {
                // Check capacity BEFORE counting the task. Counting first would leave
                // _tasksAdded above _numTasks after a rejected add, and
                // GetGroupTaskConfiguration would then fail forever.
                if (_tasksAdded >= _numTasks)
                {
                    throw new IllegalStateException("Added too many tasks to Communication Group, expected: " + _numTasks);
                }

                _tasksAdded++;
                TaskIds.Add(taskId);
                foreach (string operatorName in _operatorSpecs.Keys)
                {
                    AddTask(operatorName, taskId);
                }
            }
        }

        /// <summary>
        /// Get the Task Configuration for this communication group.
        /// Must be called only after all tasks have been added to the CommunicationGroupDriver.
        /// </summary>
        /// <param name="taskId">The task id of the task that belongs to this Communication Group</param>
        /// <returns>The Task Configuration for this communication group, or null when
        /// the task does not belong to this group</returns>
        /// <exception cref="IllegalStateException">
        /// If the task belongs to the group but not all tasks have been added yet.
        /// </exception>
        public IConfiguration GetGroupTaskConfiguration(string taskId)
        {
            // TaskIds is mutated by AddTask under _topologyLock, so read it under the
            // same lock. Also make sure all tasks have been added to the
            // communication group before generating config.
            lock (_topologyLock)
            {
                if (!TaskIds.Contains(taskId))
                {
                    return null;
                }

                if (_tasksAdded != _numTasks)
                {
                    throw new IllegalStateException(
                        "Must add all tasks to communication group before fetching configuration");
                }
            }

            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
                .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
                    GenericType<MpiConfigurationOptions.DriverId>.Class,
                    _driverId)
                .BindNamedParameter<MpiConfigurationOptions.CommunicationGroupName, string>(
                    GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class,
                    _groupName);

            // Each operator contributes one serialized configuration: the topology's
            // task configuration extended with the driver id and the operator name.
            foreach (var operatorName in _topologies.Keys)
            {
                var innerConf = TangFactory.GetTang().NewConfigurationBuilder(GetOperatorConfiguration(operatorName, taskId))
                    .BindNamedParameter<MpiConfigurationOptions.DriverId, string>(
                        GenericType<MpiConfigurationOptions.DriverId>.Class,
                        _driverId)
                    .BindNamedParameter<MpiConfigurationOptions.OperatorName, string>(
                        GenericType<MpiConfigurationOptions.OperatorName>.Class,
                        operatorName)
                    .Build();

                confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedOperatorConfigs, string>(
                    GenericType<MpiConfigurationOptions.SerializedOperatorConfigs>.Class,
                    _confSerializer.ToString(innerConf));
            }

            return confBuilder.Build();
        }

        // Throws if Build() has already been called; shared guard of the Add* methods.
        private void EnsureNotFinalized()
        {
            if (_finalized)
            {
                throw new IllegalStateException("Can't add operators once the spec has been built.");
            }
        }

        // Records the topology and spec of an operator under its name and returns this driver.
        private ICommunicationGroupDriver RegisterOperator(string operatorName, object topology, object spec)
        {
            _topologies[operatorName] = topology;
            _operatorSpecs[operatorName] = spec;

            return this;
        }

        // Forwards a newly added task to the topology of one operator.
        private void AddTask(string operatorName, string taskId)
        {
            InvokeOnTopology(operatorName, "AddTask", taskId);
        }

        // Asks the topology of one operator for the configuration of the given task.
        private IConfiguration GetOperatorConfiguration(string operatorName, string taskId)
        {
            return (IConfiguration) InvokeOnTopology(operatorName, "GetTaskConfiguration", taskId);
        }

        // Calls a public single-string-argument method on the stored topology via
        // reflection. Reflection is needed because the topology is held as object
        // and its message type T is not known here.
        private object InvokeOnTopology(string operatorName, string methodName, string taskId)
        {
            object topology = _topologies[operatorName];
            MethodInfo method = topology.GetType().GetMethod(methodName);
            if (method == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException.
                throw new IllegalStateException(
                    "Topology of operator " + operatorName + " has no public method " + methodName);
            }

            return method.Invoke(topology, new[] { (object) taskId });
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
// new file mode 100644
// index 0000000..1439a36
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
// @@ -0,0 +1,107 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
    /// <summary>
    /// Messages sent by MPI Operators
    /// </summary>
    public class GroupCommunicationMessage
    {
        /// <summary>
        /// Create new CommunicationGroupMessage.
/// </summary>
        /// <param name="groupName">Name of the communication group</param>
        /// <param name="operatorName">Name of the MPI operator</param>
        /// <param name="source">Where the message comes from</param>
        /// <param name="destination">Where the message is going</param>
        /// <param name="data">A single byte array of data; stored as the only
        /// element of <see cref="Data"/></param>
        /// <param name="messageType">The type of message to send</param>
        public GroupCommunicationMessage(
            string groupName,
            string operatorName,
            string source,
            string destination,
            byte[] data,
            MessageType messageType)
            : this(groupName, operatorName, source, destination, new[] { data }, messageType)
        {
        }

        /// <summary>
        /// Creates a message whose payload is an array of byte arrays.
        /// </summary>
        /// <param name="groupName">Name of the communication group</param>
        /// <param name="operatorName">Name of the MPI operator</param>
        /// <param name="source">Where the message comes from</param>
        /// <param name="destination">Where the message is going</param>
        /// <param name="data">The byte arrays of data; stored as-is in
        /// <see cref="Data"/></param>
        /// <param name="messageType">The type of message to send</param>
        public GroupCommunicationMessage(
            string groupName,
            string operatorName,
            string source,
            string destination,
            byte[][] data,
            MessageType messageType)
        {
            MsgType = messageType;
            Data = data;
            Destination = destination;
            Source = source;
            OperatorName = operatorName;
            GroupName = groupName;
        }

        /// <summary>
        /// Gets the name of the Communication Group.
        /// </summary>
        public string GroupName { get; private set; }

        /// <summary>
        /// Gets the name of the MPI Operator.
        /// </summary>
        public string OperatorName { get; private set; }

        /// <summary>
        /// Gets the source of the message.
        /// </summary>
        public string Source { get; private set; }

        /// <summary>
        /// Gets the destination of the message.
        /// </summary>
        public string Destination { get; private set; }

        /// <summary>
        /// Gets the message data.
        /// </summary>
        public byte[][] Data { get; private set; }

        /// <summary>
        /// Gets the type of message being sent.
        /// </summary>
        public MessageType MsgType { get; private set; }
    }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
// new file mode 100644
// index 0000000..cd8ace2
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MessageType.cs
// @@ -0,0 +1,30 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
    /// <summary>
    /// Represents the different types of messages that Mpi Tasks can
    /// send to each other.
/// </summary>
    public enum MessageType
    {
        /// <summary>
        /// Data message; the only message type declared by this enum.
        /// </summary>
        Data = 0
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
// new file mode 100644
// index 0000000..a373ef3
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
// @@ -0,0 +1,239 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Threading;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Network.Group.Codec;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
    /// <summary>
    /// Used to create Communication Groups for MPI Operators on the Reef driver.
    /// Also manages configuration for MPI tasks/services.
    /// </summary>
    public class MpiDriver : IMpiDriver
    {
        private const string MasterTaskContextName = "MasterTaskContext";
        private const string SlaveTaskContextName = "SlaveTaskContext";

        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MpiDriver));

        private readonly string _driverId;
        private readonly string _nameServerAddr;
        private readonly int _nameServerPort;

        // Last context number handed out; starts at -1 so the first call to
        // GetContextConfiguration yields 0. Updated via Interlocked, hence not readonly.
        private int _contextIds;

        private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups;
        private readonly AvroConfigurationSerializer _configSerializer;
        private readonly NameServer _nameServer;

        /// <summary>
        /// Create a new MpiDriver object.
        /// Also creates a NameServer and records its local endpoint so it can be
        /// handed to tasks through the service configuration.
        /// </summary>
        /// <param name="driverId">Identifier for the REEF driver</param>
        /// <param name="masterTaskId">Identifier for MPI master task</param>
        /// <param name="configSerializer">Used to serialize task configuration</param>
        [Inject]
        public MpiDriver(
            [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId,
            [Parameter(typeof(MpiConfigurationOptions.MasterTaskId))] string masterTaskId,
            AvroConfigurationSerializer configSerializer)
        {
            _driverId = driverId;
            _contextIds = -1;
            MasterTaskId = masterTaskId;

            _configSerializer = configSerializer;
            _commGroups = new Dictionary<string, ICommunicationGroupDriver>();

            // NOTE(review): port 0 presumably asks for any free port - confirm against NameServer.
            _nameServer = new NameServer(0);

            IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
            _nameServerAddr = localEndpoint.Address.ToString();
            _nameServerPort = localEndpoint.Port;
        }

        /// <summary>
        /// Returns the identifier for the master task
        /// </summary>
        public string MasterTaskId { get; private set; }

        /// <summary>
        /// Create a new CommunicationGroup with the given name and number of tasks/operators.
        /// </summary>
        /// <param name="groupName">The new group name</param>
        /// <param name="numTasks">The number of tasks/operators in the group.</param>
        /// <returns>The new Communication Group</returns>
        /// <exception cref="ArgumentNullException">If groupName is null or empty</exception>
        /// <exception cref="ArgumentException">If numTasks is less than 1, or a group
        /// with the same name has already been created</exception>
        public ICommunicationGroupDriver NewCommunicationGroup(string groupName, int numTasks)
        {
            if (string.IsNullOrEmpty(groupName))
            {
                throw new ArgumentNullException("groupName");
            }
            else if (numTasks < 1)
            {
                throw new ArgumentException("NumTasks must be greater than 0");
            }
            else if (_commGroups.ContainsKey(groupName))
            {
                throw new ArgumentException("Group Name already registered with MpiDriver");
            }

            var commGroup = new CommunicationGroupDriver(groupName, _driverId, numTasks, _configSerializer);
            _commGroups[groupName] = commGroup;
            return commGroup;
        }

        /// <summary>
        /// Generates context configuration with a unique identifier.
        /// The first configuration generated is identified as the master task
        /// context; every later one gets a numbered slave task context id.
        /// </summary>
        /// <returns>The configured context configuration</returns>
        public IConfiguration GetContextConfiguration()
        {
            int contextNum = Interlocked.Increment(ref _contextIds);
            string id = (contextNum == 0)
                ? MasterTaskContextName
                : GetSlaveTaskContextName(contextNum);

            return ContextConfiguration.ConfigurationModule
                .Set(ContextConfiguration.Identifier, id)
                .Build();
        }

        /// <summary>
        /// Get the service configuration required for running MPI on Reef tasks.
        /// It registers the network service for group communication messages, binds
        /// the message observer and codec, and passes on the address and port of the
        /// name server created by this driver.
        /// </summary>
        /// <returns>The service configuration for the Reef tasks</returns>
        public IConfiguration GetServiceConfiguration()
        {
            IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule
                .Set(ServiceConfiguration.Services, GenericType<NetworkService<GroupCommunicationMessage>>.Class)
                .Build();

            return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
                .BindImplementation(
                    GenericType<IObserver<NsMessage<GroupCommunicationMessage>>>.Class,
                    GenericType<MpiNetworkObserver>.Class)
                .BindImplementation(
                    GenericType<ICodec<GroupCommunicationMessage>>.Class,
                    GenericType<GroupCommunicationMessageCodec>.Class)
                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
                    _nameServerAddr)
                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
                    _nameServerPort.ToString(CultureInfo.InvariantCulture))
                .Build();
        }

        /// <summary>
        /// Get the configuration for a particular task.
        ///
        /// The task may belong to many Communication Groups, so each one is serialized
        /// in the configuration as a SerializedGroupConfig. Groups that return no
        /// configuration for the task are skipped.
        ///
        /// The user must merge their part of task configuration (task id, task class)
        /// with this returned MPI task configuration.
        /// </summary>
        /// <param name="taskId">The id of the task Configuration to generate</param>
        /// <returns>The MPI task configuration with communication group and
        /// operator configuration set.</returns>
        public IConfiguration GetMpiTaskConfiguration(string taskId)
        {
            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder();

            foreach (ICommunicationGroupDriver commGroup in _commGroups.Values)
            {
                var taskConf = commGroup.GetGroupTaskConfiguration(taskId);
                if (taskConf != null)
                {
                    confBuilder.BindSetEntry<MpiConfigurationOptions.SerializedGroupConfigs, string>(
                        GenericType<MpiConfigurationOptions.SerializedGroupConfigs>.Class,
                        _configSerializer.ToString(taskConf));
                }
            }

            return confBuilder.Build();
        }

        /// <summary>
        /// Checks whether this active context can be used to run the Master Task.
        /// </summary>
        /// <param name="activeContext">The active context to check</param>
        /// <returns>True if the active context can run the Master task,
        /// otherwise false.</returns>
        public bool IsMasterTaskContext(IActiveContext activeContext)
        {
            return activeContext.Id.Equals(MasterTaskContextName);
        }

        /// <summary>
        /// Checks whether this context configuration is used to configure the Master Task.
        /// </summary>
        /// <param name="contextConfiguration">The context configuration to check</param>
        /// <returns>True if the context configuration is used to configure the Master
        /// Task, otherwise false.</returns>
        public bool IsMasterContextConfiguration(IConfiguration contextConfiguration)
        {
            return Utilities.Utils.GetContextId(contextConfiguration).Equals(MasterTaskContextName);
        }

        /// <summary>
        /// Gets the context number associated with the Active Context id.
        /// The master task context is number 0; any other id must consist of exactly
        /// two '-'-separated parts, the second of which is the context number.
        /// </summary>
        /// <param name="activeContext">The active context to check</param>
        /// <returns>The context number associated with the active context id</returns>
        /// <exception cref="ArgumentNullException">If activeContext is null</exception>
        /// <exception cref="ArgumentException">If the context id does not have the
        /// expected shape or its number part is not a valid integer</exception>
        public int GetContextNum(IActiveContext activeContext)
        {
            if (activeContext == null)
            {
                throw new ArgumentNullException("activeContext");
            }

            string contextId = activeContext.Id;
            if (contextId.Equals(MasterTaskContextName))
            {
                return 0;
            }

            // Report every malformed id the same way, including a non-numeric or
            // out-of-range suffix, rather than leaking FormatException or
            // OverflowException from int.Parse.
            string[] parts = contextId.Split('-');
            int contextNum;
            if (parts.Length != 2 ||
                !int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out contextNum))
            {
                throw new ArgumentException("Invalid id in active context");
            }

            return contextNum;
        }

        // Builds the id of a non-master context: "<SlaveTaskContextName>-<contextNum>".
        private string GetSlaveTaskContextName(int contextNum)
        {
            return string.Format(CultureInfo.InvariantCulture, "{0}-{1}", SlaveTaskContextName, contextNum);
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
// ----------------------------------------------------------------------
// diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
// new file mode 100644
// index 0000000..48463dc
// --- /dev/null
// +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/TaskStarter.cs
// @@ -0,0 +1,135 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Network.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Interface;

namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
    /// <summary>
    /// Helper class to start MPI tasks.
    /// </summary>
    public class TaskStarter
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStarter));

        // Assigned once in the constructor and never replaced afterwards.
        private readonly object _lock;
        private readonly int _numTasks;
        private readonly string _masterTaskId;
        private readonly IMpiDriver _mpiDriver;
        private readonly List<Tuple<string, IConfiguration, IActiveContext>> _taskTuples;

        // Not readonly: passed by ref to Interlocked.Increment in QueueTask.
        private int _tasksAdded;

        /// <summary>
        /// Create new TaskStarter.
        /// After adding the correct number of tasks to the TaskStarter, the
        /// Tasks will be started on their given active context.
        /// </summary>
        /// <param name="mpiDriver">The IMpiDriver for the MPI tasks</param>
        /// <param name="numTasks">The number of Tasks that need to be added before
        /// the Tasks will be started. </param>
        /// <exception cref="ArgumentNullException">mpiDriver is null</exception>
        public TaskStarter(IMpiDriver mpiDriver, int numTasks)
        {
            // Fail with a descriptive exception instead of a NullReferenceException
            // when the master task id is read below.
            if (mpiDriver == null)
            {
                throw new ArgumentNullException("mpiDriver");
            }

            LOGGER.Log(Level.Verbose, "Creating TaskStarter");
            _masterTaskId = mpiDriver.MasterTaskId;
            _numTasks = numTasks;
            _tasksAdded = 0;
            _lock = new object();

            _mpiDriver = mpiDriver;
            _taskTuples = new List<Tuple<string, IConfiguration, IActiveContext>>();
        }

        /// <summary>
        /// Queues the task into the TaskStarter.
        ///
        /// Once the correct number of tasks have been queued, the final Configuration
        /// will be generated and run on the given Active Context.
+ /// </summary> + /// <param name="partialTaskConfig">The partial task configuration containing Task + /// identifier and Task class</param> + /// <param name="activeContext">The Active Context to run the Task on</param> + public void QueueTask(IConfiguration partialTaskConfig, IActiveContext activeContext) + { + string taskId = Utils.GetTaskId(partialTaskConfig); + LOGGER.Log(Level.Verbose, "Adding context with identifier: " + taskId); + + lock (_lock) + { + _taskTuples.Add( + new Tuple<string, IConfiguration, IActiveContext>(taskId, partialTaskConfig, activeContext)); + + if (Interlocked.Increment(ref _tasksAdded) == _numTasks) + { + StartTasks(); + } + } + } + + /// <summary> + /// Starts the Master Task followed by the Slave Tasks. + /// </summary> + private void StartTasks() + { + Tuple<string, IConfiguration, IActiveContext> masterTaskTuple; + try + { + masterTaskTuple = _taskTuples.Single(tuple => tuple.Item1.Equals(_masterTaskId)); + } + catch (InvalidOperationException) + { + LOGGER.Log(Level.Error, "There must be exactly one master task. 
The driver has been misconfigured."); + throw; + } + + LOGGER.Log(Level.Verbose, "Starting master task on context id: {0}.", masterTaskTuple.Item3.Id); + StartTask(masterTaskTuple.Item1, masterTaskTuple.Item2, masterTaskTuple.Item3); + + LOGGER.Log(Level.Verbose, "Starting slave tasks."); + foreach (Tuple<string, IConfiguration, IActiveContext> taskTuple in _taskTuples) + { + string taskId = taskTuple.Item1; + if (taskId.Equals(_masterTaskId)) + { + continue; + } + + StartTask(taskId, taskTuple.Item2, taskTuple.Item3); + } + } + + private void StartTask( + string taskId, + IConfiguration userPartialTaskConf, + IActiveContext activeContext) + { + IConfiguration mpiTaskConfiguration = _mpiDriver.GetMpiTaskConfiguration(taskId); + IConfiguration mergedTaskConf = Configurations.Merge(userPartialTaskConf, mpiTaskConfiguration); + activeContext.SubmitTask(mergedTaskConf); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs new file mode 100644 index 0000000..09c8c30 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastReceiver.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The receiving side of an MPI broadcast operator.
    /// </summary>
    /// <typeparam name="T">The type of message being sent.</typeparam>
    public interface IBroadcastReceiver<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Receives a message sent by the parent BroadcastSender.
        /// </summary>
        /// <returns>The incoming message</returns>
        T Receive();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs new file mode 100644 index 0000000..534fa9f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IBroadcastSender.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The sending side of an MPI broadcast operator; sends messages to child Tasks.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IBroadcastSender<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Sends the data to all BroadcastReceivers.
        /// </summary>
        /// <param name="data">The data to send.</param>
        void Send(T data);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs new file mode 100644 index 0000000..661f348 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IMpiOperator.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// Common contract of every Mpi Operator used in a Reef Task: its name,
    /// the name of its CommunicationGroup, and its version.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IMpiOperator<T>
    {
        /// <summary>
        /// Gets the operator name.
        /// </summary>
        string OperatorName { get; }

        /// <summary>
        /// Gets the name of the operator's CommunicationGroup.
        /// </summary>
        string GroupName { get; }

        /// <summary>
        /// Gets the operator version number.
        /// </summary>
        int Version { get; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs new file mode 100644 index 0000000..5d18677 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IOperatorSpec.cs @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The specification used to define an operator; it exposes the codec
    /// used for the operator's messages.
    /// </summary>
    /// <typeparam name="T">The message type handled by the codec</typeparam>
    public interface IOperatorSpec<T>
    {
        /// <summary>
        /// Gets the codec used to serialize and deserialize messages.
        /// </summary>
        ICodec<T> Codec { get; }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs new file mode 100644 index 0000000..020b09a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceFunction.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The function used to aggregate messages sent by ReduceSenders.
    /// </summary>
    /// <typeparam name="T">The message type.</typeparam>
    public interface IReduceFunction<T>
    {
        /// <summary>
        /// Reduces the given sequence of messages into one message.
        /// </summary>
        /// <param name="elements">The messages to reduce</param>
        /// <returns>The reduced message</returns>
        T Reduce(IEnumerable<T> elements);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs new file mode 100644 index 0000000..2305968 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceReceiver.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The receiving side of an MPI reduce operator; receives messages and
    /// reduces them.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IReduceReceiver<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Gets the function used to reduce incoming messages sent by ReduceSenders.
        /// </summary>
        IReduceFunction<T> ReduceFunction { get; }

        /// <summary>
        /// Receives messages sent by all ReduceSenders and aggregates them
        /// using the specified IReduceFunction.
        /// </summary>
        /// <returns>The single aggregated data</returns>
        T Reduce();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs new file mode 100644 index 0000000..c15ded6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The sending side of an MPI reduce operator; sends messages to be
    /// reduced by the ReduceReceiver.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IReduceSender<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Sends data to the operator's ReduceReceiver to be aggregated.
        /// </summary>
        /// <param name="data">The data to send</param>
        void Send(T data);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs new file mode 100644 index 0000000..7aa4e81 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterReceiver.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The receiving side of an MPI scatter operator; receives a sublist of
    /// the messages sent from the IScatterSender.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IScatterReceiver<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Receives a sublist of messages sent from the IScatterSender.
        /// </summary>
        /// <returns>The sublist of messages</returns>
        List<T> Receive();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs new file mode 100644 index 0000000..c5a2c3d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IScatterSender.cs @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Network.Group.Operators
{
    /// <summary>
    /// The sending side of an MPI scatter operator; scatters a list of
    /// elements to all of the IScatterReceivers.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IScatterSender<T> : IMpiOperator<T>
    {
        /// <summary>
        /// Splits up the list of elements evenly and scatters each chunk
        /// to the IScatterReceivers.
        /// </summary>
        /// <param name="elements">The list of elements to send.</param>
        void Send(List<T> elements);

        /// <summary>
        /// Splits up the list of elements and scatters each chunk
        /// to the IScatterReceivers. Each receiver will receive
        /// a sublist of the specified size.
        /// </summary>
        /// <param name="elements">The list of elements to send.</param>
        /// <param name="count">The size of each sublist</param>
        void Send(List<T> elements, int count);

        /// <summary>
        /// Splits up the list of elements and scatters each chunk
        /// to the IScatterReceivers in the specified task order.
        /// </summary>
        /// <param name="elements">The list of elements to send.</param>
        /// <param name="order">The list of task identifiers representing
        /// the order in which to scatter each sublist</param>
        void Send(List<T> elements, List<string> order);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs new file mode 100644 index 0000000..904e4ef --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastOperatorSpec.cs @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// The specification used to define Broadcast Operators.
    /// </summary>
    public class BroadcastOperatorSpec<T> : IOperatorSpec<T>
    {
        /// <summary>
        /// Create a new BroadcastOperatorSpec.
+ /// </summary> + /// <param name="senderId">The identifier of the root sending Task.</param> + /// <param name="codecType">The codec used to serialize messages.</param> + public BroadcastOperatorSpec(string senderId, ICodec<T> codecType) + { + SenderId = senderId; + Codec = codecType; + } + + /// <summary> + /// Returns the identifier of the Task that will broadcast data to other Tasks. + /// </summary> + public string SenderId { get; private set; } + + /// <summary> + /// Returns the ICodec used to serialize messages. + /// </summary> + public ICodec<T> Codec { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs new file mode 100644 index 0000000..4374ab5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Tang.Annotations;
using System.Reactive;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// MPI Operator used to receive broadcast messages.
    /// </summary>
    /// <typeparam name="T">The type of message being sent.</typeparam>
    public class BroadcastReceiver<T> : IBroadcastReceiver<T>
    {
        private const int DefaultVersion = 1;

        private ICommunicationGroupNetworkObserver _networkHandler;
        private OperatorTopology<T> _topology;

        /// <summary>
        /// Creates a new BroadcastReceiver, initializes its topology and registers
        /// a handler for the operator's incoming messages.
        /// </summary>
        /// <param name="operatorName">The operator identifier</param>
        /// <param name="groupName">The name of the CommunicationGroup that the
        /// operator belongs to</param>
        /// <param name="topology">The node's topology graph</param>
        /// <param name="networkHandler">The incoming message handler</param>
        [Inject]
        public BroadcastReceiver(
            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
            OperatorTopology<T> topology,
            ICommunicationGroupNetworkObserver networkHandler)
        {
            Version = DefaultVersion;
            GroupName = groupName;
            OperatorName = operatorName;

            _topology = topology;
            _networkHandler = networkHandler;

            // The topology is initialized before the handler is registered.
            _topology.Initialize();

            // Incoming messages for this operator name are forwarded to the topology.
            var topologyForwarder = Observer.Create<GroupCommunicationMessage>(
                incoming => _topology.OnNext(incoming));
            _networkHandler.Register(operatorName, topologyForwarder);
        }

        /// <summary>
        /// Gets the operator identifier.
        /// </summary>
        public string OperatorName { get; private set; }

        /// <summary>
        /// Returns the name of the CommunicationGroup that the operator belongs to.
+ /// </summary> + public string GroupName { get; private set; } + + /// <summary> + /// Returns the operator version. + /// </summary> + public int Version { get; private set; } + + /// <summary> + /// Receive a message from parent BroadcastSender. + /// </summary> + /// <returns>The incoming message</returns> + public T Receive() + { + return _topology.ReceiveFromParent(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs new file mode 100644 index 0000000..4e48428 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Tang.Annotations;
using System;
using System.Reactive;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// Sending end of the MPI broadcast operator; pushes data to the
    /// child nodes of its operator topology.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public class BroadcastSender<T> : IBroadcastSender<T>
    {
        private const int DefaultVersion = 1;

        private readonly OperatorTopology<T> _topology;
        private readonly ICommunicationGroupNetworkObserver _networkHandler;

        /// <summary>
        /// Builds the sender, initializes its topology and then registers
        /// an observer with the network handler under the operator name.
        /// </summary>
        /// <param name="operatorName">Identifier of this operator; also the
        /// key the message observer is registered under</param>
        /// <param name="groupName">Name of the CommunicationGroup the
        /// operator belongs to</param>
        /// <param name="topology">The node's topology graph</param>
        /// <param name="networkHandler">Handler the message observer is
        /// registered with</param>
        [Inject]
        public BroadcastSender(
            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
            OperatorTopology<T> topology,
            ICommunicationGroupNetworkObserver networkHandler)
        {
            _topology = topology;
            _networkHandler = networkHandler;

            Version = DefaultVersion;
            GroupName = groupName;
            OperatorName = operatorName;

            // Keep this order: the topology is initialized first, the
            // observer is registered afterwards.
            _topology.Initialize();

            var forwarder = Observer.Create<GroupCommunicationMessage>(
                incoming => { _topology.OnNext(incoming); });
            _networkHandler.Register(operatorName, forwarder);
        }

        /// <summary>
        /// The identifier of the MPI operator supplied at construction.
        /// </summary>
        public string OperatorName { get; private set; }

        /// <summary>
        /// The name of the CommunicationGroup supplied at construction.
        /// </summary>
        public string GroupName { get; private set; }

        /// <summary>
        /// The operator version; always set to the default version (1).
        /// </summary>
        public int Version { get; private set; }

        /// <summary>
        /// Hands the data to the topology for delivery to its children as a
        /// message of type <c>MessageType.Data</c>.
        /// </summary>
        /// <param name="data">The data to send; must not be null.</param>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="data"/> is null.</exception>
        public void Send(T data)
        {
            bool missingPayload = data == null;
            if (missingPayload)
            {
                throw new ArgumentNullException("data");
            }

            _topology.SendToChildren(data, MessageType.Data);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// An <c>IReduceFunction</c> backed by a binary delegate, optionally
    /// seeded with an initial value.
    /// </summary>
    /// <typeparam name="T">The type of the elements being reduced.</typeparam>
    public class ReduceFunction<T> : IReduceFunction<T>
    {
        private readonly Func<T, T, T> _reduceFunction;
        private readonly T _initialValue;

        // Records whether a usable seed was supplied. Testing
        // "_initialValue == null" is not enough: for a non-nullable value
        // type T that test is always false, so an unseeded function would
        // silently be folded starting from default(T) (e.g. 0 for int, which
        // breaks products and min/max reductions).
        private readonly bool _hasInitialValue;

        private ReduceFunction(Func<T, T, T> reduceFunction)
        {
            if (reduceFunction == null)
            {
                throw new ArgumentNullException("reduceFunction");
            }

            _reduceFunction = reduceFunction;
            _hasInitialValue = false;
        }

        private ReduceFunction(Func<T, T, T> reduceFunction, T initialValue)
            : this(reduceFunction)
        {
            _initialValue = initialValue;

            // A null seed keeps its previous meaning of "no initial value".
            _hasInitialValue = initialValue != null;
        }

        /// <summary>
        /// Creates a reduce function without an initial value.
        /// </summary>
        /// <param name="reduceFunction">Combines the running result with the
        /// next element.</param>
        /// <returns>The new reduce function.</returns>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="reduceFunction"/> is null.</exception>
        public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction)
        {
            return new ReduceFunction<T>(reduceFunction);
        }

        /// <summary>
        /// Creates a reduce function that starts from the given initial value.
        /// </summary>
        /// <param name="reduceFunction">Combines the running result with the
        /// next element.</param>
        /// <param name="initialValue">The seed of the reduction. Passing null
        /// is equivalent to supplying no initial value.</param>
        /// <returns>The new reduce function.</returns>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="reduceFunction"/> is null.</exception>
        public static IReduceFunction<T> Create(Func<T, T, T> reduceFunction, T initialValue)
        {
            return new ReduceFunction<T>(reduceFunction, initialValue);
        }

        /// <summary>
        /// Folds the elements into a single value with the reduce delegate.
        /// </summary>
        /// <param name="elements">The elements to reduce.</param>
        /// <returns>The aggregated value.</returns>
        /// <exception cref="ArgumentNullException">Thrown by
        /// <c>Enumerable.Aggregate</c> when <paramref name="elements"/> is null.</exception>
        /// <exception cref="InvalidOperationException">Thrown by
        /// <c>Enumerable.Aggregate</c> when no initial value was supplied and
        /// <paramref name="elements"/> is empty.</exception>
        public T Reduce(IEnumerable<T> elements)
        {
            if (!_hasInitialValue)
            {
                // Unseeded fold: the first element is the starting value.
                return elements.Aggregate(_reduceFunction);
            }

            return elements.Aggregate(_initialValue, _reduceFunction);
        }
    }
}
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// Describes a Reduce MPI operator: the receiving task, the message
    /// codec and the reduce function.
    /// </summary>
    public class ReduceOperatorSpec<T> : IOperatorSpec<T>
    {
        private readonly string _receiverId;
        private readonly ICodec<T> _codec;
        private readonly IReduceFunction<T> _reduceFunction;

        /// <summary>
        /// Creates a specification holding the three given values.
        /// </summary>
        /// <param name="receiverId">Identifier of the task that receives and
        /// reduces incoming messages.</param>
        /// <param name="codec">Codec used for serializing messages.</param>
        /// <param name="reduceFunction">Class used to aggregate all messages.</param>
        public ReduceOperatorSpec(
            string receiverId,
            ICodec<T> codec,
            IReduceFunction<T> reduceFunction)
        {
            _receiverId = receiverId;
            _codec = codec;
            _reduceFunction = reduceFunction;
        }

        /// <summary>
        /// Identifier of the task that receives and reduces incoming messages.
        /// </summary>
        public string ReceiverId
        {
            get { return _receiverId; }
        }

        /// <summary>
        /// Codec used to serialize and deserialize messages.
        /// </summary>
        public ICodec<T> Codec
        {
            get { return _codec; }
        }

        /// <summary>
        /// Class used to aggregate incoming messages.
        /// </summary>
        public IReduceFunction<T> ReduceFunction
        {
            get { return _reduceFunction; }
        }
    }
}
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Task.Impl;
using Org.Apache.REEF.Tang.Annotations;
using System.Reactive;

namespace Org.Apache.REEF.Network.Group.Operators.Impl
{
    /// <summary>
    /// Receiving end of the MPI reduce operator; collects messages from the
    /// child nodes of its topology and aggregates them.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public class ReduceReceiver<T> : IReduceReceiver<T>
    {
        private const int DefaultVersion = 1;

        private readonly OperatorTopology<T> _topology;
        private readonly ICommunicationGroupNetworkObserver _networkHandler;

        /// <summary>
        /// Builds the receiver, initializes its topology and then registers
        /// an observer with the network handler under the operator name.
        /// </summary>
        /// <param name="operatorName">Name of the reduce operator; also the
        /// key the message observer is registered under</param>
        /// <param name="groupName">Name of the operator's CommunicationGroup</param>
        /// <param name="topology">The task's operator topology graph</param>
        /// <param name="networkHandler">Handler the message observer is
        /// registered with</param>
        /// <param name="reduceFunction">Class used to aggregate all incoming messages</param>
        [Inject]
        public ReduceReceiver(
            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
            OperatorTopology<T> topology,
            ICommunicationGroupNetworkObserver networkHandler,
            IReduceFunction<T> reduceFunction)
        {
            _topology = topology;
            _networkHandler = networkHandler;

            ReduceFunction = reduceFunction;
            Version = DefaultVersion;
            GroupName = groupName;
            OperatorName = operatorName;

            // Keep this order: the topology is initialized first, the
            // observer is registered afterwards.
            _topology.Initialize();

            var forwarder = Observer.Create<GroupCommunicationMessage>(
                incoming => { _topology.OnNext(incoming); });
            _networkHandler.Register(operatorName, forwarder);
        }

        /// <summary>
        /// The name of the reduce operator supplied at construction.
        /// </summary>
        public string OperatorName { get; private set; }

        /// <summary>
        /// The name of the CommunicationGroup supplied at construction.
        /// </summary>
        public string GroupName { get; private set; }

        /// <summary>
        /// The operator version; always set to the default version (1).
        /// </summary>
        public int Version { get; private set; }

        /// <summary>
        /// The class used to reduce incoming messages.
        /// </summary>
        public IReduceFunction<T> ReduceFunction { get; private set; }

        /// <summary>
        /// Asks the topology to receive from its children, passing the
        /// configured IReduceFunction to do the aggregation.
        /// </summary>
        /// <returns>The single aggregated data</returns>
        public T Reduce()
        {
            T aggregated = _topology.ReceiveFromChildren(ReduceFunction);
            return aggregated;
        }
    }
}
