Repository: reef Updated Branches: refs/heads/master 4ab6a8d1b -> df1a226d6
[REEF-1453] GroupCommunication should create a new observer for each Task This addressed the issue by * Mapping Task observers direclty to the IPEndpoints of their clients. * Adding a universal observer that distinguishes between the connecting IPEndpoint to ObserverContainer. * Removing the type map in ObserverContainer. * Restructuring the observer hierarchy such that the universal observer bootstraps the Task observers, which the node observers register with. JIRA: [REEF-1453](https://issues.apache.org/jira/browse/REEF-1453) This closes #1059 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/df1a226d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/df1a226d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/df1a226d Branch: refs/heads/master Commit: df1a226d63c25b4e1de25964ea34a074e0ddb32f Parents: 4ab6a8d Author: Andrew Chung <[email protected]> Authored: Tue Jun 28 14:36:23 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Jul 1 16:33:55 2016 -0700 ---------------------------------------------------------------------- .../Impl/GeneralGroupCommunicationMessage.cs | 31 +++-- .../Group/Driver/Impl/GroupCommDriver.cs | 3 +- .../Driver/Impl/GroupCommunicationMessage.cs | 16 +-- .../Group/Operators/Impl/BroadcastReceiver.cs | 6 - .../Group/Operators/Impl/BroadcastSender.cs | 6 - .../Group/Operators/Impl/ReduceReceiver.cs | 7 - .../Group/Operators/Impl/ReduceSender.cs | 4 - .../Group/Operators/Impl/ScatterReceiver.cs | 8 +- .../Group/Operators/Impl/ScatterSender.cs | 7 +- .../Task/ICommunicationGroupNetworkObserver.cs | 43 ------ .../Group/Task/IGroupCommNetworkObserver.cs | 43 ------ .../Group/Task/Impl/CommunicationGroupClient.cs | 7 - .../Impl/CommunicationGroupNetworkObserver.cs | 111 --------------- .../Group/Task/Impl/GroupCommNetworkObserver.cs | 92 ++++++------- .../Group/Task/Impl/NodeMessageObserver.cs | 80 +++++++++++ .../Group/Task/Impl/NodeObserverIdentifier.cs | 120 +++++++++++++++++ .../Group/Task/Impl/NodeStruct.cs | 16 ++- .../Group/Task/Impl/OperatorTopology.cs | 61 ++------- .../Group/Task/Impl/TaskMessageObserver.cs | 134 +++++++++++++++++++ .../NetworkService/INetworkService.cs | 6 + .../NetworkService/NetworkService.cs | 8 ++ .../NetworkService/StreamingNetworkService.cs | 94 ++++++++++--- .../Org.Apache.REEF.Network.csproj | 6 +- .../Remote/Impl/ObserverContainer.cs | 33 ++--- 24 files changed, 544 insertions(+), 398 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs index 7cce556..1009fc0 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs @@ -15,21 +15,16 @@ // specific language governing permissions and limitations // under the License. +using System; + namespace Org.Apache.REEF.Network.Group.Driver.Impl { /// <summary> /// Messages sent by MPI Operators. This is the class inherited by /// GroupCommunicationMessage but seen by Network Service /// </summary> - public class GeneralGroupCommunicationMessage - { - /// <summary> - /// Empty constructor to allow instantiation by reflection - /// </summary> - protected GeneralGroupCommunicationMessage() - { - } - + public abstract class GeneralGroupCommunicationMessage + { /// <summary> /// Create new CommunicationGroupMessage. /// </summary> @@ -37,36 +32,44 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="operatorName">The name of the MPI operator</param> /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> + /// <param name="messageType">The type of the GC message</param> protected GeneralGroupCommunicationMessage( string groupName, string operatorName, string source, - string destination) + string destination, + Type messageType) { GroupName = groupName; OperatorName = operatorName; Source = source; Destination = destination; + MessageType = messageType; } /// <summary> /// Returns the Communication Group name. /// </summary> - internal string GroupName { get; set; } + internal string GroupName { get; private set; } /// <summary> /// Returns the MPI Operator name. /// </summary> - internal string OperatorName { get; set; } + internal string OperatorName { get; private set; } /// <summary> /// Returns the source of the message. /// </summary> - internal string Source { get; set; } + internal string Source { get; private set; } /// <summary> /// Returns the destination of the message. /// </summary> - internal string Destination { get; set; } + internal string Destination { get; private set; } + + /// <summary> + /// The Type of the GroupCommunicationMessage. + /// </summary> + internal Type MessageType { get; private set; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs index f49c38b..e636b04 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs @@ -34,6 +34,7 @@ using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Wake.Remote; using ContextConfiguration = Org.Apache.REEF.Common.Context.ContextConfiguration; namespace Org.Apache.REEF.Network.Group.Driver.Impl @@ -196,7 +197,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig) .BindImplementation( - GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, + GenericType<IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>>>.Class, GenericType<GroupCommNetworkObserver>.Class) .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>( GenericType<NamingConfigurationOptions.NameServerAddress>.Class, http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs index aba944c..2f01cef 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -using Org.Apache.REEF.Tang.Annotations; - namespace Org.Apache.REEF.Network.Group.Driver.Impl { /// <summary> @@ -25,14 +23,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage { /// <summary> - /// Empty constructor to allow instantiation by reflection - /// </summary> - [Inject] - private GroupCommunicationMessage() - { - } - - /// <summary> /// Create new CommunicationGroupMessage. /// </summary> /// <param name="groupName">The name of the communication group</param> @@ -46,7 +36,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl string source, string destination, T message) - : base(groupName, operatorName, source, destination) + : base(groupName, operatorName, source, destination, typeof(T)) { Data = new[] { message }; } @@ -65,7 +55,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl string source, string destination, T[] message) - : base(groupName, operatorName, source, destination) + : base(groupName, operatorName, source, destination, typeof(T)) { Data = message; } @@ -76,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl internal T[] Data { get; - set; + private set; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs index 6152e05..bc72fea 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -using System.Reactive; using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -48,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. /// Default is true. For unit testing, it can be set to false.</param> /// <param name="topology">The node's topology graph</param> - /// <param name="networkHandler">The incoming message handler</param> /// <param name="dataConverter">The converter used to convert original message to pipelined ones and vice versa.</param> [Inject] private BroadcastReceiver( @@ -56,7 +54,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, OperatorTopology<PipelineMessage<T>> topology, - ICommunicationGroupNetworkObserver networkHandler, IPipelineDataConverter<T> dataConverter) { OperatorName = operatorName; @@ -65,9 +62,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl PipelineDataConverter = dataConverter; _topology = topology; _initialize = initialize; - - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs index 279dd33..aa75c6e 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs @@ -16,7 +16,6 @@ // under the License. using System; -using System.Reactive; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; @@ -49,7 +48,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. /// Default is true. For unit testing, it can be set to false.</param> /// <param name="topology">The node's topology graph</param> - /// <param name="networkHandler">The incoming message handler</param> /// <param name="dataConverter">The converter used to convert original /// message to pipelined ones and vice versa.</param> [Inject] @@ -58,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, OperatorTopology<PipelineMessage<T>> topology, - ICommunicationGroupNetworkObserver networkHandler, IPipelineDataConverter<T> dataConverter) { _topology = topology; @@ -67,9 +64,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl Version = PipelineVersion; PipelineDataConverter = dataConverter; _initialize = initialize; - - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs index d3a0102..e94c1ea 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -using System.Reactive; using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Config; -using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; @@ -49,7 +47,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. /// Default is true. For unit testing, it can be set to false.</param> /// <param name="topology">The task's operator topology graph</param> - /// <param name="networkHandler">Handles incoming messages from other tasks</param> /// <param name="reduceFunction">The class used to aggregate all incoming messages</param> /// <param name="dataConverter">The converter used to convert original /// message to pipelined ones and vice versa.</param> @@ -59,7 +56,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, OperatorTopology<PipelineMessage<T>> topology, - ICommunicationGroupNetworkObserver networkHandler, IReduceFunction<T> reduceFunction, IPipelineDataConverter<T> dataConverter) { @@ -72,9 +68,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction); _topology = topology; - - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs index d2d1e5c..813db3e 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs @@ -60,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, OperatorTopology<PipelineMessage<T>> topology, - ICommunicationGroupNetworkObserver networkHandler, IReduceFunction<T> reduceFunction, IPipelineDataConverter<T> dataConverter) { @@ -74,9 +73,6 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl _topology = topology; _initialize = initialize; - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); - PipelineDataConverter = dataConverter; } http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs index ac8de01..d6fdfa1 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs @@ -17,7 +17,6 @@ using System.Collections.Generic; using System.Linq; -using System.Reactive; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; @@ -47,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. /// Default is true. For unit testing, it can be set to false.</param> /// <param name="topology">The task's operator topology graph</param> - /// <param name="networkHandler">Handles incoming messages from other tasks</param> [Inject] private ScatterReceiver( [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, - OperatorTopology<T> topology, - ICommunicationGroupNetworkObserver networkHandler) + OperatorTopology<T> topology) { OperatorName = operatorName; GroupName = groupName; Version = DefaultVersion; _topology = topology; _initialize = initialize; - - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs index 0f2cf83..440738b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs @@ -46,23 +46,18 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="initialize">Require Topology Initialize to be called to wait for all task being registered. /// Default is true. For unit testing, it can be set to false.</param> /// <param name="topology">The operator topology</param> - /// <param name="networkHandler">The network handler</param> [Inject] private ScatterSender( [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.Initialize))] bool initialize, - OperatorTopology<T> topology, - ICommunicationGroupNetworkObserver networkHandler) + OperatorTopology<T> topology) { OperatorName = operatorName; GroupName = groupName; Version = DefaultVersion; _topology = topology; _initialize = initialize; - - var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); - networkHandler.Register(operatorName, msgHandler); } public string OperatorName { get; private set; } http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs deleted file mode 100644 index afa407e..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Network.Group.Task.Impl; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Network.Group.Task -{ - /// <summary> - /// Handles incoming messages sent to this Communication Group. - /// Writable Version - /// </summary> - [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))] - internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage> - { - /// <summary> - /// Registers the handler with the WritableCommunicationGroupNetworkObserver. - /// Messages that are to be sent to the operator specified by operatorName - /// are handled by the given observer. - /// </summary> - /// <param name="operatorName">The name of the operator whose handler - /// will be invoked</param> - /// <param name="observer">The handler to invoke when messages are sent - /// to the operator specified by operatorName</param> - void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs deleted file mode 100644 index 55d6942..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Network.Group.Task.Impl; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Network.Group.Task -{ - /// <summary> - /// Handles all incoming messages for this Task. - /// Writable Version - /// </summary> - [DefaultImplementation(typeof(GroupCommNetworkObserver))] - internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>> - { - /// <summary> - /// Registers the network handler for the given CommunicationGroup. - /// When messages are sent to the specified group name, the given handler - /// will be invoked with that message. - /// </summary> - /// <param name="groupName">The group name for the network handler</param> - /// <param name="commGroupHandler">The network handler to invoke when - /// messages are sent to the given group.</param> - void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs index aaeb98c..2fc90d9 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs @@ -43,25 +43,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> /// <param name="groupName">The name of the CommunicationGroup</param> /// <param name="operatorConfigs">The serialized operator configurations</param> - /// <param name="groupCommNetworkObserver">The handler for all incoming messages - /// across all Communication Groups</param> /// <param name="configSerializer">Used to deserialize operator configuration.</param> - /// <param name="commGroupNetworkHandler">Communication group network observer that holds all the handlers for each operator.</param> /// <param name="injector">injector forked from the injector that creates this instance</param> [Inject] private CommunicationGroupClient( [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.SerializedOperatorConfigs))] ISet<string> operatorConfigs, - IGroupCommNetworkObserver groupCommNetworkObserver, AvroConfigurationSerializer configSerializer, - ICommunicationGroupNetworkObserver commGroupNetworkHandler, IInjector injector) { _operators = new Dictionary<string, object>(); GroupName = groupName; - groupCommNetworkObserver.Register(groupName, commGroupNetworkHandler); - foreach (string operatorConfigStr in operatorConfigs) { IConfiguration operatorConfig = configSerializer.FromString(operatorConfigStr); http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs deleted file mode 100644 index 03d0dd4..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System; -using System.Collections.Generic; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Network.Group.Task.Impl -{ - /// <summary> - /// Handles incoming messages sent to this Communication Group. - /// Writable version - /// </summary> - internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver)); - private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers; - - /// <summary> - /// Creates a new CommunicationGroupNetworkObserver. - /// </summary> - [Inject] - private CommunicationGroupNetworkObserver() - { - _handlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>(); - } - - /// <summary> - /// Registers the handler with the CommunicationGroupNetworkObserver. - /// Messages that are to be sent to the operator specified by operatorName - /// are handled by the given observer. - /// </summary> - /// <param name="operatorName">The name of the operator whose handler - /// will be invoked</param> - /// <param name="observer">The writable handler to invoke when messages are sent - /// to the operator specified by operatorName</param> - void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer) - { - if (string.IsNullOrEmpty(operatorName)) - { - throw new ArgumentNullException("operatorName"); - } - if (observer == null) - { - throw new ArgumentNullException("observer"); - } - - _handlers[operatorName] = observer; - } - - /// <summary> - /// Handles the incoming GeneralGroupCommunicationMessage sent to this Communication Group. - /// Looks for the operator that the message is being sent to and invoke its handler. - /// </summary> - /// <param name="message">The incoming message</param> - public void OnNext(GeneralGroupCommunicationMessage message) - { - string operatorName = message.OperatorName; - - IObserver<GeneralGroupCommunicationMessage> handler = GetOperatorHandler(operatorName); - if (handler == null) - { - Exceptions.Throw(new ArgumentException("No handler registered with the operator name: " + operatorName), LOGGER); - } - else - { - handler.OnNext(message); - } - } - - /// <summary> - /// GetOperatorHandler for operatorName - /// </summary> - /// <param name="operatorName"></param> - /// <returns></returns> - private IObserver<GeneralGroupCommunicationMessage> GetOperatorHandler(string operatorName) - { - IObserver<GeneralGroupCommunicationMessage> handler; - if (!_handlers.TryGetValue(operatorName, out handler)) - { - Exceptions.Throw(new ApplicationException("No handler registered yet with the operator name: " + operatorName), LOGGER); - } - return handler; - } - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs index 47a4554..d82ddd2 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs @@ -16,12 +16,18 @@ // under the License. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Net; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.InjectionPlan; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.Network.Group.Task.Impl { @@ -29,71 +35,67 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Handles all incoming messages for this Task. /// Writable version /// </summary> - internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver + internal sealed class GroupCommNetworkObserver : IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>> { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver)); + private static readonly Logger Logger = Logger.GetLogger(typeof(GroupCommNetworkObserver)); - private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _commGroupHandlers; + private readonly IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> _networkService; + + private readonly ConcurrentDictionary<string, TaskMessageObserver> _taskMessageObservers = + new ConcurrentDictionary<string, TaskMessageObserver>(); + + /// <summary> + /// A ConcurrentDictionary is used here since there is no ConcurrentSet implementation in C#, and ConcurrentBag + /// does not allow for us to check for the existence of an item. The byte is simply a placeholder. + /// </summary> + private readonly ConcurrentDictionary<string, byte> _registeredNodes = new ConcurrentDictionary<string, byte>(); /// <summary> /// Creates a new GroupCommNetworkObserver. /// </summary> [Inject] - private GroupCommNetworkObserver() + private GroupCommNetworkObserver( + IInjectionFuture<StreamingNetworkService<GeneralGroupCommunicationMessage>> networkService) { - _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>(); + _networkService = networkService; } /// <summary> - /// Handles the incoming WritableNsMessage for this Task. - /// Delegates the GeneralGroupCommunicationMessage to the correct - /// WritableCommunicationGroupNetworkObserver. + /// Registers a <see cref="TaskMessageObserver"/> for a given <see cref="taskSourceId"/>. + /// If the <see cref="TaskMessageObserver"/> has already been initialized, it will return + /// the existing one. /// </summary> - /// <param name="nsMessage"></param> - public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage) + public TaskMessageObserver RegisterAndGetForTask(string taskSourceId) { - if (nsMessage == null) - { - throw new ArgumentNullException("nsMessage"); - } - - try - { - GeneralGroupCommunicationMessage gcm = nsMessage.Data.First(); - _commGroupHandlers[gcm.GroupName].OnNext(gcm); - } - catch (InvalidOperationException) - { - LOGGER.Log(Level.Error, "Group Communication Network Handler received message with no data"); - throw; - } - catch (KeyNotFoundException) - { - LOGGER.Log(Level.Error, "Group Communication Network Handler received message for nonexistant group"); - throw; - } + // Add a TaskMessage observer for each upstream/downstream source. + return _taskMessageObservers.GetOrAdd(taskSourceId, new TaskMessageObserver(_networkService.Get())); } /// <summary> - /// Registers the network handler for the given CommunicationGroup. - /// When messages are sent to the specified group name, the given handler - /// will be invoked with that message. + /// On the first message, we map the <see cref="TaskMessageObserver"/> to the <see cref="IPEndPoint"/> + /// of the sending Task and register the observer with <see cref="IRemoteManager{T}"/> + /// by calling <see cref="TaskMessageObserver#OnNext"/>. On subsequent messages we simply ignore the message + /// and allow <see cref="ObserverContainer{T}"/> to send the message directly via the <see cref="IPEndPoint"/>. /// </summary> - /// <param name="groupName">The group name for the network handler</param> - /// <param name="commGroupHandler">The network handler to invoke when - /// messages are sent to the given group.</param> - public void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler) + /// <param name="remoteMessage"></param> + public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> remoteMessage) { - if (string.IsNullOrEmpty(groupName)) - { - throw new ArgumentNullException("groupName"); - } - if (commGroupHandler == null) + var nsMessage = remoteMessage.Message; + var gcm = nsMessage.Data.First(); + var gcMessageTaskSource = gcm.Source; + TaskMessageObserver observer; + if (!_taskMessageObservers.TryGetValue(gcMessageTaskSource, out observer)) { - throw new ArgumentNullException("commGroupHandler"); + throw new KeyNotFoundException("Unable to find registered NodeMessageObserver for source Task " + + gcMessageTaskSource + "."); } - _commGroupHandlers[groupName] = commGroupHandler; + _registeredNodes.GetOrAdd(gcMessageTaskSource, + id => + { + observer.OnNext(remoteMessage); + return new byte(); + }); } public void OnError(Exception error) @@ -104,4 +106,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs new file mode 100644 index 0000000..00ca1d4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeMessageObserver.cs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.NetworkService; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// An observer for a node in a group communication graph. + /// </summary> + internal sealed class NodeMessageObserver<T> : IObserver<NsMessage<GeneralGroupCommunicationMessage>> + { + private readonly NodeStruct<T> _nodeStruct; + + internal NodeMessageObserver(NodeStruct<T> nodeStruct) + { + _nodeStruct = nodeStruct; + } + + /// <summary> + /// Add data into the queue. + /// </summary> + /// <param name="value"></param> + public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value) + { + foreach (var data in value.Data) + { + var gcMessage = data as GroupCommunicationMessage<T>; + if (gcMessage != null && gcMessage.Data != null && gcMessage.Data.Length > 0) + { + _nodeStruct.AddData(gcMessage); + } + } + } + + /// <summary> + /// Gets the group name of the node. + /// </summary> + public string GroupName + { + get { return _nodeStruct.GroupName; } + } + + /// <summary> + /// Gets the operator name of the node. + /// </summary> + public string OperatorName + { + get { return _nodeStruct.OperatorName; } + } + + public void OnError(Exception error) + { + // TODO[JIRA REEF-1407]: Cancel on queue of node and handle error in application layer. + throw new NotImplementedException(); + } + + public void OnCompleted() + { + // TODO[JIRA REEF-1407]: Complete adding on queue of node. + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs new file mode 100644 index 0000000..884959f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeObserverIdentifier.cs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Network.Group.Driver.Impl; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// An identifier for a given node in the group communication graph. + /// A node is uniquely identifiable by a combination of its Task ID, + /// <see cref="Type"/>, <see cref="GroupName"/>, and <see cref="OperatorName"/>. + /// </summary> + internal sealed class NodeObserverIdentifier + { + private readonly Type _type; + private readonly string _groupName; + private readonly string _operatorName; + + /// <summary> + /// Creates a NodeObserverIdentifier from an observer. + /// </summary> + public static NodeObserverIdentifier FromObserver<T>(NodeMessageObserver<T> observer) + { + return new NodeObserverIdentifier(typeof(T), observer.GroupName, observer.OperatorName); + } + + /// <summary> + /// Creates a NodeObserverIdentifier from a group communication message. + /// </summary> + public static NodeObserverIdentifier FromMessage(GeneralGroupCommunicationMessage message) + { + return new NodeObserverIdentifier(message.MessageType, message.GroupName, message.OperatorName); + } + + private NodeObserverIdentifier(Type type, string groupName, string operatorName) + { + _type = type; + _groupName = groupName; + _operatorName = operatorName; + } + + /// <summary> + /// The Type of the nodes messages. + /// </summary> + public Type Type + { + get { return _type; } + } + + /// <summary> + /// The group name of the node. + /// </summary> + public string GroupName + { + get { return _groupName; } + } + + /// <summary> + /// The operator name of the node. + /// </summary> + public string OperatorName + { + get { return _operatorName; } + } + + /// <summary> + /// Overrides <see cref="Equals"/>. Simply compares equivalence of instance fields. + /// </summary> + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + return obj is NodeObserverIdentifier && Equals((NodeObserverIdentifier)obj); + } + + /// <summary> + /// Overrides <see cref="GetHashCode"/>. Generates hashcode based on the instance fields. + /// </summary> + public override int GetHashCode() + { + int hash = 17; + hash = (hash * 31) + _type.GetHashCode(); + hash = (hash * 31) + _groupName.GetHashCode(); + return (hash * 31) + _operatorName.GetHashCode(); + } + + /// <summary> + /// Compare equality of instance fields. + /// </summary> + private bool Equals(NodeObserverIdentifier other) + { + return _type == other.Type && + _groupName.Equals(other.GroupName) && + _operatorName.Equals(other.OperatorName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs index e13d724..2140c61 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs @@ -33,9 +33,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Creates a new NodeStruct. /// </summary> /// <param name="id">The Task identifier</param> - internal NodeStruct(string id) + /// <param name="groupName">The group name of the node.</param> + /// <param name="operatorName">The operator name of the node</param> + internal NodeStruct(string id, string groupName, string operatorName) { Identifier = id; + GroupName = groupName; + OperatorName = operatorName; _messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>(); } @@ -46,6 +50,16 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl internal string Identifier { get; private set; } /// <summary> + /// The group name of the node. + /// </summary> + internal string GroupName { get; private set; } + + /// <summary> + /// The operator name of the node. + /// </summary> + internal string OperatorName { get; private set; } + + /// <summary> /// Gets the first message in the message queue. /// </summary> /// <returns>The first available message.</returns> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index bf63af4..66faa29 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -39,7 +39,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Communication Group. /// </summary> /// <typeparam name="T">The message type</typeparam> - public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage> + public sealed class OperatorTopology<T> : IOperatorTopology<T> { private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>)); @@ -66,6 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="sleepTime">Sleep time between retry wating for registration</param> /// <param name="rootId">The identifier for the root Task in the topology graph</param> /// <param name="childIds">The set of child Task identifiers in the topology graph</param> + /// <param name="networkObserver"></param> /// <param name="networkService">The network service</param> /// <param name="sender">The Sender used to do point to point communication</param> [Inject] @@ -78,6 +79,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime, [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId, [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds, + GroupCommNetworkObserver networkObserver, StreamingNetworkService<GeneralGroupCommunicationMessage> networkService, Sender sender) { @@ -89,12 +91,20 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _sleepTime = sleepTime; _nameClient = networkService.NamingClient; _sender = sender; + _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId, groupName, operatorName); - _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId); + // Register the observers for Task IDs and nodes adjacent to the current node + // in the group communication graph. + if (_parent != null) + { + networkObserver.RegisterAndGetForTask(_parent.Identifier).RegisterNodeObserver(new NodeMessageObserver<T>(_parent)); + } foreach (var childId in childIds) { - _childNodeContainer.PutNode(new NodeStruct<T>(childId)); + var childNode = new NodeStruct<T>(childId, groupName, operatorName); + _childNodeContainer.PutNode(childNode); + networkObserver.RegisterAndGetForTask(childId).RegisterNodeObserver(new NodeMessageObserver<T>(childNode)); } } @@ -123,41 +133,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } /// <summary> - /// Handles the incoming GroupCommunicationMessage. - /// Updates the sending node's message queue. - /// </summary> - /// <param name="gcm">The incoming message</param> - public void OnNext(GeneralGroupCommunicationMessage gcm) - { - if (gcm == null) - { - throw new ArgumentNullException("gcm"); - } - if (gcm.Source == null) - { - throw new ArgumentException("Message must have a source"); - } - - var sourceNode = (_parent != null && _parent.Identifier == gcm.Source) - ? _parent - : _childNodeContainer.GetChild(gcm.Source); - - if (sourceNode == null) - { - throw new IllegalStateException("Received message from invalid task id: " + gcm.Source); - } - - var message = gcm as GroupCommunicationMessage<T>; - - if (message == null) - { - throw new NullReferenceException("message passed not of type GroupCommunicationMessage"); - } - - sourceNode.AddData(message); - } - - /// <summary> /// Sends the message to the parent Task. /// </summary> /// <param name="message">The message to send</param> @@ -308,14 +283,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren()); } - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - public bool HasChildren() { return _childNodeContainer.Count > 0; @@ -396,4 +363,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs new file mode 100644 index 0000000..d8dd449 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/TaskMessageObserver.cs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// The observer for a Task that multiplexes to the node observers associated with that Task. + /// </summary> + internal sealed class TaskMessageObserver : + IObserver<NsMessage<GeneralGroupCommunicationMessage>>, + IObserver<IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>>> + { + private readonly Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>> _observers = + new Dictionary<NodeObserverIdentifier, IObserver<NsMessage<GeneralGroupCommunicationMessage>>>(); + + private readonly StreamingNetworkService<GeneralGroupCommunicationMessage> _networkService; + private readonly object _registrationLock = new object(); + private bool _hasRegistered = false; + private volatile NsMessage<GeneralGroupCommunicationMessage> _registrationMessage; + + public TaskMessageObserver(StreamingNetworkService<GeneralGroupCommunicationMessage> networkService) + { + _networkService = networkService; + } + + /// <summary> + /// Registers a node associated with the Task. + /// </summary> + public void RegisterNodeObserver<T>(NodeMessageObserver<T> observer) + { + _observers.Add(NodeObserverIdentifier.FromObserver(observer), observer); + } + + /// <summary> + /// This is called directly from the observer container with the registered IPEndpoint + /// of the Task ID. + /// </summary> + public void OnNext(NsMessage<GeneralGroupCommunicationMessage> value) + { + Handle(value); + } + + /// <summary> + /// This is called from the universal observer in ObserverContainer for the first message. + /// </summary> + public void OnNext(IRemoteMessage<NsMessage<GeneralGroupCommunicationMessage>> value) + { + // Lock to prevent duplication of messages. + lock (_registrationLock) + { + if (_hasRegistered) + { + return; + } + + var socketRemoteId = value.Identifier as SocketRemoteIdentifier; + if (socketRemoteId == null) + { + throw new InvalidOperationException(); + } + + // Handle the message first, then register the observer. + Handle(value.Message, true); + _networkService.RemoteManager.RegisterObserver(socketRemoteId.Addr, this); + _hasRegistered = true; + } + } + + public void OnError(Exception error) + { + // TODO[JIRA REEF-1407]: Propagate Exception to nodes associated with the Task. + throw new NotImplementedException(); + } + + public void OnCompleted() + { + // TODO[JIRA REEF-1407]: Propagate completion to nodes associated with the Task. + throw new NotImplementedException(); + } + + /// <summary> + /// Handles the group communication message. + /// </summary> + private void Handle(NsMessage<GeneralGroupCommunicationMessage> value, bool isRegistration = false) + { + // This is mainly used to handle the case should ObserverContainer + // decide to trigger handlers concurrently for a single message. + if (isRegistration) + { + // Process the registration message + _registrationMessage = value; + } + else if (_registrationMessage != null && value == _registrationMessage) + { + // This means that we've already processed the message. + // Ignore this message and discard the reference. + _registrationMessage = null; + return; + } + + var gcMessage = value.Data.First(); + + IObserver<NsMessage<GeneralGroupCommunicationMessage>> observer; + if (!_observers.TryGetValue(NodeObserverIdentifier.FromMessage(gcMessage), out observer)) + { + throw new InvalidOperationException(); + } + + observer.OnNext(value); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs index 9536d39..28208db 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs @@ -18,6 +18,7 @@ using System; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Network.NetworkService { @@ -33,6 +34,11 @@ namespace Org.Apache.REEF.Network.NetworkService INameClient NamingClient { get; } /// <summary> + /// The remote manager of the NetworkService. + /// </summary> + IRemoteManager<NsMessage<T>> RemoteManager { get; } + + /// <summary> /// Open a new connection to the remote host registered to /// the name service with the given identifier /// </summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs index bd0c94b..53a74b4 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs @@ -81,6 +81,14 @@ namespace Org.Apache.REEF.Network.NetworkService public INameClient NamingClient { get; private set; } /// <summary> + /// The remote manager of the network service. + /// </summary> + public IRemoteManager<NsMessage<T>> RemoteManager + { + get { return _remoteManager; } + } + + /// <summary> /// Open a new connection to the remote host registered to /// the name service with the given identifier /// </summary> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs index cfee235..a34e8cb 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs @@ -22,13 +22,10 @@ using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Network.NetworkService.Codec; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.StreamingCodec; -using Org.Apache.REEF.Wake.Util; namespace Org.Apache.REEF.Network.NetworkService { @@ -42,37 +39,78 @@ namespace Org.Apache.REEF.Network.NetworkService private readonly IRemoteManager<NsMessage<T>> _remoteManager; private IIdentifier _localIdentifier; - private readonly IDisposable _messageHandlerDisposable; + private readonly IDisposable _universalObserverDisposable; + private readonly IDisposable _remoteMessageUniversalObserver; private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap; private readonly INameClient _nameClient; /// <summary> /// Create a new Writable NetworkService. /// </summary> - /// <param name="messageHandler">The observer to handle incoming messages</param> - /// <param name="idFactory">The factory used to create IIdentifiers</param> + /// <param name="universalObserver">The observer to handle incoming messages</param> /// <param name="nameClient">The name client used to register Ids</param> - /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a - /// Writable RemoteManager</param> + /// <param name="remoteManagerFactory"> + /// Writable RemoteManagerFactory to create a Writable RemoteManager + /// </param> /// <param name="codec">Codec for Network Service message</param> /// <param name="localAddressProvider">The local address provider</param> - /// <param name="injector">Fork of the injector that created the Network service</param> [Inject] private StreamingNetworkService( - IObserver<NsMessage<T>> messageHandler, - IIdentifierFactory idFactory, + IObserver<NsMessage<T>> universalObserver, INameClient nameClient, StreamingRemoteManagerFactory remoteManagerFactory, NsMessageStreamingCodec<T> codec, - ILocalAddressProvider localAddressProvider, - IInjector injector) + ILocalAddressProvider localAddressProvider) + : this(universalObserver, null, nameClient, remoteManagerFactory, codec, localAddressProvider) + { + } + + /// <summary> + /// Create a new Writable NetworkService + /// </summary> + /// <param name="remoteMessageUniversalObserver">The observer to handle incoming messages</param> + /// <param name="nameClient">The name client used to register Ids</param> + /// <param name="remoteManagerFactory"> + /// Writable RemoteManagerFactory to create a Writable RemoteManager + /// </param> + /// <param name="codec">Codec for Network Service message</param> + /// <param name="localAddressProvider">The local address provider</param> + [Inject] + private StreamingNetworkService( + IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver, + INameClient nameClient, + StreamingRemoteManagerFactory remoteManagerFactory, + NsMessageStreamingCodec<T> codec, + ILocalAddressProvider localAddressProvider) + : this(null, remoteMessageUniversalObserver, nameClient, remoteManagerFactory, codec, localAddressProvider) + { + } + + [Inject] + private StreamingNetworkService( + IObserver<NsMessage<T>> universalObserver, + IObserver<IRemoteMessage<NsMessage<T>>> remoteMessageUniversalObserver, + INameClient nameClient, + StreamingRemoteManagerFactory remoteManagerFactory, + NsMessageStreamingCodec<T> codec, + ILocalAddressProvider localAddressProvider) { _remoteManager = remoteManagerFactory.GetInstance(localAddressProvider.LocalAddress, codec); - // Create and register incoming message handler - // TODO[REEF-419] This should use the TcpPortProvider mechanism - var anyEndpoint = new IPEndPoint(IPAddress.Any, 0); - _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler); + if (universalObserver != null) + { + // Create and register incoming message handler + // TODO[REEF-419] This should use the TcpPortProvider mechanism + var anyEndpoint = new IPEndPoint(IPAddress.Any, 0); + _universalObserverDisposable = _remoteManager.RegisterObserver(anyEndpoint, universalObserver); + } + else + { + _universalObserverDisposable = null; + } + + _remoteMessageUniversalObserver = remoteMessageUniversalObserver != null ? + _remoteManager.RegisterObserver(remoteMessageUniversalObserver) : null; _nameClient = nameClient; _connectionMap = new Dictionary<IIdentifier, IConnection<T>>(); @@ -89,6 +127,14 @@ namespace Org.Apache.REEF.Network.NetworkService } /// <summary> + /// RemoteManager for registering Observers. + /// </summary> + public IRemoteManager<NsMessage<T>> RemoteManager + { + get { return _remoteManager; } + } + + /// <summary> /// Open a new connection to the remote host registered to /// the name service with the given identifier /// </summary> @@ -141,8 +187,18 @@ namespace Org.Apache.REEF.Network.NetworkService } NamingClient.Unregister(_localIdentifier.ToString()); + _localIdentifier = null; - _messageHandlerDisposable.Dispose(); + + if (_universalObserverDisposable != null) + { + _universalObserverDisposable.Dispose(); + } + + if (_remoteMessageUniversalObserver != null) + { + _remoteMessageUniversalObserver.Dispose(); + } } /// <summary> @@ -156,4 +212,4 @@ namespace Org.Apache.REEF.Network.NetworkService Logger.Log(Level.Verbose, "Disposed of network service"); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 6472597..c53027d 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -87,6 +87,9 @@ under the License. <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" /> <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" /> <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" /> + <Compile Include="Group\Task\Impl\NodeMessageObserver.cs" /> + <Compile Include="Group\Task\Impl\NodeObserverIdentifier.cs" /> + <Compile Include="Group\Task\Impl\TaskMessageObserver.cs" /> <Compile Include="Group\Task\IOperatorTopology.cs" /> <Compile Include="Group\Operators\IReduceFunction.cs" /> <Compile Include="Group\Operators\IReduceReceiver.cs" /> @@ -98,11 +101,8 @@ under the License. <Compile Include="Group\Pipelining\PipelineMessage.cs" /> <Compile Include="Group\Pipelining\PipelineMessageCodec.cs" /> <Compile Include="Group\Task\ICommunicationGroupClient.cs" /> - <Compile Include="Group\Task\ICommunicationGroupNetworkObserver.cs" /> <Compile Include="Group\Task\IGroupCommClient.cs" /> - <Compile Include="Group\Task\IGroupCommNetworkObserver.cs" /> <Compile Include="Group\Task\Impl\CommunicationGroupClient.cs" /> - <Compile Include="Group\Task\Impl\CommunicationGroupNetworkObserver.cs" /> <Compile Include="Group\Task\Impl\GroupCommClient.cs" /> <Compile Include="Group\Task\Impl\GroupCommNetworkObserver.cs" /> <Compile Include="Group\Task\Impl\NodeStruct.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/df1a226d/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs index 857e87c..cc4c57b 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs @@ -25,12 +25,13 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <summary> /// Stores registered IObservers for DefaultRemoteManager. /// Can register and look up IObservers by remote IPEndPoint. + /// TODO[JIRA REEF-1407]: Remove <see cref="IObserver{T}"/> and add custom OnError/OnCompleted with IPEndpoints. /// </summary> internal sealed class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>> { private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap; - private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap; private IObserver<T> _universalObserver; + private IObserver<IRemoteMessage<T>> _remoteMessageUniversalObserver; /// <summary> /// Constructs a new ObserverContainer used to manage remote IObservers. @@ -38,7 +39,6 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public ObserverContainer() { _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer()); - _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>(); } /// <summary> @@ -67,8 +67,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <returns>An IDisposable used to unregister the observer with</returns> public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer) { - _typeMap[typeof(T)] = observer; - return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer)); + _remoteMessageUniversalObserver = observer; + return Disposable.Create(() => _remoteMessageUniversalObserver = null); } /// <summary> @@ -84,25 +84,26 @@ namespace Org.Apache.REEF.Wake.Remote.Impl T value = remoteEvent.Value; bool handled = false; - IObserver<T> observer1; - IObserver<IRemoteMessage<T>> observer2; if (_universalObserver != null) { _universalObserver.OnNext(value); handled = true; } - if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1)) - { - // IObserver was registered by IPEndpoint - observer1.OnNext(value); - handled = true; - } - else if (_typeMap.TryGetValue(value.GetType(), out observer2)) + + if (_remoteMessageUniversalObserver != null) { // IObserver was registered by event type IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint); IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value); - observer2.OnNext(remoteMessage); + _remoteMessageUniversalObserver.OnNext(remoteMessage); + handled = true; + } + + IObserver<T> observer; + if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer)) + { + // IObserver was registered by IPEndpoint + observer.OnNext(value); handled = true; } @@ -114,10 +115,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public void OnError(Exception error) { + // TODO[JIRA REEF-1407]: Propagate Exception upwards. May need to change signature. } public void OnCompleted() { + // TODO[JIRA REEF-1407]: Propagate completion upwards. May need to change signature. } } -} +} \ No newline at end of file
