http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
new file mode 100644
index 0000000..ffc5165
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Reactive;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI Operator used to send messages to be reduced by the ReduceReceiver.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ReduceSender<T> : IReduceSender<T>
+    {
+        private const int DefaultVersion = 1;
+
+        // Both collaborators are assigned once in the constructor and never replaced.
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+        private readonly OperatorTopology<T> _topology;
+
+        /// <summary>
+        /// Creates a new ReduceSender, initializes its topology and registers
+        /// a handler for incoming messages under the operator name.
+        /// </summary>
+        /// <param name="operatorName">The name of the reduce operator</param>
+        /// <param name="groupName">The name of the reduce operator's CommunicationGroup</param>
+        /// <param name="topology">The Task's operator topology graph</param>
+        /// <param name="networkHandler">The handler used to handle incoming messages</param>
+        [Inject]
+        public ReduceSender(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _networkHandler = networkHandler;
+            _topology = topology;
+            _topology.Initialize();
+
+            // Every message the group observer routes to this operator name is
+            // forwarded to the topology.
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            _networkHandler.Register(operatorName, msgHandler);
+        }
+
+        /// <summary>
+        /// Returns the name of the reduce operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Returns the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Returns the operator version.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Sends the data to the operator's ReduceReceiver to be aggregated.
+        /// The data is handed to the topology's parent as a Data message.
+        /// </summary>
+        /// <param name="data">The data to send</param>
+        /// <exception cref="ArgumentNullException">Thrown when data is null</exception>
+        public void Send(T data)
+        {
+            if (data == null)
+            {
+                throw new ArgumentNullException("data");
+            }
+
+            _topology.SendToParent(data, MessageType.Data);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
new file mode 100644
index 0000000..8219eb6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterOperatorSpec.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// Describes a Scatter MPI Operator: which task sends, and which codec
+    /// serializes the messages.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ScatterOperatorSpec<T> : IOperatorSpec<T>
+    {
+        /// <summary>
+        /// Builds a specification from the sending task's identifier and the message codec.
+        /// </summary>
+        /// <param name="senderId">Identifier of the task that sends the messages</param>
+        /// <param name="codec">Codec that serializes and deserializes the messages</param>
+        public ScatterOperatorSpec(string senderId, ICodec<T> codec)
+        {
+            this.Codec = codec;
+            this.SenderId = senderId;
+        }
+
+        /// <summary>
+        /// The codec used to serialize and deserialize messages.
+        /// </summary>
+        public ICodec<T> Codec { get; private set; }
+
+        /// <summary>
+        /// The identifier of the task that splits a list of messages and
+        /// scatters it to the other tasks.
+        /// </summary>
+        public string SenderId { get; private set; }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
new file mode 100644
index 0000000..85b5c13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator used to receive a sublist of messages sent
+    /// from the IScatterSender.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ScatterReceiver<T> : IScatterReceiver<T>
+    {
+        private const int DefaultVersion = 1;
+
+        // Both collaborators are assigned once in the constructor and never replaced.
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+        private readonly OperatorTopology<T> _topology;
+
+        /// <summary>
+        /// Creates a new ScatterReceiver, initializes its topology and registers
+        /// a handler for incoming messages under the operator name.
+        /// </summary>
+        /// <param name="operatorName">The name of the scatter operator</param>
+        /// <param name="groupName">The name of the operator's CommunicationGroup</param>
+        /// <param name="topology">The task's operator topology graph</param>
+        /// <param name="networkHandler">Handles incoming messages from other tasks</param>
+        [Inject]
+        public ScatterReceiver(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _networkHandler = networkHandler;
+            _topology = topology;
+            _topology.Initialize();
+
+            // Every message the group observer routes to this operator name is
+            // forwarded to the topology.
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            _networkHandler.Register(operatorName, msgHandler);
+        }
+
+        /// <summary>
+        /// Returns the name of the scatter operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Returns the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Returns the operator version.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Reduce function slot. Nothing in this class assigns it, so it
+        /// always reads as null here.
+        /// NOTE(review): this looks like a leftover from the reduce operator;
+        /// confirm whether IScatterReceiver or any caller needs it before removing.
+        /// </summary>
+        public IReduceFunction<T> ReduceFunction { get; private set; }
+
+        /// <summary>
+        /// Receive a sublist of messages sent from the IScatterSender.
+        /// </summary>
+        /// <returns>The sublist of messages</returns>
+        public List<T> Receive()
+        {
+            return _topology.ReceiveListFromParent();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
new file mode 100644
index 0000000..ee9e683
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator used to scatter a list of elements to all
+    /// of the IScatterReceivers.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class ScatterSender<T> : IScatterSender<T>
+    {
+        private const int DefaultVersion = 1;
+
+        // Both collaborators are assigned once in the constructor and never replaced.
+        private readonly ICommunicationGroupNetworkObserver _networkHandler;
+        private readonly OperatorTopology<T> _topology;
+
+        /// <summary>
+        /// Creates a new ScatterSender, initializes its topology and registers
+        /// a handler for incoming messages under the operator name.
+        /// </summary>
+        /// <param name="operatorName">The name of the scatter operator</param>
+        /// <param name="groupName">The name of the operator's Communication Group</param>
+        /// <param name="topology">The operator topology</param>
+        /// <param name="networkHandler">The network handler</param>
+        [Inject]
+        public ScatterSender(
+            [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName,
+            [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName,
+            OperatorTopology<T> topology,
+            ICommunicationGroupNetworkObserver networkHandler)
+        {
+            OperatorName = operatorName;
+            GroupName = groupName;
+            Version = DefaultVersion;
+
+            _networkHandler = networkHandler;
+            _topology = topology;
+            _topology.Initialize();
+
+            // Every message the group observer routes to this operator name is
+            // forwarded to the topology.
+            var msgHandler = Observer.Create<GroupCommunicationMessage>(message => _topology.OnNext(message));
+            _networkHandler.Register(operatorName, msgHandler);
+        }
+
+        /// <summary>
+        /// Returns the name of the scatter operator.
+        /// </summary>
+        public string OperatorName { get; private set; }
+
+        /// <summary>
+        /// Returns the name of the operator's CommunicationGroup.
+        /// </summary>
+        public string GroupName { get; private set; }
+
+        /// <summary>
+        /// Returns the operator version.
+        /// </summary>
+        public int Version { get; private set; }
+
+        /// <summary>
+        /// Split up the list of elements evenly and scatter each chunk
+        /// to the IScatterReceivers.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        /// <exception cref="ArgumentNullException">Thrown when elements is null</exception>
+        public void Send(List<T> elements)
+        {
+            if (elements == null)
+            {
+                throw new ArgumentNullException("elements");
+            }
+
+            _topology.ScatterToChildren(elements, MessageType.Data);
+        }
+
+        /// <summary>
+        /// Split up the list of elements and scatter each chunk
+        /// to the IScatterReceivers. Each receiver will receive
+        /// a sublist of the specified size.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        /// <param name="count">The size of each sublist</param>
+        /// <exception cref="ArgumentNullException">Thrown when elements is null</exception>
+        public void Send(List<T> elements, int count)
+        {
+            if (elements == null)
+            {
+                throw new ArgumentNullException("elements");
+            }
+
+            _topology.ScatterToChildren(elements, count, MessageType.Data);
+        }
+
+        /// <summary>
+        /// Split up the list of elements and scatter each chunk
+        /// to the IScatterReceivers in the specified task order.
+        /// </summary>
+        /// <param name="elements">The list of elements to send.</param>
+        /// <param name="order">The list of task identifiers representing
+        /// the order in which to scatter each sublist</param>
+        /// <exception cref="ArgumentNullException">Thrown when elements is null</exception>
+        public void Send(List<T> elements, List<string> order)
+        {
+            if (elements == null)
+            {
+                throw new ArgumentNullException("elements");
+            }
+
+            // NOTE(review): order is passed through unvalidated; its handling is
+            // left to the topology. Confirm whether null is meaningful there.
+            _topology.ScatterToChildren(elements, order, MessageType.Data);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
new file mode 100644
index 0000000..c5ca60f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Group.Operators.Impl
+{
+    /// <summary>
+    /// MPI operator used to do point-to-point communication between named Tasks.
+    /// </summary>
+    public class Sender
+    {
+        // Both collaborators are assigned once in the constructor and never replaced.
+        private readonly INetworkService<GroupCommunicationMessage> _networkService;
+        private readonly IIdentifierFactory _idFactory;
+
+        /// <summary>
+        /// Creates a new Sender.
+        /// </summary>
+        /// <param name="networkService">The network services used to send messages.</param>
+        /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param>
+        [Inject]
+        public Sender(
+            NetworkService<GroupCommunicationMessage> networkService,
+            IIdentifierFactory idFactory)
+        {
+            _networkService = networkService;
+            _idFactory = idFactory;
+        }
+
+        /// <summary>
+        /// Send the GroupCommunicationMessage to the Task whose name is
+        /// included in the message.
+        /// </summary>
+        /// <param name="message">The message to send.</param>
+        /// <exception cref="ArgumentNullException">Thrown when message is null</exception>
+        /// <exception cref="ArgumentException">Thrown when the message's
+        /// Destination is null or empty</exception>
+        public void Send(GroupCommunicationMessage message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+            if (string.IsNullOrEmpty(message.Destination))
+            {
+                // Name the offending argument so the failure is attributable.
+                throw new ArgumentException("Message destination cannot be null or empty", "message");
+            }
+
+            IIdentifier destId = _idFactory.Create(message.Destination);
+
+            // NOTE(review): a connection is requested and opened on every call and
+            // is never closed in this method. Confirm whether the network service
+            // reuses connections per destination or whether this needs disposing.
+            var conn = _networkService.NewConnection(destId);
+            conn.Open();
+            conn.Write(message);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
new file mode 100644
index 0000000..9b96a9a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClient.cs
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Used by Tasks to fetch MPI Operators in the group configured by the driver.
+    /// The DefaultImplementation attribute names CommunicationGroupClient as
+    /// the default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(CommunicationGroupClient))]
+    public interface ICommunicationGroupClient
+    {
+        /// <summary>
+        /// Returns the Communication Group name.
+        /// </summary>
+        string GroupName { get; }
+
+        /// <summary>
+        /// Gets the BroadcastSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastSender</returns>
+        IBroadcastSender<T> GetBroadcastSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the BroadcastReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Broadcast operator</param>
+        /// <returns>The BroadcastReceiver</returns>
+        IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the ReduceSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceSender</returns>
+        IReduceSender<T> GetReduceSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the ReduceReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Reduce operator</param>
+        /// <returns>The ReduceReceiver</returns>
+        IReduceReceiver<T> GetReduceReceiver<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the ScatterSender with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterSender</returns>
+        IScatterSender<T> GetScatterSender<T>(string operatorName);
+
+        /// <summary>
+        /// Gets the ScatterReceiver with the given name and message type.
+        /// </summary>
+        /// <typeparam name="T">The message type</typeparam>
+        /// <param name="operatorName">The name of the Scatter operator</param>
+        /// <returns>The ScatterReceiver</returns>
+        IScatterReceiver<T> GetScatterReceiver<T>(string operatorName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
new file mode 100644
index 0000000..d3034fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Handles incoming messages sent to this Communication Group.
+    /// It is itself an observer of GroupCommunicationMessage. The
+    /// DefaultImplementation attribute names CommunicationGroupNetworkObserver
+    /// as the default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
+    public interface ICommunicationGroupNetworkObserver : IObserver<GroupCommunicationMessage>
+    {
+        /// <summary>
+        /// Registers the handler with the CommunicationGroupNetworkObserver.
+        /// Messages that are to be sent to the operator specified by operatorName
+        /// are handled by the given observer.
+        /// </summary>
+        /// <param name="operatorName">The name of the operator whose handler
+        /// will be invoked</param>
+        /// <param name="observer">The handler to invoke when messages are sent
+        /// to the operator specified by operatorName</param>
+        void Register(string operatorName, IObserver<GroupCommunicationMessage> observer);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
new file mode 100644
index 0000000..2592b04
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiClient.cs
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Used by Tasks to fetch CommunicationGroupClients.
+    /// The interface extends IDisposable. The DefaultImplementation attribute
+    /// names MpiClient as the default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(MpiClient))]
+    public interface IMpiClient : IDisposable
+    {
+        /// <summary>
+        /// Gets the CommunicationGroupClient with the given group name.
+        /// </summary>
+        /// <param name="groupName">The name of the CommunicationGroupClient</param>
+        /// <returns>The configured CommunicationGroupClient</returns>
+        ICommunicationGroupClient GetCommunicationGroup(string groupName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
new file mode 100644
index 0000000..5fe948c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IMpiNetworkObserver.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Task.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Network.Group.Codec;
+
+namespace Org.Apache.REEF.Network.Group.Task
+{
+    /// <summary>
+    /// Handles all incoming messages for this Task.
+    /// It observes NsMessage-wrapped GroupCommunicationMessages, while the
+    /// handlers registered with it take the bare GroupCommunicationMessage.
+    /// The DefaultImplementation attribute names MpiNetworkObserver as the
+    /// default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(MpiNetworkObserver))]
+    public interface IMpiNetworkObserver : IObserver<NsMessage<GroupCommunicationMessage>>
+    {
+        /// <summary>
+        /// Registers the network handler for the given CommunicationGroup.
+        /// When messages are sent to the specified group name, the given handler
+        /// will be invoked with that message.
+        /// </summary>
+        /// <param name="groupName">The group name for the network handler</param>
+        /// <param name="commGroupHandler">The network handler to invoke when
+        /// messages are sent to the given group.</param>
+        void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
new file mode 100644
index 0000000..e6d653d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using System; +using System.Collections.Generic; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Implementations.Tang; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// Used by Tasks to fetch MPI Operators in the group configured by the driver. + /// </summary> + public class CommunicationGroupClient : ICommunicationGroupClient + { + private readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupClient)); + + private string _taskId; + private string _driverId; + + private Dictionary<string, IInjector> _operatorInjectors; + private Dictionary<string, object> _operators; + private NetworkService<GroupCommunicationMessage> _networkService; + private IMpiNetworkObserver _mpiNetworkHandler; + private ICommunicationGroupNetworkObserver _commGroupNetworkHandler; + + /// <summary> + /// Creates a new CommunicationGroupClient. 
+ /// </summary> + /// <param name="taskId">The identifier for this Task.</param> + /// <param name="groupName">The name of the CommunicationGroup</param> + /// <param name="driverId">The identifier for the driver</param> + /// <param name="operatorConfigs">The serialized operator configurations</param> + /// <param name="mpiNetworkObserver">The handler for all incoming messages + /// across all Communication Groups</param> + /// <param name="networkService">The network service used to send messages.</param> + /// <param name="configSerializer">Used to deserialize operator configuration.</param> + [Inject] + public CommunicationGroupClient( + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, + [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId, + [Parameter(typeof(MpiConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs, + IMpiNetworkObserver mpiNetworkObserver, + NetworkService<GroupCommunicationMessage> networkService, + AvroConfigurationSerializer configSerializer) + { + _taskId = taskId; + _driverId = driverId; + GroupName = groupName; + + _operators = new Dictionary<string, object>(); + _operatorInjectors = new Dictionary<string, IInjector>(); + + _networkService = networkService; + _mpiNetworkHandler = mpiNetworkObserver; + _commGroupNetworkHandler = new CommunicationGroupNetworkObserver(); + _mpiNetworkHandler.Register(groupName, _commGroupNetworkHandler); + + // Deserialize operator configuration and store each injector. + // When user requests the MPI Operator, use type information to + // create the instance. 
+ foreach (string operatorConfigStr in operatorConfigs) + { + IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr); + + IInjector injector = TangFactory.GetTang().NewInjector(operatorConfig); + string operatorName = injector.GetNamedInstance<MpiConfigurationOptions.OperatorName, string>( + GenericType<MpiConfigurationOptions.OperatorName>.Class); + _operatorInjectors[operatorName] = injector; + } + } + + /// <summary> + /// Returns the Communication Group name + /// </summary> + public string GroupName { get; private set; } + + /// <summary> + /// Gets the BroadcastSender with the given name and message type. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Broadcast operator</param> + /// <returns>The BroadcastSender</returns> + public IBroadcastSender<T> GetBroadcastSender<T>(string operatorName) + { + return GetOperatorInstance<BroadcastSender<T>>(operatorName); + } + + /// <summary> + /// Gets the BroadcastReceiver with the given name and message type. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Broadcast operator</param> + /// <returns>The BroadcastReceiver</returns> + public IBroadcastReceiver<T> GetBroadcastReceiver<T>(string operatorName) + { + return GetOperatorInstance<BroadcastReceiver<T>>(operatorName); + } + + /// <summary> + /// Gets the ReduceSender with the given name and message type. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Reduce operator</param> + /// <returns>The ReduceSender</returns> + public IReduceSender<T> GetReduceSender<T>(string operatorName) + { + return GetOperatorInstance<ReduceSender<T>>(operatorName); + } + + /// <summary> + /// Gets the ReduceReceiver with the given name and message type. 
+ /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Reduce operator</param> + /// <returns>The ReduceReceiver</returns> + public IReduceReceiver<T> GetReduceReceiver<T>(string operatorName) + { + return GetOperatorInstance<ReduceReceiver<T>>(operatorName); + } + + /// <summary> + /// Gets the ScatterSender with the given name and message type. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Scatter operator</param> + /// <returns>The ScatterSender</returns> + public IScatterSender<T> GetScatterSender<T>(string operatorName) + { + return GetOperatorInstance<ScatterSender<T>>(operatorName); + } + + /// <summary> + /// Gets the ScatterReceiver with the given name and message type. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + /// <param name="operatorName">The name of the Scatter operator</param> + /// <returns>The ScatterReceiver</returns> + public IScatterReceiver<T> GetScatterReceiver<T>(string operatorName) + { + return GetOperatorInstance<ScatterReceiver<T>>(operatorName); + } + + /// <summary> + /// Gets the MPI operator with the specified name and type. + /// If the operator hasn't been instanciated yet, find the injector + /// associated with the given operator name and use the type information + /// to create a new operator of that type. 
+ /// </summary> + /// <typeparam name="T">The type of operator to create</typeparam> + /// <param name="operatorName">The name of the operator</param> + /// <returns>The newly created MPI Operator</returns> + private T GetOperatorInstance<T>(string operatorName) where T : class + { + if (string.IsNullOrEmpty(operatorName)) + { + throw new ArgumentNullException("operatorName"); + } + if (!_operatorInjectors.ContainsKey(operatorName)) + { + throw new ArgumentException("Invalid operator name, cannot create CommunicationGroupClient"); + } + + object op; + if (!_operators.TryGetValue(operatorName, out op)) + { + IInjector injector = _operatorInjectors[operatorName]; + + injector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, _taskId); + injector.BindVolatileParameter(GenericType<MpiConfigurationOptions.CommunicationGroupName>.Class, GroupName); + injector.BindVolatileInstance(GenericType<ICommunicationGroupNetworkObserver>.Class, _commGroupNetworkHandler); + injector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, _networkService); + injector.BindVolatileInstance(GenericType<ICommunicationGroupClient>.Class, this); + + try + { + op = injector.GetInstance<T>(); + _operators[operatorName] = op; + } + catch (InjectionException) + { + LOGGER.Log(Level.Error, "Cannot inject MPI operator: No known operator of type: {0}", typeof(T)); + throw; + } + } + + return (T) op; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs new file mode 100644 index 0000000..97ab082 --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -0,0 +1,93 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Tang.Annotations;
using System;
using System.Collections.Generic;

namespace Org.Apache.REEF.Network.Group.Task.Impl
{
    /// <summary>
    /// Handles incoming messages sent to this Communication Group by
    /// dispatching each message to the handler registered for its operator name.
    /// </summary>
    public class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
    {
        // Registered operator handlers, keyed by operator name.
        private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers;

        /// <summary>
        /// Creates a new CommunicationGroupNetworkObserver with no registered handlers.
        /// </summary>
        [Inject]
        public CommunicationGroupNetworkObserver()
        {
            _handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
        }

        /// <summary>
        /// Registers the handler with the CommunicationGroupNetworkObserver.
        /// Messages that are to be sent to the operator specified by operatorName
        /// are handled by the given observer. Registering a second handler under
        /// the same name replaces the first.
        /// </summary>
        /// <param name="operatorName">The name of the operator whose handler
        /// will be invoked</param>
        /// <param name="observer">The handler to invoke when messages are sent
        /// to the operator specified by operatorName</param>
        /// <exception cref="ArgumentNullException">operatorName is null or empty,
        /// or observer is null</exception>
        public void Register(string operatorName, IObserver<GroupCommunicationMessage> observer)
        {
            if (string.IsNullOrEmpty(operatorName))
            {
                throw new ArgumentNullException("operatorName");
            }
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            _handlers[operatorName] = observer;
        }

        /// <summary>
        /// Handles the incoming GroupCommunicationMessage sent to this Communication Group.
        /// Looks for the operator that the message is being sent to and invokes its handler.
        /// </summary>
        /// <param name="message">The incoming message</param>
        /// <exception cref="ArgumentNullException">message is null</exception>
        /// <exception cref="ArgumentException">No handler is registered for the
        /// message's operator name</exception>
        public void OnNext(GroupCommunicationMessage message)
        {
            // Reject null up front, as the other observers in this namespace do, instead
            // of failing with a NullReferenceException on the property access below.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            string operatorName = message.OperatorName;

            IObserver<GroupCommunicationMessage> handler;
            if (!_handlers.TryGetValue(operatorName, out handler))
            {
                throw new ArgumentException("No handler registered with the operator name: " + operatorName);
            }

            handler.OnNext(message);
        }

        /// <summary>
        /// Intentionally does nothing; errors are not propagated to operator handlers.
        /// </summary>
        /// <param name="error">The error that occurred (ignored)</param>
        public void OnError(Exception error)
        {
        }

        /// <summary>
        /// Intentionally does nothing; completion is not propagated to operator handlers.
        /// </summary>
        public void OnCompleted()
        {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
new file mode 100644
index 0000000..7cec022
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiClient.cs
@@ -0,0 +1,108 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote.Impl;
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Tang.Implementations.Tang;

namespace Org.Apache.REEF.Network.Group.Task.Impl
{
    /// <summary>
    /// Task-side entry point for group communication: hands out the
    /// CommunicationGroupClients that were configured for this Task.
    /// </summary>
    public class MpiClient : IMpiClient
    {
        // Communication group clients, keyed by group name.
        private Dictionary<string, ICommunicationGroupClient> _commGroups;

        private INetworkService<GroupCommunicationMessage> _networkService;

        /// <summary>
        /// Builds the MpiClient: registers the task ID with the network service and
        /// instantiates one CommunicationGroupClient per serialized group configuration.
        /// </summary>
        /// <param name="groupConfigs">The set of serialized Group Communication configurations</param>
        /// <param name="taskId">The identifier for this task</param>
        /// <param name="mpiNetworkObserver">The network handler to receive incoming messages
        /// for this task</param>
        /// <param name="networkService">The network service used to send messages</param>
        /// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
        [Inject]
        public MpiClient(
            [Parameter(typeof(MpiConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
            IMpiNetworkObserver mpiNetworkObserver,
            NetworkService<GroupCommunicationMessage> networkService,
            AvroConfigurationSerializer configSerializer)
        {
            _commGroups = new Dictionary<string, ICommunicationGroupClient>();
            _networkService = networkService;
            networkService.Register(new StringIdentifier(taskId));

            foreach (string serialized in groupConfigs)
            {
                IConfiguration deserialized = configSerializer.FromString(serialized);
                IInjector groupInjector = TangFactory.GetTang().NewInjector(deserialized);

                // Supply the task-local values each group client needs for injection.
                groupInjector.BindVolatileParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, taskId);
                groupInjector.BindVolatileInstance(GenericType<IMpiNetworkObserver>.Class, mpiNetworkObserver);
                groupInjector.BindVolatileInstance(GenericType<NetworkService<GroupCommunicationMessage>>.Class, networkService);

                ICommunicationGroupClient client = groupInjector.GetInstance<ICommunicationGroupClient>();
                _commGroups[client.GroupName] = client;
            }
        }

        /// <summary>
        /// Looks up the CommunicationGroupClient registered under the given group name.
        /// </summary>
        /// <param name="groupName">The name of the CommunicationGroupClient</param>
        /// <returns>The CommunicationGroupClient</returns>
        /// <exception cref="ArgumentNullException">groupName is null or empty</exception>
        /// <exception cref="ArgumentException">No group with that name exists</exception>
        public ICommunicationGroupClient GetCommunicationGroup(string groupName)
        {
            if (string.IsNullOrEmpty(groupName))
            {
                throw new ArgumentNullException("groupName");
            }

            ICommunicationGroupClient client;
            if (_commGroups.TryGetValue(groupName, out client))
            {
                return client;
            }

            throw new ArgumentException("No CommunicationGroupClient with name: " + groupName);
        }

        /// <summary>
        /// Unregisters from and then disposes the underlying network service.
        /// </summary>
        public void Dispose()
        {
            _networkService.Unregister();
            _networkService.Dispose();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
new file mode 100644
index 0000000..baa2e5e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/MpiNetworkObserver.cs
@@ -0,0 +1,109 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using System;
using System.Collections.Generic;
using System.Linq;

namespace Org.Apache.REEF.Network.Group.Task.Impl
{
    /// <summary>
    /// Handles all incoming messages for this Task by forwarding each one to the
    /// handler registered for the message's Communication Group.
    /// </summary>
    public class MpiNetworkObserver : IMpiNetworkObserver
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MpiNetworkObserver));

        // Communication group handlers, keyed by group name.
        private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _commGroupHandlers;

        /// <summary>
        /// Creates a new MpiNetworkObserver with no registered group handlers.
        /// </summary>
        [Inject]
        public MpiNetworkObserver()
        {
            _commGroupHandlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>();
        }

        /// <summary>
        /// Handles the incoming NsMessage for this Task.
        /// Delegates the first GroupCommunicationMessage it carries to the correct
        /// CommunicationGroupNetworkObserver.
        /// </summary>
        /// <param name="nsMessage">The incoming network service message</param>
        /// <exception cref="ArgumentNullException">nsMessage is null</exception>
        /// <exception cref="InvalidOperationException">The message carries no data</exception>
        /// <exception cref="KeyNotFoundException">No handler is registered for the
        /// message's group</exception>
        public void OnNext(NsMessage<GroupCommunicationMessage> nsMessage)
        {
            if (nsMessage == null)
            {
                throw new ArgumentNullException("nsMessage");
            }

            // Only the first payload element is dispatched.
            GroupCommunicationMessage gcm = nsMessage.Data.FirstOrDefault();
            if (gcm == null)
            {
                LOGGER.Log(Level.Error, "Mpi Network Handler received message with no data");
                throw new InvalidOperationException("Mpi Network Handler received message with no data");
            }

            IObserver<GroupCommunicationMessage> commGroupHandler;
            if (!_commGroupHandlers.TryGetValue(gcm.GroupName, out commGroupHandler))
            {
                LOGGER.Log(Level.Error, "Mpi Network Handler received message for nonexistant group");
                throw new KeyNotFoundException(
                    "Mpi Network Handler received message for nonexistant group: " + gcm.GroupName);
            }

            // Dispatch outside of the validation above so that an InvalidOperationException
            // or KeyNotFoundException raised by the handler itself is not misreported as
            // "no data" / "nonexistant group".
            commGroupHandler.OnNext(gcm);
        }

        /// <summary>
        /// Registers the network handler for the given CommunicationGroup.
        /// When messages are sent to the specified group name, the given handler
        /// will be invoked with that message. Registering a second handler under
        /// the same name replaces the first.
        /// </summary>
        /// <param name="groupName">The group name for the network handler</param>
        /// <param name="commGroupHandler">The network handler to invoke when
        /// messages are sent to the given group.</param>
        /// <exception cref="ArgumentNullException">groupName is null or empty,
        /// or commGroupHandler is null</exception>
        public void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler)
        {
            if (string.IsNullOrEmpty(groupName))
            {
                throw new ArgumentNullException("groupName");
            }
            if (commGroupHandler == null)
            {
                throw new ArgumentNullException("commGroupHandler");
            }

            _commGroupHandlers[groupName] = commGroupHandler;
        }

        /// <summary>
        /// Intentionally does nothing; errors are not propagated to group handlers.
        /// </summary>
        /// <param name="error">The error that occurred (ignored)</param>
        public void OnError(Exception error)
        {
        }

        /// <summary>
        /// Intentionally does nothing; completion is not propagated to group handlers.
        /// </summary>
        public void OnCompleted()
        {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
new file mode 100644
index 0000000..f4c7a60
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -0,0 +1,67 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
+ */ + +using Org.Apache.REEF.Network.Group.Driver; +using System.Collections.Concurrent; +using Org.Apache.REEF.Network.Group.Driver.Impl; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// Stores all incoming messages sent by a particular Task. + /// </summary> + internal class NodeStruct + { + private BlockingCollection<GroupCommunicationMessage> _messageQueue; + + /// <summary> + /// Creates a new NodeStruct. + /// </summary> + /// <param name="id">The Task identifier</param> + public NodeStruct(string id) + { + Identifier = id; + _messageQueue = new BlockingCollection<GroupCommunicationMessage>(); + } + + /// <summary> + /// Returns the identifier for the Task that sent all + /// messages in the message queue. + /// </summary> + public string Identifier { get; private set; } + + /// <summary> + /// Gets the first message in the message queue. + /// </summary> + /// <returns>The first available message.</returns> + public byte[][] GetData() + { + return _messageQueue.Take().Data; + } + + /// <summary> + /// Adds an incoming message to the message queue. + /// </summary> + /// <param name="gcm">The incoming message</param> + public void AddData(GroupCommunicationMessage gcm) + { + _messageQueue.Add(gcm); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs new file mode 100644 index 0000000..8752203 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Network.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Wake.Remote; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// Contains the Operator's topology graph. + /// Used to send or receive messages to/from operators in the same + /// Communication Group. 
+ /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class OperatorTopology<T> : IObserver<GroupCommunicationMessage> + { + private const int DefaultTimeout = 10000; + private const int RetryCount = 5; + + private static Logger LOGGER = Logger.GetLogger(typeof(OperatorTopology<>)); + + private string _groupName; + private string _operatorName; + private string _selfId; + private string _driverId; + + private NodeStruct _parent; + private List<NodeStruct> _children; + private Dictionary<string, NodeStruct> _idToNodeMap; + private ICodec<T> _codec; + private INameClient _nameClient; + private Sender _sender; + private BlockingCollection<NodeStruct> _nodesWithData; + + /// <summary> + /// Creates a new OperatorTopology object. + /// </summary> + /// <param name="operatorName">The name of the MPI Operator</param> + /// <param name="groupName">The name of the operator's Communication Group</param> + /// <param name="taskId">The operator's Task identifier</param> + /// <param name="driverId">The identifer for the driver</param> + /// <param name="rootId">The identifier for the root Task in the topology graph</param> + /// <param name="childIds">The set of child Task identifiers in the topology graph</param> + /// <param name="networkService">The network service</param> + /// <param name="codec">The codec used to serialize and deserialize messages</param> + /// <param name="sender">The Sender used to do point to point communication</param> + [Inject] + public OperatorTopology( + [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, + [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId, + [Parameter(typeof(MpiConfigurationOptions.TopologyRootTaskId))] string rootId, + [Parameter(typeof(MpiConfigurationOptions.TopologyChildTaskIds))] 
ISet<string> childIds, + NetworkService<GroupCommunicationMessage> networkService, + ICodec<T> codec, + Sender sender) + { + _operatorName = operatorName; + _groupName = groupName; + _selfId = taskId; + _driverId = driverId; + _codec = codec; + _nameClient = networkService.NamingClient; + _sender = sender; + _nodesWithData = new BlockingCollection<NodeStruct>(); + _children = new List<NodeStruct>(); + _idToNodeMap = new Dictionary<string, NodeStruct>(); + + if (_selfId.Equals(rootId)) + { + _parent = null; + foreach (string childId in childIds) + { + NodeStruct node = new NodeStruct(childId); + _children.Add(node); + _idToNodeMap[childId] = node; + } + } + else + { + _parent = new NodeStruct(rootId); + _idToNodeMap[rootId] = _parent; + } + } + + /// <summary> + /// Initializes operator topology. + /// Waits until all Tasks in the CommunicationGroup have registered themselves + /// with the Name Service. + /// </summary> + public void Initialize() + { + using (LOGGER.LogFunction("OperatorTopology::Initialize")) + { + if (_parent != null) + { + WaitForTaskRegistration(_parent.Identifier, RetryCount); + } + + if (_children.Count > 0) + { + foreach (NodeStruct child in _children) + { + WaitForTaskRegistration(child.Identifier, RetryCount); + } + } + } + } + + /// <summary> + /// Handles the incoming GroupCommunicationMessage. + /// Updates the sending node's message queue. + /// </summary> + /// <param name="gcm">The incoming message</param> + public void OnNext(GroupCommunicationMessage gcm) + { + if (gcm == null) + { + throw new ArgumentNullException("gcm"); + } + if (gcm.Source == null) + { + throw new ArgumentException("Message must have a source"); + } + + NodeStruct sourceNode = FindNode(gcm.Source); + if (sourceNode == null) + { + throw new IllegalStateException("Received message from invalid task id: " + gcm.Source); + } + + _nodesWithData.Add(sourceNode); + sourceNode.AddData(gcm); + } + + /// <summary> + /// Sends the message to the parent Task. 
+ /// </summary> + /// <param name="message">The message to send</param> + /// <param name="type">The message type</param> + public void SendToParent(T message, MessageType type) + { + if (_parent == null) + { + throw new ArgumentException("No parent for node"); + } + + SendToNode(message, MessageType.Data, _parent); + } + + /// <summary> + /// Sends the message to all child Tasks. + /// </summary> + /// <param name="message">The message to send</param> + /// <param name="type">The message type</param> + public void SendToChildren(T message, MessageType type) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + + foreach (NodeStruct child in _children) + { + SendToNode(message, MessageType.Data, child); + } + } + + /// <summary> + /// Splits the list of messages up evenly and sends each sublist + /// to the child Tasks. + /// </summary> + /// <param name="messages">The list of messages to scatter</param> + /// <param name="type">The message type</param> + public void ScatterToChildren(List<T> messages, MessageType type) + { + if (messages == null) + { + throw new ArgumentNullException("messages"); + } + if (_children.Count <= 0) + { + throw new ArgumentException("Cannot scatter, no children available"); + } + + int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); + ScatterHelper(messages, _children, count); + } + + /// <summary> + /// Splits the list of messages up into chunks of the specified size + /// and sends each sublist to the child Tasks. 
+ /// </summary> + /// <param name="messages">The list of messages to scatter</param> + /// <param name="count">The size of each sublist</param> + /// <param name="type">The message type</param> + public void ScatterToChildren(List<T> messages, int count, MessageType type) + { + if (messages == null) + { + throw new ArgumentNullException("messages"); + } + if (count <= 0) + { + throw new ArgumentException("Count must be positive"); + } + + ScatterHelper(messages, _children, count); + } + + /// <summary> + /// Splits the list of messages up into chunks of the specified size + /// and sends each sublist to the child Tasks in the specified order. + /// </summary> + /// <param name="messages">The list of messages to scatter</param> + /// <param name="order">The order to send messages</param> + /// <param name="type">The message type</param> + public void ScatterToChildren(List<T> messages, List<string> order, MessageType type) + { + if (messages == null) + { + throw new ArgumentNullException("messages"); + } + if (order == null || order.Count != _children.Count) + { + throw new ArgumentException("order cannot be null and must have the same number of elements as child tasks"); + } + + List<NodeStruct> nodes = new List<NodeStruct>(); + foreach (string taskId in order) + { + NodeStruct node = FindNode(taskId); + if (node == null) + { + throw new IllegalStateException("Received message from invalid task id: " + taskId); + } + + nodes.Add(node); + } + + int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); + ScatterHelper(messages, nodes, count); + } + + /// <summary> + /// Receive an incoming message from the parent Task. 
+ /// </summary> + /// <returns>The parent Task's message</returns> + public T ReceiveFromParent() + { + byte[][] data = ReceiveFromNode(_parent, true); + if (data == null || data.Length != 1) + { + throw new InvalidOperationException("Cannot receive data from parent node"); + } + + return _codec.Decode(data[0]); + } + + /// <summary> + /// Receive a list of incoming messages from the parent Task. + /// </summary> + /// <returns>The parent Task's list of messages</returns> + public List<T> ReceiveListFromParent() + { + byte[][] data = ReceiveFromNode(_parent, true); + if (data == null || data.Length == 0) + { + throw new InvalidOperationException("Cannot receive data from parent node"); + } + + return data.Select(b => _codec.Decode(b)).ToList(); + } + + /// <summary> + /// Receives all messages from child Tasks and reduces them with the + /// given IReduceFunction. + /// </summary> + /// <param name="reduceFunction">The class used to reduce messages</param> + /// <returns>The result of reducing messages</returns> + public T ReceiveFromChildren(IReduceFunction<T> reduceFunction) + { + if (reduceFunction == null) + { + throw new ArgumentNullException("reduceFunction"); + } + + var receivedData = new List<T>(); + var childrenToReceiveFrom = new HashSet<string>(_children.Select(node => node.Identifier)); + + while (childrenToReceiveFrom.Count > 0) + { + NodeStruct childWithData = GetNodeWithData(); + byte[][] data = ReceiveFromNode(childWithData, false); + if (data == null || data.Length != 1) + { + throw new InvalidOperationException("Received invalid data from child with id: " + childWithData.Identifier); + } + + receivedData.Add(_codec.Decode(data[0])); + childrenToReceiveFrom.Remove(childWithData.Identifier); + } + + return reduceFunction.Reduce(receivedData); + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + + /// <summary> + /// Get a node containing an incoming message. 
+ /// </summary> + /// <returns>A NodeStruct with incoming data.</returns> + private NodeStruct GetNodeWithData() + { + CancellationTokenSource timeoutSource = new CancellationTokenSource(DefaultTimeout); + + try + { + return _nodesWithData.Take(timeoutSource.Token); + } + catch (OperationCanceledException) + { + LOGGER.Log(Level.Error, "No data to read from child"); + throw; + } + catch (ObjectDisposedException) + { + LOGGER.Log(Level.Error, "No data to read from child"); + throw; + } + catch (InvalidOperationException) + { + LOGGER.Log(Level.Error, "No data to read from child"); + throw; + } + } + + /// <summary> + /// Sends the message to the Task represented by the given NodeStruct. + /// </summary> + /// <param name="message">The message to send</param> + /// <param name="msgType">The message type</param> + /// <param name="node">The NodeStruct representing the Task to send to</param> + private void SendToNode(T message, MessageType msgType, NodeStruct node) + { + GroupCommunicationMessage gcm = new GroupCommunicationMessage(_groupName, _operatorName, + _selfId, node.Identifier, _codec.Encode(message), msgType); + + _sender.Send(gcm); + } + + /// <summary> + /// Sends the list of messages to the Task represented by the given NodeStruct. 
+        /// </summary>
+        /// <param name="messages">The list of messages to send</param>
+        /// <param name="msgType">The message type</param>
+        /// <param name="node">The NodeStruct representing the Task to send to</param>
+        private void SendToNode(List<T> messages, MessageType msgType, NodeStruct node)
+        {
+            // Each message is encoded separately; the whole batch travels in one GroupCommunicationMessage.
+            byte[][] payload = messages.Select(item => _codec.Encode(item)).ToArray();
+
+            _sender.Send(new GroupCommunicationMessage(_groupName, _operatorName,
+                _selfId, node.Identifier, payload, msgType));
+        }
+
+        /// <summary>
+        /// Splits the messages into consecutive sublists of at most count elements
+        /// and sends one sublist to each node, following the given node order.
+        /// </summary>
+        /// <param name="messages">The messages to distribute</param>
+        /// <param name="order">The nodes to send to, in sending order</param>
+        /// <param name="count">The maximum number of messages sent to one node</param>
+        private void ScatterHelper(List<T> messages, List<NodeStruct> order, int count)
+        {
+            if (count <= 0)
+            {
+                throw new ArgumentException("Count must be positive");
+            }
+
+            int offset = 0;
+            foreach (NodeStruct target in order)
+            {
+                // The final sublist is shorter than count when the number of
+                // messages is not evenly divisible by count.
+                int remaining = messages.Count - offset;
+                int size = Math.Min(remaining, count);
+                if (size <= 0)
+                {
+                    // Reached when the messages run out before every node in order has been served.
+                    throw new ArgumentException("Scatter count must be positive");
+                }
+
+                SendToNode(messages.GetRange(offset, size), MessageType.Data, target);
+                offset += size;
+            }
+        }
+
+        /// <summary>
+        /// Reads the data held by the given NodeStruct and, when requested,
+        /// also removes that NodeStruct from the nodesWithData queue.
+        /// </summary>
+        /// <param name="node">The node to receive from</param>
+        /// <param name="removeFromQueue">Whether or not to remove the NodeStruct
+        /// from the nodesWithData queue</param>
+        /// <returns>The byte array message from the node</returns>
+        private byte[][] ReceiveFromNode(NodeStruct node, bool removeFromQueue)
+        {
+            byte[][] received = node.GetData();
+            if (!removeFromQueue)
+            {
+                return received;
+            }
+
+            _nodesWithData.Take(node);
+            return received;
+        }
+
+        /// <summary>
+        /// Find the NodeStruct with the given Task identifier. 
+        /// </summary>
+        /// <param name="identifier">The identifier of the Task</param>
+        /// <returns>The NodeStruct, or null when the identifier is not in the id-to-node map</returns>
+        private NodeStruct FindNode(string identifier)
+        {
+            NodeStruct node;
+            return _idToNodeMap.TryGetValue(identifier, out node) ? node : null;
+        }
+
+        /// <summary>
+        /// Checks if the identifier is registered with the Name Server, looking it
+        /// up at most <paramref name="retries"/> times with a pause between attempts.
+        /// Throws an exception if every lookup fails.
+        /// </summary>
+        /// <param name="identifier">The identifier to look up</param>
+        /// <param name="retries">The maximum number of lookup attempts</param>
+        private void WaitForTaskRegistration(string identifier, int retries)
+        {
+            const int retryDelayMilliseconds = 500;
+
+            for (int attempt = 0; attempt < retries; attempt++)
+            {
+                // Pause and log only when another lookup really follows; waiting
+                // after the final failed attempt would just delay the exception.
+                if (attempt > 0)
+                {
+                    Thread.Sleep(retryDelayMilliseconds);
+                    LOGGER.Log(Level.Verbose, "Retry {0}: retrying lookup for node: {1}", attempt, identifier);
+                }
+
+                if (_nameClient.Lookup(identifier) != null)
+                {
+                    return;
+                }
+            }
+
+            throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
new file mode 100644
index 0000000..5342410
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// Represents a graph of MPI Operators where there are only two levels of
+    /// nodes: the root and all children extending from the root.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class FlatTopology<T> : ITopology<T>
+    {
+        // Assigned once in the constructor and never reassigned afterwards.
+        private readonly string _groupName;
+        private readonly string _operatorName;
+
+        private readonly string _rootId;
+        private readonly string _driverId;
+
+        // Child task nodes keyed by task identifier; the dictionary instance is
+        // created in the constructor, only its contents change.
+        private readonly Dictionary<string, TaskNode> _nodes;
+
+        // The root node; stays null until the root task has been added.
+        private TaskNode _root;
+
+        /// <summary>
+        /// Creates a new FlatTopology. 
+        /// </summary>
+        /// <param name="operatorName">The operator name</param>
+        /// <param name="groupName">The name of the topology's CommunicationGroup</param>
+        /// <param name="rootId">The root Task identifier</param>
+        /// <param name="driverId">The driver identifier</param>
+        /// <param name="operatorSpec">The operator specification</param>
+        public FlatTopology(
+            string operatorName,
+            string groupName,
+            string rootId,
+            string driverId,
+            IOperatorSpec<T> operatorSpec)
+        {
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _rootId = rootId;
+            _driverId = driverId;
+
+            OperatorSpec = operatorSpec;
+
+            _nodes = new Dictionary<string, TaskNode>();
+        }
+
+        /// <summary>
+        /// Gets or sets the Operator specification
+        /// </summary>
+        public IOperatorSpec<T> OperatorSpec { get; set; }
+
+        /// <summary>
+        /// Gets the task configuration for the operator topology. The configuration
+        /// binds the codec, the root task id, every child task id, and the operator
+        /// implementation matching the task's role in the operator spec.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The task configuration</returns>
+        /// <exception cref="ArgumentNullException">taskId is null</exception>
+        /// <exception cref="NotSupportedException">
+        /// The operator spec is not a broadcast, reduce or scatter spec
+        /// </exception>
+        public IConfiguration GetTaskConfiguration(string taskId)
+        {
+            // Reject a null id explicitly, as AddTask does, instead of failing
+            // later with a NullReferenceException on taskId.Equals.
+            if (taskId == null)
+            {
+                throw new ArgumentNullException("taskId");
+            }
+
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
+                    GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
+                    _rootId);
+
+            foreach (string tId in _nodes.Keys)
+            {
+                if (!tId.Equals(_rootId))
+                {
+                    confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                        GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                        tId);
+                }
+            }
+
+            // Resolve the concrete spec type with a single 'as' per candidate
+            // rather than an 'is' test followed by a second cast.
+            BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+            ReduceOperatorSpec<T> reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
+            ScatterOperatorSpec<T> scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+
+            if (broadcastSpec != null)
+            {
+                if (taskId.Equals(broadcastSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                }
+            }
+            else if (reduceSpec != null)
+            {
+                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+
+                if (taskId.Equals(reduceSpec.ReceiverId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                }
+            }
+            else if (scatterSpec != null)
+            {
+                if (taskId.Equals(scatterSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                }
+            }
+            else
+            {
+                throw new NotSupportedException("Spec type not supported");
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Adds a task to the topology graph. 
+        /// </summary>
+        /// <param name="taskId">The identifier of the task to add</param>
+        /// <exception cref="ArgumentNullException">taskId is null or empty</exception>
+        /// <exception cref="ArgumentException">The task has already been added</exception>
+        public void AddTask(string taskId)
+        {
+            if (string.IsNullOrEmpty(taskId))
+            {
+                throw new ArgumentNullException("taskId");
+            }
+
+            // Children are tracked in _nodes, but the root is only held in _root,
+            // so a repeated root registration has to be detected separately.
+            // Without this check a second root add silently replaced the root node.
+            bool isRoot = taskId.Equals(_rootId);
+            bool alreadyAdded = _nodes.ContainsKey(taskId) || (isRoot && _root != null);
+            if (alreadyAdded)
+            {
+                throw new ArgumentException("Task has already been added to the topology");
+            }
+
+            if (isRoot)
+            {
+                SetRootNode(_rootId);
+            }
+            else
+            {
+                AddChild(taskId);
+            }
+        }
+
+        /// <summary>
+        /// Creates the root node and links every child added so far to it.
+        /// </summary>
+        /// <param name="rootId">The root Task identifier</param>
+        private void SetRootNode(string rootId)
+        {
+            TaskNode rootNode = new TaskNode(_groupName, _operatorName, rootId, _driverId);
+            _root = rootNode;
+
+            foreach (TaskNode childNode in _nodes.Values)
+            {
+                rootNode.AddChild(childNode);
+                childNode.SetParent(rootNode);
+            }
+        }
+
+        /// <summary>
+        /// Creates a child node and, when the root already exists, links the two.
+        /// Children added before the root are linked later by SetRootNode.
+        /// </summary>
+        /// <param name="childId">The child Task identifier</param>
+        private void AddChild(string childId)
+        {
+            TaskNode childNode = new TaskNode(_groupName, _operatorName, childId, _driverId);
+            _nodes[childId] = childNode;
+
+            if (_root != null)
+            {
+                _root.AddChild(childNode);
+                childNode.SetParent(_root);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
new file mode 100644
index 0000000..c4dc9e6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Tang.Interface; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Network.Group.Topology +{ + /// <summary> + /// Represents a topology graph for IMpiOperators. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public interface ITopology<T> + { + /// <summary> + /// Gets the operator specification. + /// </summary> + IOperatorSpec<T> OperatorSpec { get; } + + /// <summary> + /// Gets the task configuration for the operator topology. + /// </summary> + /// <param name="taskId">The task identifier</param> + /// <returns>The task configuration</returns> + IConfiguration GetTaskConfiguration(string taskId); + + /// <summary> + /// Adds a task to the topology graph. + /// </summary> + /// <param name="taskId">The identifier of the task to add</param> + void AddTask(string taskId); + } +}
