http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs deleted file mode 100644 index 806f1d0..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs +++ /dev/null @@ -1,279 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Codec; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.RX; -using Org.Apache.REEF.Wake.RX.Impl; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Reactive; - -namespace Org.Apache.REEF.IO.Network.Naming -{ - /// <summary> - /// Client for the Reef name service. 
- /// Used to register, unregister, and lookup IP Addresses of known hosts. - /// </summary> - public class NameClient : INameClient - { - private static Logger _logger = Logger.GetLogger(typeof(NameClient)); - - private BlockingCollection<NamingLookupResponse> _lookupResponseQueue; - private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue; - private BlockingCollection<NamingRegisterResponse> _registerResponseQueue; - private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue; - - private TransportClient<NamingEvent> _client; - - private NameLookupClient _lookupClient; - private NameRegisterClient _registerClient; - - private bool _disposed; - - /// <summary> - /// Constructs a NameClient to register, lookup, and unregister IPEndpoints - /// with the NameServer. - /// </summary> - /// <param name="remoteAddress">The ip address of the NameServer</param> - /// <param name="remotePort">The port of the NameServer</param> - [Inject] - public NameClient( - [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string remoteAddress, - [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort) - { - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse(remoteAddress), remotePort); - Initialize(remoteEndpoint); - _disposed = false; - } - - /// <summary> - /// Constructs a NameClient to register, lookup, and unregister IPEndpoints - /// with the NameServer. - /// </summary> - /// <param name="remoteEndpoint">The endpoint of the NameServer</param> - public NameClient(IPEndPoint remoteEndpoint) - { - Initialize(remoteEndpoint); - _disposed = false; - } - - /// <summary> - /// Synchronously registers the identifier with the NameService. - /// Overwrites the previous mapping if the identifier has already - /// been registered. 
- /// </summary> - /// <param name="id">The key used to map the remote endpoint</param> - /// <param name="endpoint">The endpoint to map</param> - public void Register(string id, IPEndPoint endpoint) - { - if (id == null) - { - Exceptions.Throw(new ArgumentNullException("id"), _logger); - } - if (endpoint == null) - { - Exceptions.Throw(new ArgumentNullException("endpoint"), _logger); - } - - _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint); - _registerClient.Register(id, endpoint); - } - - /// <summary> - /// Synchronously unregisters the remote identifier with the NameService - /// </summary> - /// <param name="id">The identifier to unregister</param> - public void Unregister(string id) - { - if (id == null) - { - Exceptions.Throw(new ArgumentNullException("id"), _logger); - } - - _logger.Log(Level.Info, "Unregistering id: " + id); - _registerClient.Unregister(id); - } - - /// <summary> - /// Synchronously looks up the IPEndpoint for the registered identifier. - /// </summary> - /// <param name="id">The identifier to look up</param> - /// <returns>The mapped IPEndpoint for the identifier, or null if - /// the identifier has not been registered with the NameService</returns> - public IPEndPoint Lookup(string id) - { - if (id == null) - { - Exceptions.Throw(new ArgumentNullException("id"), _logger); - } - - List<NameAssignment> assignments = Lookup(new List<string> { id }); - if (assignments != null && assignments.Count > 0) - { - return assignments.First().Endpoint; - } - - return null; - } - - /// <summary> - /// Synchronously looks up the IPEndpoint for each of the registered identifiers in the list. - /// </summary> - /// <param name="ids">The list of identifiers to look up</param> - /// <returns>The list of NameAssignments representing a pair of identifer - /// and mapped IPEndpoint for that identifier. 
If any of the requested identifiers - /// are not registered with the NameService, their corresponding NameAssignment - /// IPEndpoint value will be null.</returns> - public List<NameAssignment> Lookup(List<string> ids) - { - if (ids == null || ids.Count == 0) - { - Exceptions.Throw(new ArgumentNullException("ids cannot be null or empty"), _logger); - } - - _logger.Log(Level.Verbose, "Looking up ids"); - List<NameAssignment> assignments = _lookupClient.Lookup(ids); - if (assignments != null) - { - return assignments; - } - Exceptions.Throw(new WakeRuntimeException("NameClient failed to look up ids."), _logger); - return null; //above line will throw exception. So null will never be returned. - } - - /// <summary> - /// Restart the name client in case of failure. - /// </summary> - /// <param name="serverEndpoint">The new server endpoint to connect to</param> - public void Restart(IPEndPoint serverEndpoint) - { - _client.Dispose(); - Initialize(serverEndpoint); - } - - /// <summary> - /// Releases resources used by NameClient - /// </summary> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (_disposed) - { - return; - } - if (disposing) - { - _client.Dispose(); - } - _disposed = true; - } - - /// <summary> - /// Create a new transport client connected to the NameServer at the given remote endpoint. 
- /// </summary> - /// <param name="serverEndpoint">The NameServer endpoint to connect to.</param> - private void Initialize(IPEndPoint serverEndpoint) - { - _lookupResponseQueue = new BlockingCollection<NamingLookupResponse>(); - _getAllResponseQueue = new BlockingCollection<NamingGetAllResponse>(); - _registerResponseQueue = new BlockingCollection<NamingRegisterResponse>(); - _unregisterResponseQueue = new BlockingCollection<NamingUnregisterResponse>(); - - IObserver<TransportEvent<NamingEvent>> clientHandler = CreateClientHandler(); - ICodec<NamingEvent> codec = CreateClientCodec(); - _client = new TransportClient<NamingEvent>(serverEndpoint, codec, clientHandler); - - _lookupClient = new NameLookupClient(_client, _lookupResponseQueue, _getAllResponseQueue); - _registerClient = new NameRegisterClient(_client, _registerResponseQueue, _unregisterResponseQueue); - } - - /// <summary> - /// Create handler to handle async responses from the NameServer. - /// </summary> - /// <returns>The client handler to manage responses from the NameServer</returns> - private IObserver<TransportEvent<NamingEvent>> CreateClientHandler() - { - PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>(); - subject.Subscribe(Observer.Create<NamingLookupResponse>(msg => HandleResponse(_lookupResponseQueue, msg))); - subject.Subscribe(Observer.Create<NamingGetAllResponse>(msg => HandleResponse(_getAllResponseQueue, msg))); - subject.Subscribe(Observer.Create<NamingRegisterResponse>(msg => HandleResponse(_registerResponseQueue, msg))); - subject.Subscribe(Observer.Create<NamingUnregisterResponse>(msg => HandleResponse(_unregisterResponseQueue, msg))); - return new ClientObserver(subject); - } - - /// <summary> - /// Create the codec used to serialize/deserialize NamingEvent messages - /// </summary> - /// <returns>The serialization codec</returns> - private ICodec<NamingEvent> CreateClientCodec() - { - MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>(); - 
codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest"); - codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse"); - NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec(); - codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest"); - codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse"); - codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest"); - return codec; - } - - private void HandleResponse<T>(BlockingCollection<T> queue, T message) - { - queue.Add(message); - } - - /// <summary> - /// Helper class used to handle response events from the NameServer. - /// Delegates the event to the appropriate response queue depending on - /// its event type. - /// </summary> - private class ClientObserver : AbstractObserver<TransportEvent<NamingEvent>> - { - private IObserver<NamingEvent> _handler; - - public ClientObserver(IObserver<NamingEvent> handler) - { - _handler = handler; - } - - public override void OnNext(TransportEvent<NamingEvent> value) - { - NamingEvent message = value.Data; - message.Link = value.Link; - _handler.OnNext(message); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs deleted file mode 100644 index 7499734..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Threading; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.Wake.Remote.Impl; - -namespace Org.Apache.REEF.IO.Network.Naming -{ - /// <summary> - /// Helper class to send lookup events to the name server - /// </summary> - internal class NameLookupClient - { - private TransportClient<NamingEvent> _client; - private BlockingCollection<NamingLookupResponse> _lookupResponseQueue; - private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue; - - /// <summary> - /// Constructs a new NameLookupClient. - /// </summary> - /// <param name="client">The transport client used to connect to the NameServer</param> - /// <param name="lookupQueue">The queue used to signal that a response - /// has been received from the NameServer</param> - /// <param name="getAllQueue">The queue used to signal that a GetAllResponse - /// has been received from the NameServer</param> - public NameLookupClient(TransportClient<NamingEvent> client, - BlockingCollection<NamingLookupResponse> lookupQueue, - BlockingCollection<NamingGetAllResponse> getAllQueue) - { - _client = client; - _lookupResponseQueue = lookupQueue; - _getAllResponseQueue = getAllQueue; - } - - /// <summary> - /// Look up the IPEndPoint that has been registered with the NameServer using - /// the given identifier as the key. 
- /// </summary> - /// <param name="id">The id for the IPEndPoint</param> - /// <param name="token">The cancellation token used for timeout</param> - /// <returns>The registered IPEndpoint, or null if the identifer has not - /// been registered with the NameServer or if the operation times out.</returns> - public IPEndPoint Lookup(string id, CancellationToken token) - { - List<string> ids = new List<string> { id }; - List<NameAssignment> assignment = Lookup(ids); - return (assignment == null || assignment.Count == 0) ? null : assignment.First().Endpoint; - } - - /// <summary> - /// Look up IPEndPoints that have been registered with the NameService - /// </summary> - /// <param name="ids">The list of ids to look up</param> - /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint - /// pairs</returns> - public List<NameAssignment> Lookup(List<string> ids) - { - _client.Send(new NamingLookupRequest(ids)); - NamingLookupResponse response = _lookupResponseQueue.Take(); - return response.NameAssignments; - } - - /// <summary> - /// Synchronously gets all of the identifier/IPEndpoint pairs registered with the NameService. 
- /// </summary> - /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint - /// pairs</returns> - public List<NameAssignment> GetAll() - { - _client.Send(new NamingGetAllRequest()); - NamingGetAllResponse response = _getAllResponseQueue.Take(); - return response.Assignments; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs deleted file mode 100644 index 88dc3c0..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Concurrent; -using System.Net; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.Wake.Remote.Impl; - -namespace Org.Apache.REEF.IO.Network.Naming -{ - /// <summary> - /// Helper class to send register and unregister events to the NameServer. 
- /// </summary> - internal class NameRegisterClient - { - private TransportClient<NamingEvent> _client; - private BlockingCollection<NamingRegisterResponse> _registerResponseQueue; - private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue; - - public NameRegisterClient(TransportClient<NamingEvent> client, - BlockingCollection<NamingRegisterResponse> registerQueue, - BlockingCollection<NamingUnregisterResponse> unregisterQueue) - { - _client = client; - _registerResponseQueue = registerQueue; - _unregisterResponseQueue = unregisterQueue; - } - - /// <summary> - /// Synchronously register the id and endpoint with the NameServer. - /// </summary> - /// <param name="id">The identifier</param> - /// <param name="endpoint">The endpoint</param> - public void Register(string id, IPEndPoint endpoint) - { - NameAssignment assignment = new NameAssignment(id, endpoint); - _client.Send(new NamingRegisterRequest(assignment)); - _registerResponseQueue.Take(); - } - - /// <summary> - /// Synchronously unregisters the identifier with the NameServer. - /// </summary> - /// <param name="id">The identifer to unregister</param> - public void Unregister(string id) - { - _client.Send(new NamingUnregisterRequest(id)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs deleted file mode 100644 index 462e05a..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Codec; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.IO.Network.Naming.Observers; -using Org.Apache.REEF.Utilities.Diagnostics; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.RX; -using Org.Apache.REEF.Wake.RX.Impl; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net; - -namespace Org.Apache.REEF.IO.Network.Naming -{ - /// <summary> - /// Service that manages names and IPEndpoints for well known hosts. - /// Can register, unregister, and look up IPAddresses using a string identifier. - /// </summary> - public class NameServer : INameServer - { - private static Logger _logger = Logger.GetLogger(typeof(NameServer)); - - private TransportServer<NamingEvent> _server; - private Dictionary<string, IPEndPoint> _idToAddrMap; - - /// <summary> - /// Create a new NameServer to run on the specified port. 
- /// </summary> - /// <param name="port">The port to listen for incoming connections on.</param> - [Inject] - public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port) - { - IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler(); - _idToAddrMap = new Dictionary<string, IPEndPoint>(); - ICodec<NamingEvent> codec = CreateServerCodec(); - - // Start transport server, get listening IP endpoint - _logger.Log(Level.Info, "Starting naming server"); - _server = new TransportServer<NamingEvent>(port, handler, codec); - _server.Run(); - LocalEndpoint = _server.LocalEndpoint; - } - - public IPEndPoint LocalEndpoint { get; private set; } - - /// <summary> - /// Looks up the IPEndpoints for each string identifier - /// </summary> - /// <param name="ids">The IDs to look up</param> - /// <returns>A list of Name assignments representing the identifier - /// that was searched for and the mapped IPEndpoint</returns> - public List<NameAssignment> Lookup(List<string> ids) - { - if (ids == null) - { - Exceptions.Throw(new ArgumentNullException("ids"), _logger); - } - - return ids.Where(id => _idToAddrMap.ContainsKey(id)) - .Select(id => new NameAssignment(id, _idToAddrMap[id])) - .ToList(); - } - - /// <summary> - /// Gets all of the registered identifier/endpoint pairs. 
- /// </summary> - /// <returns>A list of all of the registered identifiers and their - /// mapped IPEndpoints</returns> - public List<NameAssignment> GetAll() - { - return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList(); - } - - /// <summary> - /// Registers the string identifier with the given IPEndpoint - /// </summary> - /// <param name="id">The string ident</param> - /// <param name="endpoint">The mapped endpoint</param> - public void Register(string id, IPEndPoint endpoint) - { - if (id == null) - { - Exceptions.Throw(new ArgumentNullException("id"), _logger); - } - if (endpoint == null) - { - Exceptions.Throw(new ArgumentNullException("endpoint"), _logger); - } - - _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint); - _idToAddrMap[id] = endpoint; - } - - /// <summary> - /// Unregister the given identifier with the NameServer - /// </summary> - /// <param name="id">The identifier to unregister</param> - public void Unregister(string id) - { - if (id == null) - { - Exceptions.Throw(new ArgumentNullException("id"), _logger); - } - - _logger.Log(Level.Info, "Unregistering id: " + id); - _idToAddrMap.Remove(id); - } - - /// <summary> - /// Stops the NameServer - /// </summary> - public void Dispose() - { - _server.Dispose(); - } - - /// <summary> - /// Create the handler to manage incoming NamingEvent types - /// </summary> - /// <returns>The server handler</returns> - private IObserver<TransportEvent<NamingEvent>> CreateServerHandler() - { - PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>(); - subject.Subscribe(new NamingLookupRequestObserver(this)); - subject.Subscribe(new NamingGetAllRequestObserver(this)); - subject.Subscribe(new NamingRegisterRequestObserver(this)); - subject.Subscribe(new NamingUnregisterRequestObserver(this)); - return new ServerHandler(subject); - } - - /// <summary> - /// Create the codec used to serialize/deserialize NamingEvent messages - /// </summary> - 
/// <returns>The serialization codec</returns> - private ICodec<NamingEvent> CreateServerCodec() - { - MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>(); - codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest"); - codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse"); - NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec(); - codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest"); - codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse"); - codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest"); - return codec; - } - - [NamedParameter("Port for the NameServer to listen on")] - public class Port : Name<int> - { - } - - /// <summary> - /// Class used to handle incoming NamingEvent messages. 
- /// Delegates the event to the prescribed handler depending on its type - /// </summary> - private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>> - { - private IObserver<NamingEvent> _handler; - - public ServerHandler(IObserver<NamingEvent> handler) - { - _handler = handler; - } - - public override void OnNext(TransportEvent<NamingEvent> value) - { - NamingEvent message = value.Data; - message.Link = value.Link; - _handler.OnNext(message); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs deleted file mode 100644 index 3daac70..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Util; - -namespace Org.Apache.REEF.Naming -{ - public class NamingConfiguration : ConfigurationModuleBuilder - { - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly RequiredParameter<string> NameServerAddress = new RequiredParameter<string>(); - - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly RequiredParameter<int> NameServerPort = new RequiredParameter<int>(); - - public static ConfigurationModule ConfigurationModule - { - get - { - return new NamingConfiguration() - .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, NameServerAddress) - .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, NameServerPort) - .Build(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs deleted file mode 100644 index aa9b6e6..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Naming -{ - public class NamingConfigurationOptions - { - [NamedParameter("IP address of NameServer")] - public class NameServerAddress : Name<string> - { - } - - [NamedParameter("Port of NameServer")] - public class NameServerPort : Name<int> - { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs deleted file mode 100644 index 92cc158..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.Wake.RX; - -namespace Org.Apache.REEF.IO.Network.Naming.Observers -{ - /// <summary> - /// Handler for NameService for events of type NamingGetAllRequest. - /// Gets all of the identifiers and their mapped IPEndpoints registered - /// with the NameServer. - /// </summary> - internal class NamingGetAllRequestObserver : AbstractObserver<NamingGetAllRequest> - { - private NameServer _server; - - public NamingGetAllRequestObserver(NameServer server) - { - _server = server; - } - - public override void OnNext(NamingGetAllRequest value) - { - List<NameAssignment> assignments = _server.GetAll(); - value.Link.Write(new NamingGetAllResponse(assignments)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs deleted file mode 100644 index 220aaa5..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.IO.Network.Naming.Events; -using Org.Apache.REEF.Wake.RX; -using System.Collections.Generic; - -namespace Org.Apache.REEF.IO.Network.Naming.Observers -{ - /// <summary> - /// Handler for looking up IPEndpoints registered with the NameServer - /// </summary> - internal class NamingLookupRequestObserver : AbstractObserver<NamingLookupRequest> - { - private NameServer _server; - - public NamingLookupRequestObserver(NameServer server) - { - _server = server; - } - - /// <summary> - /// Look up the IPEndpoints for the given identifiers and write them - /// back to the NameClient - /// </summary> - /// <param name="value">The lookup request event</param> - public override void OnNext(NamingLookupRequest value) - { - List<NameAssignment> assignments = _server.Lookup(value.Identifiers); - value.Link.Write(new NamingLookupResponse(assignments)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs deleted file mode 100644 index 2da45a1..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

