http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index b1b79d4..8fe19d6 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -32,7 +32,6 @@ using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Task.Impl { @@ -43,18 +42,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Communication Group. /// </summary> /// <typeparam name="T">The message type</typeparam> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage> { - private const int DefaultTimeout = 50000; - private const int RetryCount = 10; - private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>)); private readonly string _groupName; private readonly string _operatorName; private readonly string _selfId; - private string _driverId; private readonly int _timeout; private readonly int _retryCount; private readonly int _sleepTime; @@ -66,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private readonly Sender _sender; private readonly BlockingCollection<NodeStruct<T>> _nodesWithData; private readonly Object _thisLock = new Object(); - private readonly IStreamingCodec<T> _codec; /// <summary> /// Creates a new OperatorTopology object. @@ -74,7 +67,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="operatorName">The name of the Group Communication Operator</param> /// <param name="groupName">The name of the operator's Communication Group</param> /// <param name="taskId">The operator's Task identifier</param> - /// <param name="driverId">The identifer for the driver</param> /// <param name="timeout">Timeout value for cancellation token</param> /// <param name="retryCount">Number of times to retry wating for registration</param> /// <param name="sleepTime">Sleep time between retry wating for registration</param> @@ -82,26 +74,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="childIds">The set of child Task identifiers in the topology graph</param> /// <param name="networkService">The network service</param> /// <param name="sender">The Sender used to do point to point communication</param> - /// <param name="codec">Streaming codec to encode objects</param> [Inject] private OperatorTopology( [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, - [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId, [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout, [Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount, [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime, [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId, [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds, - WritableNetworkService<GeneralGroupCommunicationMessage> networkService, - Sender sender, - IStreamingCodec<T> codec) + StreamingNetworkService<GeneralGroupCommunicationMessage> networkService, + Sender sender) { _operatorName = operatorName; _groupName = groupName; _selfId = taskId; - _driverId = driverId; _timeout = timeout; _retryCount = retryCount; _sleepTime = sleepTime; @@ -110,7 +98,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _nodesWithData = new BlockingCollection<NodeStruct<T>>(); _children = new List<NodeStruct<T>>(); _idToNodeMap = new Dictionary<string, NodeStruct<T>>(); - _codec = codec; if (_selfId.Equals(rootId)) { @@ -201,7 +188,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentException("No parent for node"); } - SendToNode(message, MessageType.Data, _parent); + SendToNode(message, _parent); } /// <summary> @@ -218,7 +205,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl foreach (var child in _children) { - SendToNode(message, MessageType.Data, child); + SendToNode(message, child); } } @@ -444,10 +431,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="message">The message to send</param> /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> - private void SendToNode(T message, MessageType msgType, NodeStruct<T> node) + private void SendToNode(T message, NodeStruct<T> node) { GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName, - _selfId, node.Identifier, message, msgType, _codec); + _selfId, node.Identifier, message); _sender.Send(gcm); } @@ -458,12 +445,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="messages">The list of messages to send</param> /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> - private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node) + private void SendToNode(IList<T> messages, NodeStruct<T> node) { T[] encodedMessages = messages.ToArray(); GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName, - _selfId, node.Identifier, encodedMessages, msgType, _codec); + _selfId, node.Identifier, encodedMessages); _sender.Send(gcm); } @@ -511,7 +498,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } IList<T> sublist = messages.ToList().GetRange(i, size); - SendToNode(sublist, MessageType.Data, nodeStruct); + SendToNode(sublist, nodeStruct); i += size; }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs new file mode 100644 index 0000000..c30c1bd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.Network.NetworkService.Codec +{ + /// <summary> + /// Codec to serialize NsMessages for NetworkService. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + internal class NsMessageStreamingCodec<T> : IStreamingCodec<NsMessage<T>> + { + private readonly IIdentifierFactory _idFactory; + private readonly StreamingCodecFunctionCache<T> _codecFunctionsCache; + + /// <summary> + /// Create new NsMessageCodec. + /// </summary> + /// <param name="idFactory">Used to create identifier from string.</param> + /// <param name="injector">Injector to instantiate codecs.</param> + [Inject] + private NsMessageStreamingCodec(IIdentifierFactory idFactory, IInjector injector) + { + _idFactory = idFactory; + _codecFunctionsCache = new StreamingCodecFunctionCache<T>(injector); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The instance of type NsMessage<T></T> read from the reader</returns> + public NsMessage<T> Read(IDataReader reader) + { + int metadataSize = reader.ReadInt32(); + byte[] metadata = new byte[metadataSize]; + reader.Read(ref metadata, 0, metadataSize); + var res = GenerateMetaDataDecoding(metadata); + + Type messageType = res.Item3; + NsMessage<T> message = res.Item1; + + var codecReadFunc = _codecFunctionsCache.ReadFunction(messageType); + int messageCount = res.Item2; + + for (int i = 0; i < messageCount; i++) + { + message.Data.Add(codecReadFunc(reader)); + } + + return message; + } + + /// <summary> + /// Writes the class fields to the writer. + /// </summary> + /// <param name="obj">The object of type NsMessage<T></T> to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(NsMessage<T> obj, IDataWriter writer) + { + byte[] encodedMetadata = GenerateMetaDataEncoding(obj); + byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length); + byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray(); + writer.Write(totalEncoding, 0, totalEncoding.Length); + + Type messageType = obj.Data[0].GetType(); + var codecWriteFunc = _codecFunctionsCache.WriteFunction(messageType); + + foreach (var data in obj.Data) + { + codecWriteFunc(data, writer); + } + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The instance of type NsMessage<T> read from the reader</returns> + public async Task<NsMessage<T>> ReadAsync(IDataReader reader, CancellationToken token) + { + int metadataSize = await reader.ReadInt32Async(token); + byte[] metadata = new byte[metadataSize]; + await reader.ReadAsync(metadata, 0, metadataSize, token); + var res = GenerateMetaDataDecoding(metadata); + Type messageType = res.Item3; + NsMessage<T> message = res.Item1; + var codecReadFunc = _codecFunctionsCache.ReadAsyncFunction(messageType); + int messageCount = res.Item2; + + for (int i = 0; i < messageCount; i++) + { + message.Data.Add(codecReadFunc(reader, token)); + } + + return message; + } + + /// <summary> + /// Writes the class fields to the writer. + /// </summary> + /// <param name="obj">The object of type NsMessage<T> to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(NsMessage<T> obj, IDataWriter writer, CancellationToken token) + { + byte[] encodedMetadata = GenerateMetaDataEncoding(obj); + byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length); + byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray(); + await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token); + + Type messageType = obj.Data[0].GetType(); + + var codecWriteFunc = _codecFunctionsCache.WriteAsyncFunction(messageType); + + foreach (var data in obj.Data) + { + var asyncResult = codecWriteFunc.BeginInvoke(data, writer, token, null, null); + codecWriteFunc.EndInvoke(asyncResult); + } + } + + private static byte[] GenerateMetaDataEncoding(NsMessage<T> obj ) + { + List<byte[]> metadataBytes = new List<byte[]>(); + byte[] sourceBytes = StringToBytes(obj.SourceId.ToString()); + byte[] dstBytes = StringToBytes(obj.DestId.ToString()); + byte[] messageTypeBytes = StringToBytes(obj.Data[0].GetType().AssemblyQualifiedName); + byte[] messageCount = BitConverter.GetBytes(obj.Data.Count); + + metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length)); + metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length)); + metadataBytes.Add(BitConverter.GetBytes(messageTypeBytes.Length)); + metadataBytes.Add(sourceBytes); + metadataBytes.Add(dstBytes); + metadataBytes.Add(messageTypeBytes); + metadataBytes.Add(messageCount); + + return metadataBytes.SelectMany(i => i).ToArray(); + } + + private Tuple<NsMessage<T>, int, Type> GenerateMetaDataDecoding(byte[] obj) + { + int srcCount = BitConverter.ToInt32(obj, 0); + int dstCount = BitConverter.ToInt32(obj, sizeof (int)); + int msgTypeCount = BitConverter.ToInt32(obj, 2*sizeof (int)); + + int offset = 3*sizeof (int); + string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray()); + offset += srcCount; + string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray()); + offset += dstCount; + Type msgType = Type.GetType(BytesToString(obj.Skip(offset).Take(msgTypeCount).ToArray())); + offset += msgTypeCount; + int messageCount = BitConverter.ToInt32(obj, offset); + + NsMessage<T> msg = new NsMessage<T>(_idFactory.Create(srcString), _idFactory.Create(dstString)); + return new Tuple<NsMessage<T>, int, Type>(msg, messageCount, msgType); + } + + private static byte[] StringToBytes(string str) + { + byte[] bytes = new byte[str.Length * sizeof(char)]; + Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length); + return bytes; + } + + private static string BytesToString(byte[] bytes) + { + char[] chars = new char[bytes.Length / sizeof(char)]; + Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length); + return new string(chars); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs new file mode 100644 index 0000000..6d91298 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.Network.NetworkService.Codec +{ + /// <summary> + /// Cache of StreamingCodec functions used to store codec functions for messages + /// to avoid reflection cost. Each message type is assumed to have a unique + /// associated codec + /// </summary> + /// <typeparam name="T">The message type</typeparam> + internal class StreamingCodecFunctionCache<T> + { + private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>)); + private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache; + private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache; + private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache; + private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache; + private readonly IInjector _injector; + private readonly Type _streamingCodecType; + + /// <summary> + /// Create new StreamingCodecFunctionCache. + /// </summary> + /// <param name="injector"> Injector</param> + internal StreamingCodecFunctionCache(IInjector injector) + { + _injector = injector; + _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>(); + _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>(); + _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>(); + _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>(); + _streamingCodecType = typeof(IStreamingCodec<>); + } + + /// <summary> + /// Creates the read delegate function of StreamingCodec from the message type + /// </summary> + /// <param name="messageType">Type of message</param> + /// <returns>The read delegate function</returns> + internal Func<IDataReader, T> ReadFunction(Type messageType) + { + Func<IDataReader, T> readFunc; + + if (!_readFuncCache.TryGetValue(messageType, out readFunc)) + { + AddCodecFunctions(messageType); + readFunc = _readFuncCache[messageType]; + } + + return readFunc; + } + + /// <summary> + /// Creates the read async delegate function of StreamingCodec from the message type + /// </summary> + /// <param name="messageType">Type of message</param> + /// <returns>The read async delegate function</returns> + internal Func<IDataReader, CancellationToken, T> ReadAsyncFunction(Type messageType) + { + Func<IDataReader, CancellationToken, T> readFunc; + + if (!_readAsyncFuncCache.TryGetValue(messageType, out readFunc)) + { + AddCodecFunctions(messageType); + readFunc = _readAsyncFuncCache[messageType]; + } + + return readFunc; + } + + /// <summary> + /// Creates the write delegate function of StreamingCodec from the message type + /// </summary> + /// <param name="messageType">Type of message</param> + /// <returns>The write delegate function</returns> + internal Action<T, IDataWriter> WriteFunction(Type messageType) + { + Action<T, IDataWriter> writeFunc; + + if (!_writeFuncCache.TryGetValue(messageType, out writeFunc)) + { + AddCodecFunctions(messageType); + writeFunc = _writeFuncCache[messageType]; + } + + return writeFunc; + } + + /// <summary> + /// Creates the write async delegate function of StreamingCodec from the message type + /// </summary> + /// <param name="messageType">Type of message</param> + /// <returns>The write async delegate function</returns> + internal Func<T, IDataWriter, CancellationToken, Task> WriteAsyncFunction(Type messageType) + { + Func<T, IDataWriter, CancellationToken, Task> writeFunc; + + if (!_writeAsyncFuncCache.TryGetValue(messageType, out writeFunc)) + { + AddCodecFunctions(messageType); + writeFunc = _writeAsyncFuncCache[messageType]; + } + + return writeFunc; + } + + private void AddCodecFunctions(Type messageType) + { + if (!typeof(T).IsAssignableFrom(messageType)) + { + Exceptions.CaughtAndThrow(new Exception("Message type not assignable to base type"), Level.Error, + Logger); + } + + Type codecType = _streamingCodecType.MakeGenericType(messageType); + var codec = _injector.GetInstance(codecType); + + MethodInfo readMethod = codec.GetType().GetMethod("Read"); + _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate + (typeof (Func<IDataReader, T>), codec, readMethod); + + MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync"); + MethodInfo genericHelper = GetType() + .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType); + _readAsyncFuncCache[messageType] = + (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec }); + + MethodInfo writeMethod = codec.GetType().GetMethod("Write"); + genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + constructedHelper = genericHelper.MakeGenericMethod(messageType); + _writeFuncCache[messageType] = + (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec}); + + MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync"); + genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + constructedHelper = genericHelper.MakeGenericMethod(messageType); + _writeAsyncFuncCache[messageType] = + (Func<T, IDataWriter, CancellationToken, Task>) + constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec}); + } + + private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class + { + Action<T1, IDataWriter> func = (Action<T1, IDataWriter>) Delegate.CreateDelegate + (typeof (Action<T1, IDataWriter>), codec, method); + + Action<T, IDataWriter> ret = (obj, writer) => func(obj as T1, writer); + return ret; + } + + private Func<T, IDataWriter, CancellationToken, Task> WriteAsyncHelperFunc<T1>(MethodInfo method, object codec) + where T1 : class + { + Func<T1, IDataWriter, CancellationToken, Task> func = + (Func<T1, IDataWriter, CancellationToken, Task>) Delegate.CreateDelegate + (typeof (Func<T1, IDataWriter, CancellationToken, Task>), codec, method); + + Func<T, IDataWriter, CancellationToken, Task> ret = (obj, writer, token) => func(obj as T1, writer, token); + return ret; + } + + private Func<IDataReader, CancellationToken, T> ReadAsyncHelperFunc<T1>(MethodInfo method, object codec) + where T1 : class + { + Func<IDataReader, CancellationToken, Task<T1>> func = + (Func<IDataReader, CancellationToken, Task<T1>>) Delegate.CreateDelegate + (typeof (Func<IDataReader, CancellationToken, Task<T1>>), codec, method); + + Func<IDataReader, CancellationToken, T1> func1 = (writer, token) => func(writer, token).Result; + Func<IDataReader, CancellationToken, T> func2 = (writer, token) => ((T)(object)func1(writer, token)); + return func2; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs new file mode 100644 index 0000000..1ff2517 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Net; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Network.NetworkService.Codec; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Network.NetworkService +{ + /// <summary> + /// Writable Network service used for Reef Task communication. + /// </summary> + /// <typeparam name="T">The message type</typeparam> + public class StreamingNetworkService<T> : INetworkService<T> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingNetworkService<>)); + + private readonly IRemoteManager<NsMessage<T>> _remoteManager; + private IIdentifier _localIdentifier; + private readonly IDisposable _messageHandlerDisposable; + private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap; + private readonly INameClient _nameClient; + + /// <summary> + /// Create a new Writable NetworkService. + /// </summary> + /// <param name="messageHandler">The observer to handle incoming messages</param> + /// <param name="idFactory">The factory used to create IIdentifiers</param> + /// <param name="nameClient">The name client used to register Ids</param> + /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a + /// Writable RemoteManager</param> + /// <param name="codec">Codec for Network Service message</param> + /// <param name="injector">Fork of the injector that created the Network service</param> + [Inject] + private StreamingNetworkService( + IObserver<NsMessage<T>> messageHandler, + IIdentifierFactory idFactory, + INameClient nameClient, + StreamingRemoteManagerFactory remoteManagerFactory, + NsMessageStreamingCodec<T> codec, + IInjector injector) + { + IPAddress localAddress = NetworkUtils.LocalIPAddress; + _remoteManager = remoteManagerFactory.GetInstance(localAddress, codec); + + // Create and register incoming message handler + // TODO[REEF-419] This should use the TcpPortProvider mechanism + var anyEndpoint = new IPEndPoint(IPAddress.Any, 0); + _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler); + + _nameClient = nameClient; + _connectionMap = new Dictionary<IIdentifier, IConnection<T>>(); + + Logger.Log(Level.Info, "Started network service"); + } + + /// <summary> + /// Name client for registering ids + /// </summary> + public INameClient NamingClient + { + get { return _nameClient; } + } + + /// <summary> + /// Open a new connection to the remote host registered to + /// the name service with the given identifier + /// </summary> + /// <param name="destinationId">The identifier of the remote host</param> + /// <returns>The IConnection used for communication</returns> + public IConnection<T> NewConnection(IIdentifier destinationId) + { + if (_localIdentifier == null) + { + throw new IllegalStateException("Cannot open connection without first registering an ID"); + } + + IConnection<T> connection; + if (_connectionMap.TryGetValue(destinationId, out connection)) + { + return connection; + } + else + { + connection = new NsConnection<T>(_localIdentifier, destinationId, + NamingClient, _remoteManager, _connectionMap); + + _connectionMap[destinationId] = connection; + return connection; + } + } + + /// <summary> + /// Register the identifier for the NetworkService with the NameService. + /// </summary> + /// <param name="id">The identifier to register</param> + public void Register(IIdentifier id) + { + Logger.Log(Level.Info, "Registering id {0} with network service.", id); + + _localIdentifier = id; + NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint); + + Logger.Log(Level.Info, "End of Registering id {0} with network service.", id); + } + + /// <summary> + /// Unregister the identifier for the NetworkService with the NameService. + /// </summary> + public void Unregister() + { + if (_localIdentifier == null) + { + throw new IllegalStateException("Cannot unregister a non existant identifier"); + } + + NamingClient.Unregister(_localIdentifier.ToString()); + _localIdentifier = null; + _messageHandlerDisposable.Dispose(); + } + + /// <summary> + /// Dispose of the NetworkService's resources + /// </summary> + public void Dispose() + { + NamingClient.Dispose(); + _remoteManager.Dispose(); + + Logger.Log(Level.Info, "Disposed of network service"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs deleted file mode 100644 index 93da126..0000000 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Net; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.Util; - -namespace Org.Apache.REEF.Network.NetworkService -{ - /// <summary> - /// Writable Network service used for Reef Task communication. - /// </summary> - /// <typeparam name="T">The message type</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableNetworkService<T> : INetworkService<T> where T : IWritable - { - private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkService<>)); - - private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager; - private readonly IObserver<WritableNsMessage<T>> _messageHandler; - private IIdentifier _localIdentifier; - private IDisposable _messageHandlerDisposable; - private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap; - private readonly INameClient _nameClient; - - /// <summary> - /// Create a new Writable NetworkService. - /// </summary> - /// <param name="nsPort">The port that the NetworkService will listen on</param> - /// <param name="messageHandler">The observer to handle incoming messages</param> - /// <param name="idFactory">The factory used to create IIdentifiers</param> - /// <param name="nameClient">The name client used to register Ids</param> - /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a - /// Writable RemoteManager</param> - [Inject] - private WritableNetworkService( - [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort, - IObserver<WritableNsMessage<T>> messageHandler, - IIdentifierFactory idFactory, - INameClient nameClient, - StreamingRemoteManagerFactory remoteManagerFactory) - { - - IPAddress localAddress = NetworkUtils.LocalIPAddress; - _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort); - _messageHandler = messageHandler; - - // Create and register incoming message handler - // TODO[REEF-419] This should use the TcpPortProvider mechanism - var anyEndpoint = new IPEndPoint(IPAddress.Any, 0); - _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler); - - _nameClient = nameClient; - _connectionMap = new Dictionary<IIdentifier, IConnection<T>>(); - - Logger.Log(Level.Info, "Started network service"); - } - - /// <summary> - /// Name client for registering ids - /// </summary> - public INameClient NamingClient - { - get { return _nameClient; } - } - - /// <summary> - /// Open a new connection to the remote host registered to - /// the name service with the given identifier - /// </summary> - /// <param name="destinationId">The identifier of the remote host</param> - /// <returns>The IConnection used for communication</returns> - public IConnection<T> NewConnection(IIdentifier destinationId) - { - if (_localIdentifier == null) - { - throw new IllegalStateException("Cannot open connection without first registering an ID"); - } - - IConnection<T> connection; - if (_connectionMap.TryGetValue(destinationId, out connection)) - { - return connection; - } - else - { - connection = new WritableNsConnection<T>(_localIdentifier, destinationId, - NamingClient, _remoteManager, _connectionMap); - - _connectionMap[destinationId] = connection; - return connection; - } - } - - /// <summary> - /// Register the identifier for the NetworkService with the NameService. - /// </summary> - /// <param name="id">The identifier to register</param> - public void Register(IIdentifier id) - { - Logger.Log(Level.Info, "Registering id {0} with network service.", id); - - _localIdentifier = id; - NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint); - - Logger.Log(Level.Info, "End of Registering id {0} with network service.", id); - } - - /// <summary> - /// Unregister the identifier for the NetworkService with the NameService. - /// </summary> - public void Unregister() - { - if (_localIdentifier == null) - { - throw new IllegalStateException("Cannot unregister a non existant identifier"); - } - - NamingClient.Unregister(_localIdentifier.ToString()); - _localIdentifier = null; - _messageHandlerDisposable.Dispose(); - } - - /// <summary> - /// Dispose of the NetworkService's resources - /// </summary> - public void Dispose() - { - NamingClient.Dispose(); - _remoteManager.Dispose(); - - Logger.Log(Level.Info, "Disposed of network service"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs deleted file mode 100644 index c20238c..0000000 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Runtime.Remoting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Network.NetworkService -{ - /// <summary> - /// Represents a connection between two hosts using the Writable NetworkService. - /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableNsConnection<T> : IConnection<T> where T : IWritable - { - private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>)); - - private readonly IIdentifier _sourceId; - private readonly IIdentifier _destId; - private readonly INameClient _nameClient; - private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager; - private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap; - private IObserver<WritableNsMessage<T>> _remoteSender; - - /// <summary> - /// Creates a new NsConnection between two hosts. - /// </summary> - /// <param name="sourceId">The identifier of the sender</param> - /// <param name="destId">The identifier of the receiver</param> - /// <param name="nameClient">The NameClient used for naming lookup</param> - /// <param name="remoteManager">The remote manager used for network communication</param> - /// <param name="connectionMap">A cache of opened connections. Will remove itself from - /// the cache when the NsConnection is disposed.</param> - public WritableNsConnection( - IIdentifier sourceId, - IIdentifier destId, - INameClient nameClient, - IRemoteManager<WritableNsMessage<T>> remoteManager, - Dictionary<IIdentifier, IConnection<T>> connectionMap) - { - _sourceId = sourceId; - _destId = destId; - _nameClient = nameClient; - _remoteManager = remoteManager; - _connectionMap = connectionMap; - } - - /// <summary> - /// Opens the connection to the remote host. - /// </summary> - public void Open() - { - string destStr = _destId.ToString(); - Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destStr); - - IPEndPoint destAddr = _nameClient.Lookup(destStr); - if (null == destAddr) - { - throw new RemotingException("Destination Address identifier cannot be found"); - } - - try - { - _remoteSender = _remoteManager.GetRemoteObserver(destAddr); - Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destStr); - } - catch (SocketException) - { - Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr); - throw; - } - catch (ObjectDisposedException) - { - Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr); - throw; - } - } - - /// <summary> - /// Writes the object to the remote host. - /// </summary> - /// <param name="message">The message to send</param> - public void Write(T message) - { - if (_remoteSender == null) - { - throw new IllegalStateException("NsConnection has not been opened yet."); - } - - try - { - _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, _destId, message)); - } - catch (IOException) - { - Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId); - throw; - } - catch (ObjectDisposedException) - { - Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId); - throw; - } - } - - /// <summary> - /// Closes the connection - /// </summary> - public void Dispose() - { - _connectionMap.Remove(_destId); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs deleted file mode 100644 index a9299bb..0000000 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Runtime.Serialization; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Hadoop.Avro; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; - - -namespace Org.Apache.REEF.Network.NetworkService -{ - /// <summary> - /// Writable Message sent between NetworkServices.</summary> - /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableNsMessage<T> : IWritable where T : IWritable - { - private IIdentifierFactory _factory; - private IInjector _injection; - /// <summary> - /// Constructor to allow instantiation by reflection - /// </summary> - [Inject] - public WritableNsMessage(IIdentifierFactory factory, IInjector injection) - { - _factory = factory; - _injection = injection; - } - - /// <summary> - /// Create a new Writable NsMessage with no data. - /// </summary> - /// <param name="sourceId">The identifier of the sender</param> - /// <param name="destId">The identifier of the receiver</param> - public WritableNsMessage(IIdentifier sourceId, IIdentifier destId) - { - SourceId = sourceId; - DestId = destId; - Data = new List<T>(); - } - - /// <summary> - /// Create a new Writable NsMessage with data. - /// </summary> - /// <param name="sourceId">The identifier of the sender</param> - /// <param name="destId">The identifier of the receiver</param> - /// <param name="message">The message to send</param> - public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message) - { - SourceId = sourceId; - DestId = destId; - Data = new List<T> {message}; - } - - /// <summary> - /// The identifier of the sender of the message. - /// </summary> - internal IIdentifier SourceId { get; private set; } - - /// <summary> - /// The identifier of the receiver of the message. - /// </summary> - internal IIdentifier DestId { get; private set; } - - /// <summary> - /// A list of data being sent in the message. - /// </summary> - public IList<T> Data { get; set; } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - public void Read(IDataReader reader) - { - SourceId = _factory.Create(reader.ReadString()); - DestId = _factory.Create(reader.ReadString()); - int messageCount = reader.ReadInt32(); - string dataType = reader.ReadString(); - - Data = new List<T>(); - - for (int index = 0; index < messageCount; index++) - { - var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType)); - - if (null == dataPoint) - { - throw new Exception("T type instance cannot be created from the stream data in Network Service Message"); - } - - dataPoint.Read(reader); - Data.Add(dataPoint); - } - } - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - public void Write(IDataWriter writer) - { - writer.WriteString(SourceId.ToString()); - writer.WriteString(DestId.ToString()); - writer.WriteInt32(Data.Count); - writer.WriteString(Data[0].GetType().AssemblyQualifiedName); - - foreach (var data in Data) - { - data.Write(writer); - } - } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - /// <param name="token">The cancellation token</param> - public async Task ReadAsync(IDataReader reader, CancellationToken token) - { - SourceId = _factory.Create(await reader.ReadStringAsync(token)); - DestId = _factory.Create(await reader.ReadStringAsync(token)); - int messageCount = await reader.ReadInt32Async(token); - string dataType = await reader.ReadStringAsync(token); - - Data = new List<T>(); - - for (int index = 0; index < messageCount; index++) - { - var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType)); - - if (null == dataPoint) - { - throw new Exception("T type instance cannot be created from the stream data in Network Service Message"); - } - - await dataPoint.ReadAsync(reader, token); - Data.Add(dataPoint); - } - } - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">The cancellation token</param> - public async Task WriteAsync(IDataWriter writer, CancellationToken token) - { - await writer.WriteStringAsync(SourceId.ToString(), token); - await writer.WriteStringAsync(DestId.ToString(), token); - await writer.WriteInt32Async(Data.Count, token); - await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token); - - foreach (var data in Data) - { - data.Write(writer); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 73b6d9d..a0e6038 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -61,6 +61,7 @@ under the License. <Compile Include="Group\Driver\IGroupCommDriver.cs" /> <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" /> <Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" /> + <Compile Include="Group\Driver\Impl\GroupCommunicationMessageStreamingCodec.cs" /> <Compile Include="Group\Driver\Impl\MessageType.cs" /> <Compile Include="Group\Driver\Impl\GroupCommDriver.cs" /> <Compile Include="Group\Driver\Impl\TaskStarter.cs" /> @@ -139,6 +140,8 @@ under the License. <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" /> <Compile Include="NetworkService\Codec\NsMessageCodec.cs" /> <Compile Include="NetworkService\Codec\NsMessageProto.cs" /> + <Compile Include="NetworkService\Codec\NsMessageStreamingCodec.cs" /> + <Compile Include="NetworkService\Codec\StreamingCodecFunctionCache.cs" /> <Compile Include="NetworkService\ControlMessage.cs" /> <Compile Include="NetworkService\IConnection.cs" /> <Compile Include="NetworkService\INetworkService.cs" /> @@ -147,9 +150,7 @@ under the License. <Compile Include="NetworkService\NetworkServiceOptions.cs" /> <Compile Include="NetworkService\NsConnection.cs" /> <Compile Include="NetworkService\NsMessage.cs" /> - <Compile Include="NetworkService\WritableNetworkService.cs" /> - <Compile Include="NetworkService\WritableNsConnection.cs" /> - <Compile Include="NetworkService\WritableNsMessage.cs" /> + <Compile Include="NetworkService\StreamingNetworkService.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> <Compile Include="Utilities\Utils.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index babc26d..9f13c83 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -52,7 +52,6 @@ under the License. <Compile Include="StreamingRemoteManagerTest.cs" /> <Compile Include="StreamingTransportTest.cs" /> <Compile Include="TransportTest.cs" /> - <Compile Include="WritableString.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs index 20f75be..a0be7ee 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs @@ -25,7 +25,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.Util; +using Org.Apache.REEF.Wake.StreamingCodec; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Wake.Tests { @@ -34,9 +35,6 @@ namespace Org.Apache.REEF.Wake.Tests { private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 = TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>(); - - private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 = - TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>(); /// <summary> /// Tests one way communication between Remote Managers @@ -47,24 +45,25 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { - var observer = Observer.Create<WritableString>(queue.Add); + var observer = Observer.Create<string>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); remoteManager2.RegisterObserver(endpoint1, observer); var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver.OnNext(new WritableString("abc")); - remoteObserver.OnNext(new WritableString("def")); - remoteObserver.OnNext(new WritableString("ghi")); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + remoteObserver.OnNext("ghi"); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); } Assert.AreEqual(3, events.Count); @@ -78,42 +77,44 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>(); - BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue1 = new BlockingCollection<string>(); + BlockingCollection<string> queue2 = new BlockingCollection<string>(); List<string> events1 = new List<string>(); List<string> events2 = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { // Register observers for remote manager 1 and remote manager 2 var remoteEndpoint = new IPEndPoint(listeningAddress, 0); - var observer1 = Observer.Create<WritableString>(queue1.Add); - var observer2 = Observer.Create<WritableString>(queue2.Add); + var observer1 = Observer.Create<string>(queue1.Add); + var observer2 = Observer.Create<string>(queue2.Add); remoteManager1.RegisterObserver(remoteEndpoint, observer1); remoteManager2.RegisterObserver(remoteEndpoint, observer2); // Remote manager 1 sends 3 events to remote manager 2 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver1.OnNext(new WritableString("abc")); - remoteObserver1.OnNext(new WritableString("def")); - remoteObserver1.OnNext(new WritableString("ghi")); + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("def"); + remoteObserver1.OnNext("ghi"); // Remote manager 2 sends 4 events to remote manager 1 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); - remoteObserver2.OnNext(new WritableString("jkl")); - remoteObserver2.OnNext(new WritableString("mno")); - remoteObserver2.OnNext(new WritableString("pqr")); - remoteObserver2.OnNext(new WritableString("stu")); - - events1.Add(queue1.Take().Data); - events1.Add(queue1.Take().Data); - events1.Add(queue1.Take().Data); - events1.Add(queue1.Take().Data); - - events2.Add(queue2.Take().Data); - events2.Add(queue2.Take().Data); - events2.Add(queue2.Take().Data); + remoteObserver2.OnNext("jkl"); + remoteObserver2.OnNext("mno"); + remoteObserver2.OnNext("pqr"); + remoteObserver2.OnNext("stu"); + + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); + + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); } Assert.AreEqual(4, events1.Count); @@ -129,29 +130,30 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); - var observer = Observer.Create<WritableString>(queue.Add); + var observer = Observer.Create<string>(queue.Add); remoteManager3.RegisterObserver(remoteEndpoint, observer); var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); - remoteObserver2.OnNext(new WritableString("abc")); - remoteObserver1.OnNext(new WritableString("def")); - remoteObserver2.OnNext(new WritableString("ghi")); - remoteObserver1.OnNext(new WritableString("jkl")); - remoteObserver2.OnNext(new WritableString("mno")); + remoteObserver2.OnNext("abc"); + remoteObserver1.OnNext("def"); + remoteObserver2.OnNext("ghi"); + remoteObserver1.OnNext("jkl"); + remoteObserver2.OnNext("mno"); for (int i = 0; i < 5; i++) { - events.Add(queue.Take().Data); + events.Add(queue.Take()); } } @@ -167,58 +169,60 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>(); - BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>(); - BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue1 = new BlockingCollection<string>(); + BlockingCollection<string> queue2 = new BlockingCollection<string>(); + BlockingCollection<string> queue3 = new BlockingCollection<string>(); List<string> events1 = new List<string>(); List<string> events2 = new List<string>(); List<string> events3 = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); - var observer = Observer.Create<WritableString>(queue1.Add); + var observer = Observer.Create<string>(queue1.Add); remoteManager1.RegisterObserver(remoteEndpoint, observer); - var observer2 = Observer.Create<WritableString>(queue2.Add); + var observer2 = Observer.Create<string>(queue2.Add); remoteManager2.RegisterObserver(remoteEndpoint, observer2); - var observer3 = Observer.Create<WritableString>(queue3.Add); + var observer3 = Observer.Create<string>(queue3.Add); remoteManager3.RegisterObserver(remoteEndpoint, observer3); var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); // Observer 1 and 2 send messages to observer 3 - remoteObserver1.OnNext(new WritableString("abc")); - remoteObserver1.OnNext(new WritableString("abc")); - remoteObserver1.OnNext(new WritableString("abc")); - remoteObserver2.OnNext(new WritableString("def")); - remoteObserver2.OnNext(new WritableString("def")); + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("abc"); + remoteObserver1.OnNext("abc"); + remoteObserver2.OnNext("def"); + remoteObserver2.OnNext("def"); // Observer 3 sends messages back to observers 1 and 2 var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint); var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver3A.OnNext(new WritableString("ghi")); - remoteObserver3A.OnNext(new WritableString("ghi")); - remoteObserver3B.OnNext(new WritableString("jkl")); - remoteObserver3B.OnNext(new WritableString("jkl")); - remoteObserver3B.OnNext(new WritableString("jkl")); + remoteObserver3A.OnNext("ghi"); + remoteObserver3A.OnNext("ghi"); + remoteObserver3B.OnNext("jkl"); + remoteObserver3B.OnNext("jkl"); + remoteObserver3B.OnNext("jkl"); - events1.Add(queue1.Take().Data); - events1.Add(queue1.Take().Data); + events1.Add(queue1.Take()); + events1.Add(queue1.Take()); - events2.Add(queue2.Take().Data); - events2.Add(queue2.Take().Data); - events2.Add(queue2.Take().Data); + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); + events2.Add(queue2.Take()); - events3.Add(queue3.Take().Data); - events3.Add(queue3.Take().Data); - events3.Add(queue3.Take().Data); - events3.Add(queue3.Take().Data); - events3.Add(queue3.Take().Data); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); + events3.Add(queue3.Take()); } Assert.AreEqual(2, events1.Count); @@ -234,34 +238,36 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { // Register handler for when remote manager 2 receives events; respond // with an ack var remoteEndpoint = new IPEndPoint(listeningAddress, 0); var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); - var receiverObserver = Observer.Create<WritableString>( - message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data))); + var receiverObserver = Observer.Create<string>( + message => remoteObserver2.OnNext("received message: " + message)); remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver); // Register handler for remote manager 1 to record the ack - var senderObserver = Observer.Create<WritableString>(queue.Add); + var senderObserver = Observer.Create<string>(queue.Add); remoteManager1.RegisterObserver(remoteEndpoint, senderObserver); // Begin to send messages var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver1.OnNext(new WritableString("hello")); - remoteObserver1.OnNext(new WritableString("there")); - remoteObserver1.OnNext(new WritableString("buddy")); + remoteObserver1.OnNext("hello"); + remoteObserver1.OnNext("there"); + remoteObserver1.OnNext("buddy"); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); } Assert.AreEqual(3, events.Count); @@ -278,25 +284,27 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { - // RemoteManager2 listens and records events of type IRemoteEvent<WritableString> - var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message)); + // RemoteManager2 listens and records events of type IRemoteEvent<string> + var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message)); remoteManager2.RegisterObserver(observer); // Remote manager 1 sends 3 events to remote manager 2 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver.OnNext(new WritableString("abc")); - remoteObserver.OnNext(new WritableString("def")); - remoteObserver.OnNext(new WritableString("ghi")); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); + remoteObserver.OnNext("ghi"); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); } Assert.AreEqual(3, events.Count); @@ -310,28 +318,30 @@ namespace Org.Apache.REEF.Wake.Tests { IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec)) { - var observer = Observer.Create<WritableString>(queue.Add); + var observer = Observer.Create<string>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); remoteManager2.RegisterObserver(endpoint1, observer); var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver.OnNext(new WritableString("abc")); - remoteObserver.OnNext(new WritableString("def")); + remoteObserver.OnNext("abc"); + remoteObserver.OnNext("def"); var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - cachedObserver.OnNext(new WritableString("ghi")); - cachedObserver.OnNext(new WritableString("jkl")); + cachedObserver.OnNext("ghi"); + cachedObserver.OnNext("jkl"); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); - events.Add(queue.Take().Data); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); + events.Add(queue.Take()); } Assert.AreEqual(4, events.Count); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs deleted file mode 100644 index 30ff487..0000000 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; - -namespace Org.Apache.REEF.Wake.Tests -{ - /// <summary> - /// Writable wrapper around the string class - /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public class WritableString : IWritable - { - /// <summary> - /// Returns the actual string data - /// </summary> - public string Data { get; set; } - - /// <summary> - /// Empty constructor for instantiation with reflection - /// </summary> - [Inject] - public WritableString() - { - } - - /// <summary> - /// Constructor - /// </summary> - /// <param name="data">The string data</param> - public WritableString(string data) - { - Data = data; - } - - /// <summary> - /// Reads the string - /// </summary> - /// <param name="reader">reader to read from</param> - public void Read(IDataReader reader) - { - Data = reader.ReadString(); - } - - /// <summary> - /// Writes the string - /// </summary> - /// <param name="writer">Writer to write</param> - public void Write(IDataWriter writer) - { - writer.WriteString(Data); - } - - /// <summary> - /// Reads the string - /// </summary> - /// <param name="reader">reader to read from</param> - /// <param name="token">the cancellation token</param> - public async Task ReadAsync(IDataReader reader, CancellationToken token) - { - Data = await reader.ReadStringAsync(token); - } - - /// <summary> - /// Writes the string - /// </summary> - /// <param name="writer">Writer to write</param> - /// <param name="token">the cancellation token</param> - public async Task WriteAsync(IDataWriter writer, CancellationToken token) - { - await writer.WriteStringAsync(Data, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 4069d15..5767f8a 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -49,7 +49,6 @@ under the License. <Compile Include="IIdentifier.cs" /> <Compile Include="IIdentifierFactory.cs" /> <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" /> - <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" /> <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" /> <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" /> <Compile Include="Impl\LoggingEventHandler.cs" /> @@ -78,7 +77,6 @@ under the License. <Compile Include="Remote\Impl\StreamingTransportClient.cs" /> <Compile Include="Remote\Impl\StreamingTransportServer.cs" /> <Compile Include="Remote\IRemoteManagerFactory.cs" /> - <Compile Include="Remote\IWritable.cs" /> <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" /> <Compile Include="Remote\ICodec.cs" /> <Compile Include="Remote\ICodecFactory.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs deleted file mode 100644 index 644cf82..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -namespace Org.Apache.REEF.Wake.Remote -{ - /// <summary> - /// Interface that classes should implement if they need to be readable to and writable - /// from the stream. It is assumed that the classes inheriting this interface will have a - /// default empty constructor - /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] - public interface IWritable - { - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - void Read(IDataReader reader); - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - void Write(IDataWriter writer); - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - /// <param name="token">The cancellation token</param> - Task ReadAsync(IDataReader reader, CancellationToken token); - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">The cancellation token</param> - Task WriteAsync(IDataWriter writer, CancellationToken token); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs index 90e3aca..da549ea 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs @@ -20,6 +20,7 @@ using System.Net; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Wake.Remote.Impl { @@ -38,12 +39,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl _injector = injector; } - //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447] - public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable + public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec) { #pragma warning disable 618 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>(); return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec); #pragma warning disable 618 }
