http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs new file mode 100644 index 0000000..b8c4018 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Naming +{ + /// <summary> + /// Helper class to send lookup events to the name server + /// </summary> + internal class NameLookupClient + { + private TransportClient<NamingEvent> _client; + private BlockingCollection<NamingLookupResponse> _lookupResponseQueue; + private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue; + + /// <summary> + /// Constructs a new NameLookupClient. + /// </summary> + /// <param name="client">The transport client used to connect to the NameServer</param> + /// <param name="lookupQueue">The queue used to signal that a response + /// has been received from the NameServer</param> + /// <param name="getAllQueue">The queue used to signal that a GetAllResponse + /// has been received from the NameServer</param> + public NameLookupClient(TransportClient<NamingEvent> client, + BlockingCollection<NamingLookupResponse> lookupQueue, + BlockingCollection<NamingGetAllResponse> getAllQueue) + { + _client = client; + _lookupResponseQueue = lookupQueue; + _getAllResponseQueue = getAllQueue; + } + + /// <summary> + /// Look up the IPEndPoint that has been registered with the NameServer using + /// the given identifier as the key. + /// </summary> + /// <param name="id">The id for the IPEndPoint</param> + /// <param name="token">The cancellation token used for timeout</param> + /// <returns>The registered IPEndpoint, or null if the identifer has not + /// been registered with the NameServer or if the operation times out.</returns> + public IPEndPoint Lookup(string id, CancellationToken token) + { + List<string> ids = new List<string> { id }; + List<NameAssignment> assignment = Lookup(ids); + return (assignment == null || assignment.Count == 0) ? null : assignment.First().Endpoint; + } + + /// <summary> + /// Look up IPEndPoints that have been registered with the NameService + /// </summary> + /// <param name="ids">The list of ids to look up</param> + /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint + /// pairs</returns> + public List<NameAssignment> Lookup(List<string> ids) + { + _client.Send(new NamingLookupRequest(ids)); + NamingLookupResponse response = _lookupResponseQueue.Take(); + return response.NameAssignments; + } + + /// <summary> + /// Synchronously gets all of the identifier/IPEndpoint pairs registered with the NameService. + /// </summary> + /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint + /// pairs</returns> + public List<NameAssignment> GetAll() + { + _client.Send(new NamingGetAllRequest()); + NamingGetAllResponse response = _getAllResponseQueue.Take(); + return response.Assignments; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs new file mode 100644 index 0000000..a18cb31 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Concurrent; +using System.Net; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Naming +{ + /// <summary> + /// Helper class to send register and unregister events to the NameServer. + /// </summary> + internal class NameRegisterClient + { + private TransportClient<NamingEvent> _client; + private BlockingCollection<NamingRegisterResponse> _registerResponseQueue; + private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue; + + public NameRegisterClient(TransportClient<NamingEvent> client, + BlockingCollection<NamingRegisterResponse> registerQueue, + BlockingCollection<NamingUnregisterResponse> unregisterQueue) + { + _client = client; + _registerResponseQueue = registerQueue; + _unregisterResponseQueue = unregisterQueue; + } + + /// <summary> + /// Synchronously register the id and endpoint with the NameServer. + /// </summary> + /// <param name="id">The identifier</param> + /// <param name="endpoint">The endpoint</param> + public void Register(string id, IPEndPoint endpoint) + { + NameAssignment assignment = new NameAssignment(id, endpoint); + _client.Send(new NamingRegisterRequest(assignment)); + _registerResponseQueue.Take(); + } + + /// <summary> + /// Synchronously unregisters the identifier with the NameServer. + /// </summary> + /// <param name="id">The identifer to unregister</param> + public void Unregister(string id) + { + _client.Send(new NamingUnregisterRequest(id)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs new file mode 100644 index 0000000..26207e0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Codec; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Network.Naming.Observers; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.RX; +using Org.Apache.REEF.Wake.RX.Impl; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; + +namespace Org.Apache.REEF.Network.Naming +{ + /// <summary> + /// Service that manages names and IPEndpoints for well known hosts. + /// Can register, unregister, and look up IPAddresses using a string identifier. + /// </summary> + public class NameServer : INameServer + { + private static Logger _logger = Logger.GetLogger(typeof(NameServer)); + + private TransportServer<NamingEvent> _server; + private Dictionary<string, IPEndPoint> _idToAddrMap; + + /// <summary> + /// Create a new NameServer to run on the specified port. + /// </summary> + /// <param name="port">The port to listen for incoming connections on.</param> + [Inject] + public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port) + { + IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler(); + _idToAddrMap = new Dictionary<string, IPEndPoint>(); + ICodec<NamingEvent> codec = CreateServerCodec(); + + // Start transport server, get listening IP endpoint + _logger.Log(Level.Info, "Starting naming server"); + _server = new TransportServer<NamingEvent>(port, handler, codec); + _server.Run(); + LocalEndpoint = _server.LocalEndpoint; + } + + public IPEndPoint LocalEndpoint { get; private set; } + + /// <summary> + /// Looks up the IPEndpoints for each string identifier + /// </summary> + /// <param name="ids">The IDs to look up</param> + /// <returns>A list of Name assignments representing the identifier + /// that was searched for and the mapped IPEndpoint</returns> + public List<NameAssignment> Lookup(List<string> ids) + { + if (ids == null) + { + Exceptions.Throw(new ArgumentNullException("ids"), _logger); + } + + return ids.Where(id => _idToAddrMap.ContainsKey(id)) + .Select(id => new NameAssignment(id, _idToAddrMap[id])) + .ToList(); + } + + /// <summary> + /// Gets all of the registered identifier/endpoint pairs. + /// </summary> + /// <returns>A list of all of the registered identifiers and their + /// mapped IPEndpoints</returns> + public List<NameAssignment> GetAll() + { + return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList(); + } + + /// <summary> + /// Registers the string identifier with the given IPEndpoint + /// </summary> + /// <param name="id">The string ident</param> + /// <param name="endpoint">The mapped endpoint</param> + public void Register(string id, IPEndPoint endpoint) + { + if (id == null) + { + Exceptions.Throw(new ArgumentNullException("id"), _logger); + } + if (endpoint == null) + { + Exceptions.Throw(new ArgumentNullException("endpoint"), _logger); + } + + _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint); + _idToAddrMap[id] = endpoint; + } + + /// <summary> + /// Unregister the given identifier with the NameServer + /// </summary> + /// <param name="id">The identifier to unregister</param> + public void Unregister(string id) + { + if (id == null) + { + Exceptions.Throw(new ArgumentNullException("id"), _logger); + } + + _logger.Log(Level.Info, "Unregistering id: " + id); + _idToAddrMap.Remove(id); + } + + /// <summary> + /// Stops the NameServer + /// </summary> + public void Dispose() + { + _server.Dispose(); + } + + /// <summary> + /// Create the handler to manage incoming NamingEvent types + /// </summary> + /// <returns>The server handler</returns> + private IObserver<TransportEvent<NamingEvent>> CreateServerHandler() + { + PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>(); + subject.Subscribe(new NamingLookupRequestObserver(this)); + subject.Subscribe(new NamingGetAllRequestObserver(this)); + subject.Subscribe(new NamingRegisterRequestObserver(this)); + subject.Subscribe(new NamingUnregisterRequestObserver(this)); + return new ServerHandler(subject); + } + + /// <summary> + /// Create the codec used to serialize/deserialize NamingEvent messages + /// </summary> + /// <returns>The serialization codec</returns> + private ICodec<NamingEvent> CreateServerCodec() + { + MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>(); + codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest"); + codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse"); + NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec(); + codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest"); + codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse"); + codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest"); + return codec; + } + + [NamedParameter("Port for the NameServer to listen on")] + public class Port : Name<int> + { + } + + /// <summary> + /// Class used to handle incoming NamingEvent messages. + /// Delegates the event to the prescribed handler depending on its type + /// </summary> + private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>> + { + private IObserver<NamingEvent> _handler; + + public ServerHandler(IObserver<NamingEvent> handler) + { + _handler = handler; + } + + public override void OnNext(TransportEvent<NamingEvent> value) + { + NamingEvent message = value.Data; + message.Link = value.Link; + _handler.OnNext(message); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs new file mode 100644 index 0000000..3daac70 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.Naming +{ + public class NamingConfiguration : ConfigurationModuleBuilder + { + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredParameter<string> NameServerAddress = new RequiredParameter<string>(); + + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredParameter<int> NameServerPort = new RequiredParameter<int>(); + + public static ConfigurationModule ConfigurationModule + { + get + { + return new NamingConfiguration() + .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, NameServerAddress) + .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, NameServerPort) + .Build(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs new file mode 100644 index 0000000..aa9b6e6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Naming +{ + public class NamingConfigurationOptions + { + [NamedParameter("IP address of NameServer")] + public class NameServerAddress : Name<string> + { + } + + [NamedParameter("Port of NameServer")] + public class NameServerPort : Name<int> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs new file mode 100644 index 0000000..df3a4a9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.RX; + +namespace Org.Apache.REEF.Network.Naming.Observers +{ + /// <summary> + /// Handler for NameService for events of type NamingGetAllRequest. + /// Gets all of the identifiers and their mapped IPEndpoints registered + /// with the NameServer. + /// </summary> + internal class NamingGetAllRequestObserver : AbstractObserver<NamingGetAllRequest> + { + private NameServer _server; + + public NamingGetAllRequestObserver(NameServer server) + { + _server = server; + } + + public override void OnNext(NamingGetAllRequest value) + { + List<NameAssignment> assignments = _server.GetAll(); + value.Link.Write(new NamingGetAllResponse(assignments)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs new file mode 100644 index 0000000..21c602d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.RX; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Network.Naming.Observers +{ + /// <summary> + /// Handler for looking up IPEndpoints registered with the NameServer + /// </summary> + internal class NamingLookupRequestObserver : AbstractObserver<NamingLookupRequest> + { + private NameServer _server; + + public NamingLookupRequestObserver(NameServer server) + { + _server = server; + } + + /// <summary> + /// Look up the IPEndpoints for the given identifiers and write them + /// back to the NameClient + /// </summary> + /// <param name="value">The lookup request event</param> + public override void OnNext(NamingLookupRequest value) + { + List<NameAssignment> assignments = _server.Lookup(value.Identifiers); + value.Link.Write(new NamingLookupResponse(assignments)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs new file mode 100644 index 0000000..8ab8f6c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.RX; + +namespace Org.Apache.REEF.Network.Naming.Observers +{ + /// <summary> + /// Handler for registering an identifier and endpoint with the Name Service + /// </summary> + internal class NamingRegisterRequestObserver : AbstractObserver<NamingRegisterRequest> + { + private NameServer _server; + + public NamingRegisterRequestObserver(NameServer server) + { + _server = server; + } + + /// <summary> + /// Register the identifier and IPEndpoint with the NameServer and send + /// the response back to the NameClient + /// </summary> + /// <param name="value">The register request event</param> + public override void OnNext(NamingRegisterRequest value) + { + NameAssignment assignment = value.NameAssignment; + _server.Register(assignment.Identifier, assignment.Endpoint); + + value.Link.Write(new NamingRegisterResponse(value)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs new file mode 100644 index 0000000..6127a4d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Naming.Events; +using Org.Apache.REEF.Wake.RX; + +namespace Org.Apache.REEF.Network.Naming.Observers +{ + /// <summary> + /// Handler for unregistering an identifier with the NameServer + /// </summary> + internal class NamingUnregisterRequestObserver : AbstractObserver<NamingUnregisterRequest> + { + private NameServer _server; + + public NamingUnregisterRequestObserver(NameServer server) + { + _server = server; + } + + /// <summary> + /// Unregister the identifer with the NameServer. + /// </summary> + /// <param name="value">The unregister request event</param> + public override void OnNext(NamingUnregisterRequest value) + { + // Don't send a response + _server.Unregister(value.Identifier); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs new file mode 100644 index 0000000..471e651 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.NetworkService.Codec +{ + public class ControlMessageCodec : ICodec<ControlMessage> + { + [Inject] + public ControlMessageCodec() + { + } + + public byte[] Encode(ControlMessage message) + { + return BitConverter.GetBytes((int) message); + } + + public ControlMessage Decode(byte[] data) + { + return (ControlMessage) BitConverter.ToInt32(data, 0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs new file mode 100644 index 0000000..d01a7ae --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; +using ProtoBuf; +using System; +using System.IO; +using System.Linq; + +namespace Org.Apache.REEF.Network.NetworkService.Codec +{ + /// <summary> + /// Codec to serialize NsMessages for NetworkService. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class NsMessageCodec<T> : ICodec<NsMessage<T>> + { + private ICodec<T> _codec; + private IIdentifierFactory _idFactory; + + /// <summary> + /// Create new NsMessageCodec. + /// </summary> + /// <param name="codec">The codec used to serialize message data</param> + /// <param name="idFactory">Used to create identifier from string.</param> + public NsMessageCodec(ICodec<T> codec, IIdentifierFactory idFactory) + { + _codec = codec; + _idFactory = idFactory; + } + + /// <summary> + /// Serialize the NsMessage. + /// </summary> + /// <param name="obj">The object to serialize</param> + /// <returns>The serialized object in byte array form</returns> + public byte[] Encode(NsMessage<T> obj) + { + NsMessageProto proto = NsMessageProto.Create(obj, _codec); + using (var stream = new MemoryStream()) + { + Serializer.Serialize(stream, proto); + return stream.ToArray(); + } + } + + /// <summary> + /// Deserialize the byte array into NsMessage. + /// </summary> + /// <param name="data">The serialized byte array</param> + /// <returns>The deserialized NsMessage</returns> + public NsMessage<T> Decode(byte[] data) + { + using (var stream = new MemoryStream(data)) + { + NsMessageProto proto = Serializer.Deserialize<NsMessageProto>(stream); + + IIdentifier sourceId = _idFactory.Create(proto.SourceId); + IIdentifier destId = _idFactory.Create(proto.DestId); + NsMessage<T> message = new NsMessage<T>(sourceId, destId); + + var messages = proto.Data.Select(byteArr => _codec.Decode(byteArr)); + message.Data.AddRange(messages); + return message; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs new file mode 100644 index 0000000..0de8be1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.REEF.Wake.Remote; +using ProtoBuf; + +namespace Org.Apache.REEF.Network.NetworkService.Codec +{ + [ProtoContract] + public class NsMessageProto + { + public NsMessageProto() + { + Data = new List<byte[]>(); + } + + [ProtoMember(1)] + public string SourceId { get; set; } + + [ProtoMember(2)] + public string DestId { get; set; } + + [ProtoMember(3)] + public List<byte[]> Data { get; set; } + + public static NsMessageProto Create<T>(NsMessage<T> message, ICodec<T> codec) + { + NsMessageProto proto = new NsMessageProto(); + + proto.SourceId = message.SourceId.ToString(); + proto.DestId = message.DestId.ToString(); + + foreach (T item in message.Data) + { + proto.Data.Add(codec.Encode(item)); + } + + return proto; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs new file mode 100644 index 0000000..bcc7a8c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Network.NetworkService +{ + public enum ControlMessage + { + /// <summary> + /// default state + /// </summary> + UNDEFINED = 0, + + /// <summary> + /// expecting data to be sent/received + /// </summary> + RECEIVE = 1, + + /// <summary> + /// stop group communications + /// </summary> + STOP = 2, + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs new file mode 100644 index 0000000..78b9c37 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Represents a connection between two endpoints named by identifiers + /// </summary> + public interface IConnection<T> : IDisposable + { + /// <summary> + /// Opens the connection + /// </summary> + void Open(); + + /// <summary> + /// Writes the object to the connection + /// </summary> + /// <param name="obj">The message to send</param> + void Write(T obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs new file mode 100644 index 0000000..a4b845a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Services; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Network service used for Reef Task communication. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public interface INetworkService<T> : IService, IDisposable + { + /// <summary> + /// Name client for registering ids + /// </summary> + INameClient NamingClient { get; } + + /// <summary> + /// Open a new connection to the remote host registered to + /// the name service with the given identifier + /// </summary> + /// <param name="destinationId">The identifier of the remote host</param> + /// <returns>The IConnection used for communication</returns> + IConnection<T> NewConnection(IIdentifier destinationId); + + /// <summary> + /// Register the identifier for the NetworkService with the NameService. + /// </summary> + /// <param name="id">The identifier to register</param> + void Register(IIdentifier id); + + /// <summary> + /// Unregister the identifier for the NetworkService with the NameService. + /// </summary> + void Unregister(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs new file mode 100644 index 0000000..57eae81 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Reactive; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming; +using Org.Apache.REEF.Network.NetworkService.Codec; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Network service used for Reef Task communication. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class NetworkService<T> : INetworkService<T> + { + private Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>)); + + private IRemoteManager<NsMessage<T>> _remoteManager; + private IObserver<NsMessage<T>> _messageHandler; + private ICodec<NsMessage<T>> _codec; + private IIdentifier _localIdentifier; + private IDisposable _messageHandlerDisposable; + private Dictionary<IIdentifier, IConnection<T>> _connectionMap; + + /// <summary> + /// Create a new NetworkFactory. + /// </summary> + /// <param name="nsPort">The port that the NetworkService will listen on</param> + /// <param name="nameServerAddr">The address of the NameServer</param> + /// <param name="nameServerPort">The port of the NameServer</param> + /// <param name="messageHandler">The observer to handle incoming messages</param> + /// <param name="idFactory">The factory used to create IIdentifiers</param> + /// <param name="codec">The codec used for serialization</param> + [Inject] + public NetworkService( + [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort, + [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string nameServerAddr, + [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int nameServerPort, + IObserver<NsMessage<T>> messageHandler, + IIdentifierFactory idFactory, + ICodec<T> codec) + { + _codec = new NsMessageCodec<T>(codec, idFactory); + + IPAddress localAddress = NetworkUtils.LocalIPAddress; + _remoteManager = new DefaultRemoteManager<NsMessage<T>>(localAddress, nsPort, _codec); + _messageHandler = messageHandler; + + NamingClient = new NameClient(nameServerAddr, nameServerPort); + _connectionMap = new Dictionary<IIdentifier, IConnection<T>>(); + + LOGGER.Log(Level.Info, "Started network service"); + } + + /// <summary> + /// Name client for registering ids + /// </summary> + public INameClient NamingClient { get; private set; } + + /// <summary> + /// Open a new connection to the remote host registered to + /// the name service with the given identifier + /// </summary> + /// <param name="destinationId">The identifier of the remote host</param> + /// <returns>The IConnection used for communication</returns> + public IConnection<T> NewConnection(IIdentifier destinationId) + { + if (_localIdentifier == null) + { + throw new IllegalStateException("Cannot open connection without first registering an ID"); + } + + IConnection<T> connection; + if (_connectionMap.TryGetValue(destinationId, out connection)) + { + return connection; + } + + connection = new NsConnection<T>(_localIdentifier, destinationId, + NamingClient, _remoteManager, _connectionMap); + + _connectionMap[destinationId] = connection; + return connection; + } + + /// <summary> + /// Register the identifier for the NetworkService with the NameService. + /// </summary> + /// <param name="id">The identifier to register</param> + public void Register(IIdentifier id) + { + LOGGER.Log(Level.Info, "Registering id {0} with network service.", id); + + _localIdentifier = id; + NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint); + + // Create and register incoming message handler + var anyEndpoint = new IPEndPoint(IPAddress.Any, 0); + _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler); + } + + /// <summary> + /// Unregister the identifier for the NetworkService with the NameService. + /// </summary> + public void Unregister() + { + if (_localIdentifier == null) + { + throw new IllegalStateException("Cannot unregister a non existant identifier"); + } + + NamingClient.Unregister(_localIdentifier.ToString()); + _localIdentifier = null; + _messageHandlerDisposable.Dispose(); + } + + /// <summary> + /// Dispose of the NetworkService's resources + /// </summary> + public void Dispose() + { + NamingClient.Dispose(); + _remoteManager.Dispose(); + + LOGGER.Log(Level.Info, "Disposed of network service"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs new file mode 100644 index 0000000..003260c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Network.Naming; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.NetworkService +{ + public class NetworkServiceConfiguration : ConfigurationModuleBuilder + { + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredParameter<int> NetworkServicePort = new RequiredParameter<int>(); + + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly RequiredImpl<ICodecFactory> NetworkServiceCodecFactory = new RequiredImpl<ICodecFactory>(); + + public static ConfigurationModule ConfigurationModule + { + get + { + return new NetworkServiceConfiguration() + .BindNamedParameter(GenericType<NetworkServiceOptions.NetworkServicePort>.Class, NetworkServicePort) + .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, + NamingConfiguration.NameServerPort) + .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, + NamingConfiguration.NameServerAddress) + .BindImplementation(GenericType<ICodecFactory>.Class, NetworkServiceCodecFactory) + .Build(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs new file mode 100644 index 0000000..008751d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.NetworkService +{ + public class NetworkServiceOptions + { + [NamedParameter("Port of NetworkService", "NsPort", "0")] + public class NetworkServicePort : Name<int> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs new file mode 100644 index 0000000..5465faa --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Runtime.Remoting; +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Represents a connection between two hosts using the NetworkService. + /// </summary> + public class NsConnection<T> : IConnection<T> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(NsConnection<T>)); + + private IIdentifier _sourceId; + private IIdentifier _destId; + private INameClient _nameClient; + private IRemoteManager<NsMessage<T>> _remoteManager; + private Dictionary<IIdentifier, IConnection<T>> _connectionMap; + private IObserver<NsMessage<T>> _remoteSender; + + /// <summary> + /// Creates a new NsConnection between two hosts. + /// </summary> + /// <param name="sourceId">The identifier of the sender</param> + /// <param name="destId">The identifier of the receiver</param> + /// <param name="nameClient">The NameClient used for naming lookup</param> + /// <param name="remoteManager">The remote manager used for network communication</param> + /// <param name="connectionMap">A cache of opened connections. Will remove itself from + /// the cache when the NsConnection is disposed.</param> + public NsConnection( + IIdentifier sourceId, + IIdentifier destId, + INameClient nameClient, + IRemoteManager<NsMessage<T>> remoteManager, + Dictionary<IIdentifier, IConnection<T>> connectionMap) + { + _sourceId = sourceId; + _destId = destId; + _nameClient = nameClient; + _remoteManager = remoteManager; + _connectionMap = connectionMap; + } + + /// <summary> + /// Opens the connection to the remote host. + /// </summary> + public void Open() + { + string destStr = _destId.ToString(); + LOGGER.Log(Level.Verbose, "Network service opening connection to {0}...", destStr); + + IPEndPoint destAddr = _nameClient.Lookup(_destId.ToString()); + if (destAddr == null) + { + throw new RemotingException("Cannot register Identifier with NameService"); + } + + try + { + _remoteSender = _remoteManager.GetRemoteObserver(destAddr); + LOGGER.Log(Level.Verbose, "Network service completed connection to {0}.", destStr); + } + catch (SocketException) + { + LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr); + throw; + } + catch (ObjectDisposedException) + { + LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr); + throw; + } + } + + /// <summary> + /// Writes the object to the remote host. + /// </summary> + /// <param name="message">The message to send</param> + public void Write(T message) + { + if (_remoteSender == null) + { + throw new IllegalStateException("NsConnection has not been opened yet."); + } + + try + { + _remoteSender.OnNext(new NsMessage<T>(_sourceId, _destId, message)); + } + catch (IOException) + { + LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId); + throw; + } + catch (ObjectDisposedException) + { + LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId); + throw; + } + } + + /// <summary> + /// Closes the connection + /// </summary> + public void Dispose() + { + _connectionMap.Remove(_destId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs new file mode 100644 index 0000000..0eba888 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.Wake; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Message sent between NetworkServices + /// </summary> + /// <typeparam name="T">The type of data being sent</typeparam> + public class NsMessage<T> + { + /// <summary> + /// Create a new NsMessage with no data. + /// </summary> + /// <param name="sourceId">The identifier of the sender</param> + /// <param name="destId">The identifier of the receiver</param> + public NsMessage(IIdentifier sourceId, IIdentifier destId) + { + SourceId = sourceId; + DestId = destId; + Data = new List<T>(); + } + + /// <summary> + /// Create a new NsMessage with data. + /// </summary> + /// <param name="sourceId">The identifier of the sender</param> + /// <param name="destId">The identifier of the receiver</param> + /// <param name="message">The message to send</param> + public NsMessage(IIdentifier sourceId, IIdentifier destId, T message) + { + SourceId = sourceId; + DestId = destId; + Data = new List<T> { message }; + } + + /// <summary> + /// The identifier of the sender of the message. + /// </summary> + public IIdentifier SourceId { get; private set; } + + /// <summary> + /// The identifier of the receiver of the message. + /// </summary> + public IIdentifier DestId { get; private set; } + + /// <summary> + /// A list of data being sent in the message. + /// </summary> + public List<T> Data { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj new file mode 100644 index 0000000..6824277 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -0,0 +1,184 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{883CE800-6A6A-4E0A-B7FE-C054F4F2C1DC}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Network</RootNamespace> + <AssemblyName>Org.Apache.REEF.Network</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <RestorePackages>true</RestorePackages> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir> + </PropertyGroup> + <Import Project="$(SolutionDir)\Source\build.props" /> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Microsoft.Hadoop.Avro"> + <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath> + </Reference> + <Reference Include="Newtonsoft.Json"> + <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath> + </Reference> + <Reference Include="protobuf-net"> + <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Core"> + <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Interfaces"> + <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" /> + <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" /> + <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" /> + <Compile Include="Naming\Codec\NamingRegisterResponseCodec.cs" /> + <Compile Include="Naming\Codec\NamingUnregisterRequestCodec.cs" /> + <Compile Include="Naming\Contracts\AvroNamingAssignment.cs" /> + <Compile Include="Naming\Contracts\AvroNamingLookupRequest.cs" /> + <Compile Include="Naming\Contracts\AvroNamingLookupResponse.cs" /> + <Compile Include="Naming\Contracts\AvroNamingRegisterRequest.cs" /> + <Compile Include="Naming\Contracts\AvroNamingUnRegisterRequest.cs" /> + <Compile Include="Naming\Events\NamingEvent.cs" /> + <Compile Include="Naming\Events\NamingGetAllRequest.cs" /> + <Compile Include="Naming\Events\NamingGetAllResponse.cs" /> + <Compile Include="Naming\Events\NamingLookupRequest.cs" /> + <Compile Include="Naming\Events\NamingLookupResponse.cs" /> + <Compile Include="Naming\Events\NamingRegisterRequest.cs" /> + <Compile Include="Naming\Events\NamingRegisterResponse.cs" /> + <Compile Include="Naming\Events\NamingUnregisterRequest.cs" /> + <Compile Include="Naming\Events\NamingUnregisterResponse.cs" /> + <Compile Include="Naming\INameServer.cs" /> + <Compile Include="Naming\NameClient.cs" /> + <Compile Include="Naming\NameLookupClient.cs" /> + <Compile Include="Naming\NameRegisterClient.cs" /> + <Compile Include="Naming\NameServer.cs" /> + <Compile Include="Naming\NamingConfiguration.cs"> + <SubType>Code</SubType> + </Compile> + <Compile Include="Naming\NamingConfigurationOptions.cs"> + <SubType>Code</SubType> + </Compile> + <Compile Include="Naming\Observers\NamingGetAllRequestObserver.cs" /> + <Compile Include="Naming\Observers\NamingLookupRequestObserver.cs" /> + <Compile Include="Naming\Observers\NamingRegisterRequestObserver.cs" /> + <Compile Include="Naming\Observers\NamingUnregisterRequestObserver.cs" /> + <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" /> + <Compile Include="NetworkService\Codec\NsMessageCodec.cs" /> + <Compile Include="NetworkService\Codec\NsMessageProto.cs" /> + <Compile Include="NetworkService\ControlMessage.cs" /> + <Compile Include="NetworkService\IConnection.cs" /> + <Compile Include="NetworkService\INetworkService.cs" /> + <Compile Include="NetworkService\NetworkService.cs" /> + <Compile Include="NetworkService\NetworkServiceConfiguration.cs" /> + <Compile Include="NetworkService\NetworkServiceOptions.cs" /> + <Compile Include="NetworkService\NsConnection.cs" /> + <Compile Include="NetworkService\NsMessage.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> + <Compile Include="Utilities\Utils.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> + <Name>Org.Apache.REEF.Common</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj"> + <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project> + <Name>Org.Apache.REEF.Driver</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <WCFMetadata Include="Service References\" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..536e986 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Network")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Network")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("b3f5e608-8908-4f06-a87e-5e41c88133ac")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs b/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs new file mode 100644 index 0000000..9dc057c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Network.Utilities +{ + public static class BlockingCollectionExtensions + { + /// <summary> + /// Removes the given item from the BlockingCollection if it is present. + /// If it is not present, it blocks until any item is available in the + /// BlockingCollection. It then removes and returns that first available + /// item. + /// </summary> + /// <typeparam name="T">The type of BlockingCollection</typeparam> + /// <param name="collection">The BlockingCollection to remove the specified item</param> + /// <param name="item">The item to remove from the BlockingCollection, if it exists</param> + /// <returns>The specified item, or the first available item if the specified item is + /// not present in the BlockingCollection</returns> + public static T Take<T>(this BlockingCollection<T> collection, T item) + { + T ret = default(T); + bool foundItem = false; + List<T> removedItems = new List<T>(); + + // Empty the collection + for (int i = 0; i < collection.Count; i++) + { + T removed; + if (collection.TryTake(out removed)) + { + removedItems.Add(removed); + } + } + + // Add them back to the collection minus the specified item + foreach (T removedItem in removedItems) + { + if (removedItem.Equals(item)) + { + ret = removedItem; + foundItem = true; + } + else + { + collection.Add(removedItem); + } + } + + if (!foundItem) + { + // Error: the element wasn't in the collection + throw new InvalidOperationException(item + " not found in blocking collection"); + } + + return ret; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs b/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs new file mode 100644 index 0000000..bc02b89 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.IO; +using Microsoft.Hadoop.Avro; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tang.Implementations.Tang; + +namespace Org.Apache.REEF.Network.Utilities +{ + internal class Utils + { + private static Logger LOGGER = Logger.GetLogger(typeof(Utils)); + + /// <summary> + /// Returns the TaskIdentifier from the Configuration. + /// </summary> + /// <param name="taskConfiguration">The Configuration object</param> + /// <returns>The TaskIdentifier for the given Configuration</returns> + public static string GetTaskId(IConfiguration taskConfiguration) + { + try + { + IInjector injector = TangFactory.GetTang().NewInjector(taskConfiguration); + return injector.GetNamedInstance<TaskConfigurationOptions.Identifier, string>( + GenericType<TaskConfigurationOptions.Identifier>.Class); + } + catch (InjectionException) + { + LOGGER.Log(Level.Error, "Unable to find task identifier"); + throw; + } + } + + /// <summary> + /// Returns the Context Identifier from the Configuration. + /// </summary> + /// <param name="contextConfiguration">The Configuration object</param> + /// <returns>The TaskIdentifier for the given Configuration</returns> + public static string GetContextId(IConfiguration contextConfiguration) + { + try + { + IInjector injector = TangFactory.GetTang().NewInjector(contextConfiguration); + return injector.GetNamedInstance<ContextConfigurationOptions.ContextIdentifier, string>( + GenericType<ContextConfigurationOptions.ContextIdentifier>.Class); + } + catch (InjectionException) + { + LOGGER.Log(Level.Error, "Unable to find task identifier"); + throw; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/packages.config b/lang/cs/Org.Apache.REEF.Network/packages.config new file mode 100644 index 0000000..88cf17b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/packages.config @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<packages> + <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" /> + <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" /> + <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" /> + <package id="Rx-Core" version="2.2.5" targetFramework="net45" /> + <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" /> +</packages> \ No newline at end of file