using Org.Apache.REEF.Common.io;
using Org.Apache.REEF.IO.Network.Naming.Events;
using Org.Apache.REEF.Wake.RX;

namespace Org.Apache.REEF.IO.Network.Naming.Observers
{
    /// <summary>
    /// Observer that handles NamingRegisterRequest events by recording the
    /// identifier/endpoint pair in the NameServer and acknowledging the request.
    /// </summary>
    internal class NamingRegisterRequestObserver : AbstractObserver<NamingRegisterRequest>
    {
        private NameServer _server;

        /// <summary>
        /// Creates the observer.
        /// </summary>
        /// <param name="server">The NameServer that stores the registrations</param>
        public NamingRegisterRequestObserver(NameServer server)
        {
            _server = server;
        }

        /// <summary>
        /// Store the assignment carried by the request in the NameServer, then
        /// write a NamingRegisterResponse built from the request to its link.
        /// </summary>
        /// <param name="value">The register request event</param>
        public override void OnNext(NamingRegisterRequest value)
        {
            NameAssignment requested = value.NameAssignment;
            _server.Register(requested.Identifier, requested.Endpoint);

            // The response is only written after the registration call returned.
            var link = value.Link;
            link.Write(new NamingRegisterResponse(value));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs deleted file mode 100644 index 17d2d50..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

using Org.Apache.REEF.IO.Network.Naming.Events;
using Org.Apache.REEF.Wake.RX;

namespace Org.Apache.REEF.IO.Network.Naming.Observers
{
    /// <summary>
    /// Observer that handles NamingUnregisterRequest events by removing the
    /// identifier from the NameServer.
    /// </summary>
    internal class NamingUnregisterRequestObserver : AbstractObserver<NamingUnregisterRequest>
    {
        private NameServer _server;

        /// <summary>
        /// Creates the observer.
        /// </summary>
        /// <param name="server">The NameServer the identifier is removed from</param>
        public NamingUnregisterRequestObserver(NameServer server)
        {
            _server = server;
        }

        /// <summary>
        /// Unregister the identifier named in the request with the NameServer.
        /// </summary>
        /// <param name="value">The unregister request event</param>
        public override void OnNext(NamingUnregisterRequest value)
        {
            // Fire-and-forget: unlike the other naming observers, nothing is
            // written back to the requester.
            var identifier = value.Identifier;
            _server.Unregister(identifier);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/Network.csproj b/lang/cs/Source/REEF/reef-io/Network/Network.csproj deleted file mode 100644 index fd46373..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/Network.csproj +++ /dev/null @@ -1,180 +0,0 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at -http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> - <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> - <PropertyGroup> - <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> - <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> - <ProjectGuid>{883CE800-6A6A-4E0A-B7FE-C054F4F2C1DC}</ProjectGuid> - <OutputType>Library</OutputType> - <AppDesignerFolder>Properties</AppDesignerFolder> - <RootNamespace>Org.Apache.Reef.IO.Network</RootNamespace> - <AssemblyName>Org.Apache.Reef.IO.Network</AssemblyName> - <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> - <FileAlignment>512</FileAlignment> - <RestorePackages>true</RestorePackages> - <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\..</SolutionDir> - </PropertyGroup> - <Import Project="$(SolutionDir)\Source\build.props" /> - <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> - <PlatformTarget>AnyCPU</PlatformTarget> - <DebugSymbols>true</DebugSymbols> - <DebugType>full</DebugType> - <Optimize>false</Optimize> - <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> - <DefineConstants>DEBUG;TRACE</DefineConstants> - <ErrorReport>prompt</ErrorReport> - 
<WarningLevel>4</WarningLevel> - </PropertyGroup> - <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> - <PlatformTarget>AnyCPU</PlatformTarget> - <DebugSymbols>true</DebugSymbols> - <DebugType>full</DebugType> - <Optimize>false</Optimize> - <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> - <DefineConstants>DEBUG;TRACE</DefineConstants> - <ErrorReport>prompt</ErrorReport> - <WarningLevel>4</WarningLevel> - </PropertyGroup> - <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> - <PlatformTarget>AnyCPU</PlatformTarget> - <DebugType>pdbonly</DebugType> - <Optimize>true</Optimize> - <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> - <DefineConstants>TRACE</DefineConstants> - <ErrorReport>prompt</ErrorReport> - <WarningLevel>4</WarningLevel> - </PropertyGroup> - <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> - <PlatformTarget>AnyCPU</PlatformTarget> - <DebugType>pdbonly</DebugType> - <Optimize>true</Optimize> - <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> - <DefineConstants>TRACE</DefineConstants> - <ErrorReport>prompt</ErrorReport> - <WarningLevel>4</WarningLevel> - </PropertyGroup> - <ItemGroup> - <Reference Include="Microsoft.Hadoop.Avro"> - <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath> - </Reference> - <Reference Include="Newtonsoft.Json"> - <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath> - </Reference> - <Reference Include="protobuf-net"> - <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> - </Reference> - <Reference Include="System.Reactive.Core"> - <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath> - </Reference> - <Reference Include="System.Reactive.Interfaces"> - 
<HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath> - </Reference> - <Reference Include="System" /> - <Reference Include="System.Core" /> - <Reference Include="System.Runtime.Serialization" /> - <Reference Include="System.Xml.Linq" /> - <Reference Include="System.Data.DataSetExtensions" /> - <Reference Include="Microsoft.CSharp" /> - <Reference Include="System.Data" /> - <Reference Include="System.Xml" /> - </ItemGroup> - <ItemGroup> - <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" /> - <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" /> - <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" /> - <Compile Include="Naming\Codec\NamingRegisterResponseCodec.cs" /> - <Compile Include="Naming\Codec\NamingUnregisterRequestCodec.cs" /> - <Compile Include="Naming\Contracts\AvroNamingAssignment.cs" /> - <Compile Include="Naming\Contracts\AvroNamingLookupRequest.cs" /> - <Compile Include="Naming\Contracts\AvroNamingLookupResponse.cs" /> - <Compile Include="Naming\Contracts\AvroNamingRegisterRequest.cs" /> - <Compile Include="Naming\Contracts\AvroNamingUnRegisterRequest.cs" /> - <Compile Include="Naming\Events\NamingEvent.cs" /> - <Compile Include="Naming\Events\NamingGetAllRequest.cs" /> - <Compile Include="Naming\Events\NamingGetAllResponse.cs" /> - <Compile Include="Naming\Events\NamingLookupRequest.cs" /> - <Compile Include="Naming\Events\NamingLookupResponse.cs" /> - <Compile Include="Naming\Events\NamingRegisterRequest.cs" /> - <Compile Include="Naming\Events\NamingRegisterResponse.cs" /> - <Compile Include="Naming\Events\NamingUnregisterRequest.cs" /> - <Compile Include="Naming\Events\NamingUnregisterResponse.cs" /> - <Compile Include="Naming\INameServer.cs" /> - <Compile Include="Naming\NameClient.cs" /> - <Compile Include="Naming\NameLookupClient.cs" /> - <Compile Include="Naming\NameRegisterClient.cs" /> - <Compile Include="Naming\NameServer.cs" /> - <Compile 
Include="Naming\NamingConfiguration.cs" /> - <Compile Include="Naming\NamingConfigurationOptions.cs" /> - <Compile Include="Naming\Observers\NamingGetAllRequestObserver.cs" /> - <Compile Include="Naming\Observers\NamingLookupRequestObserver.cs" /> - <Compile Include="Naming\Observers\NamingRegisterRequestObserver.cs" /> - <Compile Include="Naming\Observers\NamingUnregisterRequestObserver.cs" /> - <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" /> - <Compile Include="NetworkService\Codec\NsMessageCodec.cs" /> - <Compile Include="NetworkService\Codec\NsMessageProto.cs" /> - <Compile Include="NetworkService\ControlMessage.cs" /> - <Compile Include="NetworkService\IConnection.cs" /> - <Compile Include="NetworkService\INetworkService.cs" /> - <Compile Include="NetworkService\NetworkService.cs" /> - <Compile Include="NetworkService\NetworkServiceConfiguration.cs" /> - <Compile Include="NetworkService\NetworkServiceOptions.cs" /> - <Compile Include="NetworkService\NsConnection.cs" /> - <Compile Include="NetworkService\NsMessage.cs" /> - <Compile Include="Properties\AssemblyInfo.cs" /> - <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> - <Compile Include="Utilities\Utils.cs" /> - </ItemGroup> - <ItemGroup> - <None Include="packages.config" /> - </ItemGroup> - <ItemGroup> - <ProjectReference Include="$(SolutionDir)\Org.Apache.Reef.Tang\Org.Apache.Reef.Tang.csproj"> - <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> - <Name>Org.Apache.Reef.Tang</Name> - </ProjectReference> - <ProjectReference Include="$(SolutionDir)\Org.Apache.Reef.Utilities\Org.Apache.Reef.Utilities.csproj"> - <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> - <Name>Org.Apache.Reef.Utilities</Name> - </ProjectReference> - <ProjectReference Include="..\..\..\..\Org.Apache.Reef.Common\Org.Apache.Reef.Common.csproj"> - <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> - <Name>Org.Apache.Reef.Common</Name> - </ProjectReference> - <ProjectReference 
Include="..\..\..\..\Org.Apache.Reef.Driver\Org.Apache.Reef.Driver.csproj"> - <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project> - <Name>Org.Apache.Reef.Driver</Name> - </ProjectReference> - <ProjectReference Include="..\..\..\..\Org.Apache.Reef.Wake\Org.Apache.Reef.Wake.csproj"> - <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> - <Name>Org.Apache.Reef.Wake</Name> - </ProjectReference> - </ItemGroup> - <ItemGroup> - <WCFMetadata Include="Service References\" /> - </ItemGroup> - <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> - <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> - <!-- To modify your build process, add your task inside one of the targets below and uncomment it. - Other similar extension points exist, see Microsoft.Common.targets. - <Target Name="BeforeBuild"> - </Target> - <Target Name="AfterBuild"> - </Target> - --> -</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs deleted file mode 100644 index 09ecfbe..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

using System;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
{
    /// <summary>
    /// Codec that converts a ControlMessage to and from the bytes of its
    /// underlying 32-bit integer value, using BitConverter.
    /// </summary>
    public class ControlMessageCodec : ICodec<ControlMessage>
    {
        /// <summary>
        /// Injectable parameterless constructor; the codec holds no state.
        /// </summary>
        [Inject]
        public ControlMessageCodec()
        {
        }

        /// <summary>
        /// Serialize the control message.
        /// </summary>
        /// <param name="message">The control message to encode</param>
        /// <returns>The bytes BitConverter produces for the message's integer value</returns>
        public byte[] Encode(ControlMessage message)
        {
            int numericValue = (int) message;
            return BitConverter.GetBytes(numericValue);
        }

        /// <summary>
        /// Deserialize a control message.
        /// </summary>
        /// <param name="data">Buffer whose first four bytes hold the encoded value</param>
        /// <returns>The integer read from offset 0, cast to ControlMessage</returns>
        public ControlMessage Decode(byte[] data)
        {
            // No range check: any integer is cast, as in the enum's own conversion rules.
            int numericValue = BitConverter.ToInt32(data, 0);
            return (ControlMessage) numericValue;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs deleted file mode 100644 index 454182e..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

using Org.Apache.REEF.Wake;
using Org.Apache.REEF.Wake.Remote;
using ProtoBuf;
using System;
using System.IO;
using System.Linq;

namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
{
    /// <summary>
    /// Codec for the NsMessage envelope used by NetworkService. The envelope
    /// is mapped to an NsMessageProto and serialized with protobuf-net; the
    /// payload items are handled by the wrapped item codec.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public class NsMessageCodec<T> : ICodec<NsMessage<T>>
    {
        private ICodec<T> _codec;
        private IIdentifierFactory _idFactory;

        /// <summary>
        /// Create a new NsMessageCodec.
        /// </summary>
        /// <param name="codec">Codec applied to each payload item</param>
        /// <param name="idFactory">Factory that turns the serialized id strings back into identifiers</param>
        public NsMessageCodec(ICodec<T> codec, IIdentifierFactory idFactory)
        {
            _idFactory = idFactory;
            _codec = codec;
        }

        /// <summary>
        /// Serialize the NsMessage.
        /// </summary>
        /// <param name="obj">The message to serialize</param>
        /// <returns>The protobuf-serialized envelope as a byte array</returns>
        public byte[] Encode(NsMessage<T> obj)
        {
            // Build the proto before allocating the buffer so a failing item
            // codec surfaces before any stream exists.
            NsMessageProto envelope = NsMessageProto.Create(obj, _codec);
            using (MemoryStream buffer = new MemoryStream())
            {
                Serializer.Serialize(buffer, envelope);
                return buffer.ToArray();
            }
        }

        /// <summary>
        /// Deserialize the byte array into an NsMessage.
        /// </summary>
        /// <param name="data">The serialized envelope</param>
        /// <returns>The message with source id, destination id and decoded payload items</returns>
        public NsMessage<T> Decode(byte[] data)
        {
            using (MemoryStream input = new MemoryStream(data))
            {
                NsMessageProto envelope = Serializer.Deserialize<NsMessageProto>(input);

                // Source id is created before the destination id, as arguments
                // are evaluated left to right.
                NsMessage<T> result = new NsMessage<T>(
                    _idFactory.Create(envelope.SourceId),
                    _idFactory.Create(envelope.DestId));

                result.Data.AddRange(envelope.Data.Select(payload => _codec.Decode(payload)));
                return result;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs deleted file mode 100644 index 8345e3a..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Wake.Remote;
using ProtoBuf;

namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
{
    /// <summary>
    /// protobuf-net contract for an NsMessage: the two identifiers as strings
    /// plus the payload items, each already encoded to a byte array.
    /// </summary>
    [ProtoContract]
    public class NsMessageProto
    {
        /// <summary>
        /// Creates an empty proto; Data starts as an empty list so items can
        /// be added without a null check.
        /// </summary>
        public NsMessageProto()
        {
            Data = new List<byte[]>();
        }

        /// <summary>String form of the sender's identifier.</summary>
        [ProtoMember(1)]
        public string SourceId { get; set; }

        /// <summary>String form of the receiver's identifier.</summary>
        [ProtoMember(2)]
        public string DestId { get; set; }

        /// <summary>Encoded payload items, in message order.</summary>
        [ProtoMember(3)]
        public List<byte[]> Data { get; set; }

        /// <summary>
        /// Build the proto for a message: ids are converted with ToString and
        /// every payload item is encoded with the supplied codec.
        /// </summary>
        /// <typeparam name="T">The payload item type</typeparam>
        /// <param name="message">The message to convert</param>
        /// <param name="codec">Codec used to encode each payload item</param>
        /// <returns>The populated proto</returns>
        public static NsMessageProto Create<T>(NsMessage<T> message, ICodec<T> codec)
        {
            NsMessageProto result = new NsMessageProto
            {
                SourceId = message.SourceId.ToString(),
                DestId = message.DestId.ToString()
            };

            result.Data.AddRange(message.Data.Select(item => codec.Encode(item)));
            return result;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs deleted file mode 100644 index 1309c32..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace Org.Apache.REEF.IO.Network.NetworkService -{ - /// <summary> - /// Control signals defined for the NetworkService layer. - /// ControlMessageCodec serializes a value by casting it to a 32-bit integer, - /// so the explicit numeric values below are part of the encoded form. - /// </summary> - public enum ControlMessage - { - /// <summary> - /// Default state; 0 is also the value of an unassigned ControlMessage. - /// </summary> - UNDEFINED = 0, - - /// <summary> - /// Expecting data to be sent/received. - /// </summary> - RECEIVE = 1, - - /// <summary> - /// Stop group communications. - /// </summary> - STOP = 2, - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs deleted file mode 100644 index 0a13d60..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; - -namespace Org.Apache.REEF.IO.Network.NetworkService -{ - /// <summary> - /// Represents a connection between two endpoints named by identifiers. - /// The interface extends IDisposable, so a connection is released with Dispose. - /// </summary> - /// <typeparam name="T">The type of message written to the connection</typeparam> - public interface IConnection<T> : IDisposable - { - /// <summary> - /// Opens the connection. - /// NOTE(review): presumably this must be called before Write; confirm against the implementing class. - /// </summary> - void Open(); - - /// <summary> - /// Writes the object to the connection. - /// </summary> - /// <param name="obj">The message to send</param> - void Write(T obj); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs deleted file mode 100644 index f9e1a0b..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.Services; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake; - -namespace Org.Apache.REEF.IO.Network.NetworkService -{ - /// <summary> - /// Network service used for Reef Task communication. - /// The interface is both an IService and an IDisposable. - /// </summary> - /// <typeparam name="T">The message type</typeparam> - public interface INetworkService<T> : IService, IDisposable - { - /// <summary> - /// Name client used to register, unregister and look up identifiers - /// with the name service. - /// </summary> - INameClient NamingClient { get; } - - /// <summary> - /// Open a new connection to the remote host registered to - /// the name service with the given identifier. - /// The NetworkService implementation throws IllegalStateException when - /// this is called before an identifier has been registered. - /// </summary> - /// <param name="destinationId">The identifier of the remote host</param> - /// <returns>The IConnection used for communication</returns> - IConnection<T> NewConnection(IIdentifier destinationId); - - /// <summary> - /// Register the identifier for the NetworkService with the NameService. - /// </summary> - /// <param name="id">The identifier to register</param> - void Register(IIdentifier id); - - /// <summary> - /// Unregister the identifier for the NetworkService with the NameService. - /// The NetworkService implementation throws IllegalStateException when - /// no identifier is currently registered. - /// </summary> - void Unregister(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs deleted file mode 100644 index 041cd8d..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reactive;
using Org.Apache.REEF.Common.io;
using Org.Apache.REEF.IO.Network.Naming;
using Org.Apache.REEF.IO.Network.NetworkService.Codec;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Wake;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.IO.Network.NetworkService
{
    /// <summary>
    /// Network service used for Reef Task communication.
    /// Owns a remote manager listening on the local address and a NameClient
    /// connected to the NameServer; both are released by Dispose.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public class NetworkService<T> : INetworkService<T>
    {
        // Shared by all instances of a closed generic type; a logger per
        // instance is unnecessary.
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>));

        private IRemoteManager<NsMessage<T>> _remoteManager;
        private IObserver<NsMessage<T>> _messageHandler;
        private ICodec<NsMessage<T>> _codec;
        private IIdentifier _localIdentifier;
        private IDisposable _messageHandlerDisposable;
        private Dictionary<IIdentifier, IConnection<T>> _connectionMap;
        private bool _disposed;

        /// <summary>
        /// Create a new NetworkService.
        /// </summary>
        /// <param name="nsPort">The port that the NetworkService will listen on</param>
        /// <param name="nameServerAddr">The address of the NameServer</param>
        /// <param name="nameServerPort">The port of the NameServer</param>
        /// <param name="messageHandler">The observer to handle incoming messages</param>
        /// <param name="idFactory">The factory used to create IIdentifiers</param>
        /// <param name="codec">The codec used for serialization</param>
        [Inject]
        public NetworkService(
            [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
            [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string nameServerAddr,
            [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int nameServerPort,
            IObserver<NsMessage<T>> messageHandler,
            IIdentifierFactory idFactory,
            ICodec<T> codec)
        {
            _codec = new NsMessageCodec<T>(codec, idFactory);

            IPAddress localAddress = NetworkUtils.LocalIPAddress;
            _remoteManager = new DefaultRemoteManager<NsMessage<T>>(localAddress, nsPort, _codec);
            _messageHandler = messageHandler;

            NamingClient = new NameClient(nameServerAddr, nameServerPort);
            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();

            LOGGER.Log(Level.Info, "Started network service");
        }

        /// <summary>
        /// Name client for registering ids
        /// </summary>
        public INameClient NamingClient { get; private set; }

        /// <summary>
        /// Open a new connection to the remote host registered to
        /// the name service with the given identifier. A connection already
        /// cached for that identifier is returned instead of creating a new one.
        /// </summary>
        /// <param name="destinationId">The identifier of the remote host</param>
        /// <returns>The IConnection used for communication</returns>
        /// <exception cref="IllegalStateException">No local identifier has been registered</exception>
        public IConnection<T> NewConnection(IIdentifier destinationId)
        {
            if (_localIdentifier == null)
            {
                throw new IllegalStateException("Cannot open connection without first registering an ID");
            }

            IConnection<T> connection;
            if (_connectionMap.TryGetValue(destinationId, out connection))
            {
                return connection;
            }

            // The connection receives the map itself, so it can presumably
            // remove its own entry later -- TODO confirm in NsConnection.
            connection = new NsConnection<T>(_localIdentifier, destinationId,
                NamingClient, _remoteManager, _connectionMap);

            _connectionMap[destinationId] = connection;
            return connection;
        }

        /// <summary>
        /// Register the identifier for the NetworkService with the NameService
        /// and start delivering incoming messages to the message handler.
        /// </summary>
        /// <param name="id">The identifier to register</param>
        public void Register(IIdentifier id)
        {
            LOGGER.Log(Level.Info, "Registering id {0} with network service.", id);

            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);

            // Remember the identifier only after the name service accepted it;
            // a failed registration must not leave the service looking registered.
            _localIdentifier = id;

            // A repeated Register call would otherwise lose the handle of the
            // previous subscription and leave it active.
            DisposeMessageHandler();

            // Create and register incoming message handler
            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
        }

        /// <summary>
        /// Unregister the identifier for the NetworkService with the NameService
        /// and stop delivering incoming messages to the message handler.
        /// </summary>
        /// <exception cref="IllegalStateException">No identifier is currently registered</exception>
        public void Unregister()
        {
            if (_localIdentifier == null)
            {
                throw new IllegalStateException("Cannot unregister a non existant identifier");
            }

            NamingClient.Unregister(_localIdentifier.ToString());
            _localIdentifier = null;
            DisposeMessageHandler();
        }

        /// <summary>
        /// Dispose of the NetworkService's resources. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // Release the observer subscription while the remote manager is
            // still alive; Unregister may never have been called.
            DisposeMessageHandler();
            NamingClient.Dispose();
            _remoteManager.Dispose();

            LOGGER.Log(Level.Info, "Disposed of network service");
        }

        /// <summary>
        /// Dispose the incoming-message subscription, if there is one, and forget it.
        /// </summary>
        private void DisposeMessageHandler()
        {
            if (_messageHandlerDisposable != null)
            {
                _messageHandlerDisposable.Dispose();
                _messageHandlerDisposable = null;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs deleted file mode 100644 index 29c4097..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs +++ /dev/null @@ -1,57 +0,0 @@ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Common.io;
using Org.Apache.REEF.IO.Network.Naming;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.IO.Network.NetworkService
{
    /// <summary>
    /// Configuration module builder for the NetworkService. It binds the
    /// NetworkService port, the name server port and address, and the
    /// codec factory implementation.
    /// </summary>
    public class NetworkServiceConfiguration : ConfigurationModuleBuilder
    {
        /// <summary>
        /// Required parameter bound to <see cref="NetworkServiceOptions.NetworkServicePort"/>.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly RequiredParameter<int> NetworkServicePort = new RequiredParameter<int>();

        /// <summary>
        /// Required implementation bound to <see cref="ICodecFactory"/>.
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
        public static readonly RequiredImpl<ICodecFactory> NetworkServiceCodecFactory = new RequiredImpl<ICodecFactory>();

        /// <summary>
        /// Builds the configuration module; a new builder is created and built on every access.
        /// </summary>
        public static ConfigurationModule ConfigurationModule
        {
            get
            {
                // Step 1: the port of the network service itself.
                var withServicePort = new NetworkServiceConfiguration()
                    .BindNamedParameter(
                        GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
                        NetworkServicePort);

                // Step 2: where to reach the name server (parameters declared by NamingConfiguration).
                var withNameServer = withServicePort
                    .BindNamedParameter(
                        GenericType<NamingConfigurationOptions.NameServerPort>.Class,
                        NamingConfiguration.NameServerPort)
                    .BindNamedParameter(
                        GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
                        NamingConfiguration.NameServerAddress);

                // Step 3: the codec factory implementation, then build the module.
                return withNameServer
                    .BindImplementation(GenericType<ICodecFactory>.Class, NetworkServiceCodecFactory)
                    .Build();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
deleted file mode 100644
index dcf5bcc..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
+++ /dev/null
@@ -1,33 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- */ - -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.IO.Network.NetworkService -{ - public class NetworkServiceOptions - { - [NamedParameter("Port of NetworkService", "NsPort", "0")] - public class NetworkServicePort : Name<int> - { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs deleted file mode 100644 index 7e586d8..0000000 --- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Globalization; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Runtime.Remoting; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.IO.Network.NetworkService -{ - /// <summary> - /// Represents a connection between two hosts using the NetworkService. - /// </summary> - public class NsConnection<T> : IConnection<T> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(NsConnection<T>)); - - private IIdentifier _sourceId; - private IIdentifier _destId; - private INameClient _nameClient; - private IRemoteManager<NsMessage<T>> _remoteManager; - private Dictionary<IIdentifier, IConnection<T>> _connectionMap; - private IObserver<NsMessage<T>> _remoteSender; - - /// <summary> - /// Creates a new NsConnection between two hosts. - /// </summary> - /// <param name="sourceId">The identifier of the sender</param> - /// <param name="destId">The identifier of the receiver</param> - /// <param name="nameClient">The NameClient used for naming lookup</param> - /// <param name="remoteManager">The remote manager used for network communication</param> - /// <param name="connectionMap">A cache of opened connections. Will remove itself from - /// the cache when the NsConnection is disposed.</param> - public NsConnection( - IIdentifier sourceId, - IIdentifier destId, - INameClient nameClient, - IRemoteManager<NsMessage<T>> remoteManager, - Dictionary<IIdentifier, IConnection<T>> connectionMap) - { - _sourceId = sourceId; - _destId = destId; - _nameClient = nameClient; - _remoteManager = remoteManager; - _connectionMap = connectionMap; - } - - /// <summary> - /// Opens the connection to the remote host. 
- /// </summary> - public void Open() - { - string destStr = _destId.ToString(); - LOGGER.Log(Level.Verbose, "Network service opening connection to {0}...", destStr); - - IPEndPoint destAddr = _nameClient.Lookup(_destId.ToString()); - if (destAddr == null) - { - throw new RemotingException("Cannot register Identifier with NameService"); - } - - try - { - _remoteSender = _remoteManager.GetRemoteObserver(destAddr); - LOGGER.Log(Level.Verbose, "Network service completed connection to {0}.", destStr); - } - catch (SocketException) - { - LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr); - throw; - } - catch (ObjectDisposedException) - { - LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr); - throw; - } - } - - /// <summary> - /// Writes the object to the remote host. - /// </summary> - /// <param name="message">The message to send</param> - public void Write(T message) - { - if (_remoteSender == null) - { - throw new IllegalStateException("NsConnection has not been opened yet."); - } - - try - { - _remoteSender.OnNext(new NsMessage<T>(_sourceId, _destId, message)); - } - catch (IOException) - { - LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId); - throw; - } - catch (ObjectDisposedException) - { - LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId); - throw; - } - } - - /// <summary> - /// Closes the connection - /// </summary> - public void Dispose() - { - _connectionMap.Remove(_destId); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs deleted file mode 100644 index 3c99ffd..0000000 --- 
a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs
+++ /dev/null
@@ -1,71 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System.Collections.Generic;
using Org.Apache.REEF.Wake;

namespace Org.Apache.REEF.IO.Network.NetworkService
{
    /// <summary>
    /// Envelope exchanged between NetworkServices: the identifiers of the
    /// sender and the receiver together with a list of payload items.
    /// </summary>
    /// <typeparam name="T">The type of data being sent</typeparam>
    public class NsMessage<T>
    {
        /// <summary>
        /// Creates an NsMessage whose data list is empty.
        /// </summary>
        /// <param name="sourceId">The identifier of the sender</param>
        /// <param name="destId">The identifier of the receiver</param>
        public NsMessage(IIdentifier sourceId, IIdentifier destId)
        {
            Data = new List<T>();
            DestId = destId;
            SourceId = sourceId;
        }

        /// <summary>
        /// Creates an NsMessage whose data list holds exactly one item.
        /// </summary>
        /// <param name="sourceId">The identifier of the sender</param>
        /// <param name="destId">The identifier of the receiver</param>
        /// <param name="message">The message to send</param>
        public NsMessage(IIdentifier sourceId, IIdentifier destId, T message)
            : this(sourceId, destId)
        {
            // The chained constructor created the empty list; add the single payload item.
            Data.Add(message);
        }

        /// <summary>
        /// The identifier of the sender of the message.
        /// </summary>
        public IIdentifier SourceId { get; private set; }

        /// <summary>
        /// The identifier of the receiver of the message.
        /// </summary>
        public IIdentifier DestId { get; private set; }

        /// <summary>
        /// A list of data being sent in the message.
        /// </summary>
        public List<T> Data { get; private set; }
    }
}
