// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Performs low level network IO operations between hosts.
    /// </summary>
    /// <remarks>
    /// Every message is framed as two 4-byte headers followed by the payload:
    /// the first header holds (payload length + 4) and is byte-reversed on
    /// little-endian hosts; the second header holds the payload length exactly
    /// as produced by <see cref="BitConverter"/> (host byte order).
    /// </remarks>
    public class Channel
    {
        private readonly NetworkStream _stream;

        /// <summary>
        /// Constructs a new Channel with the connected NetworkStream.
        /// </summary>
        /// <param name="stream">The connected stream</param>
        /// <exception cref="ArgumentNullException">If stream is null</exception>
        public Channel(NetworkStream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            _stream = stream;
        }

        /// <summary>
        /// Sends a message to the connected client synchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">If message is null</exception>
        public void Write(byte[] message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            _stream.Write(messageBuffer, 0, messageBuffer.Length);
        }

        /// <summary>
        /// Sends a message to the connected client asynchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The awaitable write task</returns>
        /// <exception cref="ArgumentNullException">If message is null (surfaced
        /// through the returned task)</exception>
        public async Task WriteAsync(byte[] message, CancellationToken token)
        {
            // Same contract as Write: reject null explicitly instead of failing
            // with a NullReferenceException while building the frame.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads an incoming message as a byte array synchronously.
        /// The message length is taken from the two 4-byte frame headers.
        /// </summary>
        /// <returns>The byte array message, or null if the stream ended or the
        /// frame announced a zero-length payload</returns>
        public byte[] Read()
        {
            int payloadLength = ReadMessageLength();
            if (payloadLength == 0)
            {
                return null;
            }

            return ReadBytes(payloadLength);
        }

        /// <summary>
        /// Reads an incoming message as a byte array asynchronously.
        /// The message length is taken from the two 4-byte frame headers.
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The byte array message, or null if the stream ended or the
        /// frame announced a zero-length payload</returns>
        public async Task<byte[]> ReadAsync(CancellationToken token)
        {
            int payloadLength = await GetMessageLengthAsync(token).ConfigureAwait(false);
            if (payloadLength == 0)
            {
                return null;
            }

            return await ReadBytesAsync(payloadLength, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the stream ended first.
        /// </returns>
        private byte[] ReadBytes(int bytesToRead)
        {
            int totalBytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            // A single Read may return fewer bytes than requested, so loop
            // until the buffer is full.
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
                if (bytesRead == 0)
                {
                    // A zero-byte read means no more data will arrive on this stream
                    return null;
                }

                totalBytesRead += bytesRead;
            }

            return buffer;
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the stream ended first.
        /// </returns>
        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
        {
            int bytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            while (bytesRead < bytesToRead)
            {
                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token).ConfigureAwait(false);
                if (amountRead == 0)
                {
                    // A zero-byte read means no more data will arrive on this stream
                    return null;
                }

                bytesRead += amountRead;
            }

            return buffer;
        }

        /// <summary>
        /// Generates the payload buffer containing the message along
        /// with the two headers indicating the message length.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <returns>The payload buffer</returns>
        private static byte[] GenerateMessageBuffer(byte[] message)
        {
            byte[] frameLengthBytes = BitConverter.GetBytes(message.Length + 4);
            byte[] payloadLengthBytes = BitConverter.GetBytes(message.Length);

            // Only the first header is byte-reversed; the reader mirrors this,
            // so the asymmetry is part of the wire format and must be kept.
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(frameLengthBytes);
            }

            int len = frameLengthBytes.Length + payloadLengthBytes.Length + message.Length;
            byte[] messageBuffer = new byte[len];

            int bytesCopied = 0;
            bytesCopied += CopyBytes(frameLengthBytes, messageBuffer, 0);
            bytesCopied += CopyBytes(payloadLengthBytes, messageBuffer, bytesCopied);
            CopyBytes(message, messageBuffer, bytesCopied);

            return messageBuffer;
        }

        /// <summary>
        /// Reads both frame headers from the stream and decodes the
        /// payload length in bytes.
        /// </summary>
        /// <returns>The incoming message's length in bytes, or 0 if the stream
        /// ended or the frame is empty</returns>
        private int ReadMessageLength()
        {
            byte[] frameLengthBytes = ReadBytes(sizeof(int));
            if (!AnnouncesPayload(frameLengthBytes))
            {
                return 0;
            }

            return DecodePayloadLength(ReadBytes(sizeof(int)));
        }

        /// <summary>
        /// Reads both frame headers from the stream asynchronously and decodes
        /// the payload length in bytes.
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The incoming message's length in bytes, or 0 if the stream
        /// ended or the frame is empty</returns>
        private async Task<int> GetMessageLengthAsync(CancellationToken token)
        {
            byte[] frameLengthBytes = await ReadBytesAsync(sizeof(int), token).ConfigureAwait(false);
            if (!AnnouncesPayload(frameLengthBytes))
            {
                return 0;
            }

            // Must stay on the async path: a blocking ReadBytes here would stall
            // the calling thread and ignore the cancellation token.
            byte[] payloadLengthBytes = await ReadBytesAsync(sizeof(int), token).ConfigureAwait(false);
            return DecodePayloadLength(payloadLengthBytes);
        }

        /// <summary>
        /// Decodes the first frame header (byte-reversed on little-endian hosts)
        /// and reports whether it announces a non-empty frame.
        /// </summary>
        /// <param name="frameLengthBytes">The first header, or null if it could
        /// not be read. The array is reversed in place on little-endian hosts.</param>
        /// <returns>True if a second header and payload follow</returns>
        private static bool AnnouncesPayload(byte[] frameLengthBytes)
        {
            if (frameLengthBytes == null)
            {
                return false;
            }
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(frameLengthBytes);
            }

            return BitConverter.ToInt32(frameLengthBytes, 0) != 0;
        }

        /// <summary>
        /// Decodes the second frame header, which is not byte-reversed.
        /// </summary>
        /// <param name="payloadLengthBytes">The second header, or null if it
        /// could not be read</param>
        /// <returns>The payload length, or 0 if the header is missing</returns>
        private static int DecodePayloadLength(byte[] payloadLengthBytes)
        {
            return (payloadLengthBytes == null) ? 0 : BitConverter.ToInt32(payloadLengthBytes, 0);
        }

        /// <summary>
        /// Copies the entire source buffer into the destination buffer at the
        /// specified destination offset.
        /// </summary>
        /// <param name="source">The source buffer to be copied</param>
        /// <param name="dest">The destination buffer to copy to</param>
        /// <param name="destOffset">The offset at the destination buffer to begin
        /// copying.</param>
        /// <returns>The number of bytes copied</returns>
        private static int CopyBytes(byte[] source, byte[] dest, int destOffset)
        {
            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
            return source.Length;
        }
    }
}
// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Util;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Manages incoming and outgoing messages between remote hosts.
    /// </summary>
    public class DefaultRemoteManager<T> : IRemoteManager<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));

        private ObserverContainer<T> _observerContainer;
        private TransportServer<IRemoteEvent<T>> _server;

        // NOTE(review): plain Dictionary, not synchronized. Confirm that callers
        // never invoke GetRemoteObserver concurrently, or guard this cache.
        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
        private ICodec<IRemoteEvent<T>> _codec;
        private bool _disposed;

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified address and any
        /// available port.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
        {
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified IPEndPoint.
        /// </summary>
        /// <param name="localEndpoint">The endpoint to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">If localEndpoint or codec is null</exception>
        /// <exception cref="ArgumentException">If the endpoint's port is negative</exception>
        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
        {
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }
            if (localEndpoint.Port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            StartListening(localEndpoint, codec);
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager listening on the specified address and port.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="port">The port to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">If localAddress or codec is null</exception>
        /// <exception cref="ArgumentException">If port is negative</exception>
        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
        {
            if (localAddress == null)
            {
                throw new ArgumentNullException("localAddress");
            }
            if (port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            StartListening(new IPEndPoint(localAddress, port), codec);
        }

        /// <summary>
        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
        /// </summary>
        /// <param name="codec">The codec used for serializing messages</param>
        /// <exception cref="ArgumentNullException">If codec is null</exception>
        public DefaultRemoteManager(ICodec<T> codec)
        {
            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
            {
                if (codec == null)
                {
                    throw new ArgumentNullException("codec");
                }

                InitializeState(codec);

                // No server is started, so the endpoint is the local address with port 0
                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
            }
        }

        /// <summary>
        /// Gets the RemoteIdentifier for the DefaultRemoteManager
        /// </summary>
        public IRemoteIdentifier Identifier { get; private set; }

        /// <summary>
        /// Gets the local IPEndPoint for the DefaultRemoteManager
        /// </summary>
        public IPEndPoint LocalEndpoint { get; private set; }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host
        /// identified by the specified RemoteEventEndPoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id
        /// must be a SocketRemoteIdentifier</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
        /// <exception cref="ArgumentException">If the endpoint's Id is not a
        /// SocketRemoteIdentifier</exception>
        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return GetRemoteObserver(id.Addr);
        }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host at
        /// the specified IPEndpoint. Observers are cached per endpoint, so
        /// repeated calls for the same endpoint return the same instance.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            ProxyObserver remoteObserver;
            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
            {
                TransportClient<IRemoteEvent<T>> client =
                    new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);

                remoteObserver = new ProxyObserver(client);
                _cachedClients[remoteEndpoint] = remoteObserver;
            }

            return remoteObserver;
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// identified by the specified RemoteEventEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id
        /// must be a SocketRemoteIdentifier</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
        /// <exception cref="ArgumentException">If the endpoint's Id is not a
        /// SocketRemoteIdentifier</exception>
        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            // The IPEndPoint overload validates the observer argument
            return RegisterObserver(id.Addr, observer);
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// at the specified IPEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">If remoteEndpoint or observer is null</exception>
        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
        }

        /// <summary>
        /// Registers an IObserver that receives incoming messages wrapped as
        /// IRemoteMessage, without binding it to a specific remote endpoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">If observer is null</exception>
        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(observer);
        }

        /// <summary>
        /// Release all resources for the DefaultRemoteManager.
        /// Safe to call more than once; only the first call has any effect.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
            _disposed = true;

            try
            {
                foreach (ProxyObserver cachedClient in _cachedClients.Values)
                {
                    cachedClient.Dispose();
                }
            }
            finally
            {
                // Stop the listener even if closing one of the clients failed
                if (_server != null)
                {
                    _server.Dispose();
                }
            }
        }

        /// <summary>
        /// Creates the observer container, the remote event codec and the client cache.
        /// </summary>
        /// <param name="codec">The user codec to wrap in a RemoteEventCodec</param>
        private void InitializeState(ICodec<T> codec)
        {
            _observerContainer = new ObserverContainer<T>();
            _codec = new RemoteEventCodec<T>(codec);
            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
        }

        /// <summary>
        /// Initializes state, starts the transport server on the given endpoint and
        /// publishes the endpoint and identifier the server reports.
        /// </summary>
        /// <param name="localEndpoint">The endpoint to listen on</param>
        /// <param name="codec">The user codec used for serializing messages</param>
        private void StartListening(IPEndPoint localEndpoint, ICodec<T> codec)
        {
            InitializeState(codec);

            // Begin to listen for incoming messages
            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
            _server.Run();

            // Take the endpoint from the server rather than from the argument
            LocalEndpoint = _server.LocalEndpoint;
            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
        }

        /// <summary>
        /// Observer to send messages to connected remote host
        /// </summary>
        private class ProxyObserver : IObserver<T>, IDisposable
        {
            private readonly TransportClient<IRemoteEvent<T>> _client;
            private int _messageCount;

            /// <summary>
            /// Create new ProxyObserver
            /// </summary>
            /// <param name="client">The connected transport client used to send
            /// messages to remote host</param>
            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
            {
                _client = client;
                _messageCount = 0;
            }

            /// <summary>
            /// Wraps the message in a RemoteEvent carrying the next sequence
            /// number and sends it to the remote host
            /// </summary>
            /// <param name="message">The message to send</param>
            public void OnNext(T message)
            {
                IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message)
                {
                    Sink = "default",
                    Sequence = _messageCount
                };

                _messageCount++;
                _client.Send(remoteEvent);
            }

            /// <summary>
            /// Close underlying transport client
            /// </summary>
            public void Dispose()
            {
                _client.Dispose();
            }

            /// <summary>
            /// Not supported; always throws NotImplementedException.
            /// </summary>
            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Not supported; always throws NotImplementedException.
            /// </summary>
            public void OnCompleted()
            {
                throw new NotImplementedException();
            }
        }
    }
}
// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Simple IRemoteMessage implementation that pairs a message with a
    /// remote identifier. Both values are fixed at construction time.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    internal class DefaultRemoteMessage<T> : IRemoteMessage<T>
    {
        /// <summary>
        /// Creates a message carrying the given identifier and payload.
        /// </summary>
        /// <param name="id">The remote identifier stored with the message</param>
        /// <param name="message">The message payload</param>
        public DefaultRemoteMessage(IRemoteIdentifier id, T message)
        {
            Message = message;
            Identifier = id;
        }

        /// <summary>
        /// Gets the message payload
        /// </summary>
        public T Message { get; private set; }

        /// <summary>
        /// Gets the remote identifier stored with the message
        /// </summary>
        public IRemoteIdentifier Identifier { get; private set; }
    }
}
// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Class to compare two IPEndPoint objects.
    /// A port of 0 on either side acts as a wildcard: such endpoints are equal
    /// whenever their addresses are equal.
    /// </summary>
    internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
    {
        /// <summary>
        /// Determines whether two endpoints are considered the same.
        /// </summary>
        /// <param name="x">The first endpoint, may be null</param>
        /// <param name="y">The second endpoint, may be null</param>
        /// <returns>True if both are the same reference (including both null),
        /// or the addresses match and either port is 0, or address and port
        /// both match; otherwise false</returns>
        public bool Equals(IPEndPoint x, IPEndPoint y)
        {
            if (ReferenceEquals(x, y))
            {
                return true;
            }
            if (x == null || y == null)
            {
                return false;
            }

            // If either port is 0, don't check port
            if (x.Port == 0 || y.Port == 0)
            {
                return x.Address.Equals(y.Address);
            }

            return x.Equals(y);
        }

        /// <summary>
        /// Returns a hash code derived from the address only. The port is left
        /// out on purpose so that endpoints made equal by the port-0 wildcard
        /// rule still hash to the same value.
        /// </summary>
        /// <param name="obj">The endpoint to hash</param>
        /// <returns>The hash code of the endpoint's address</returns>
        /// <exception cref="ArgumentNullException">If obj is null</exception>
        public int GetHashCode(IPEndPoint obj)
        {
            if (obj == null)
            {
                throw new ArgumentNullException("obj");
            }

            return obj.Address.GetHashCode();
        }
    }
}
// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using Org.Apache.Reef.Tang.Annotations;
using System;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec that converts a 32-bit integer to and from the 4-byte
    /// representation produced by BitConverter.
    /// </summary>
    public class IntCodec : ICodec<int>
    {
        /// <summary>
        /// Creates the codec; marked for injection.
        /// </summary>
        [Inject]
        public IntCodec()
        {
        }

        /// <summary>
        /// Serializes the integer with BitConverter.GetBytes.
        /// </summary>
        /// <param name="obj">The value to encode</param>
        /// <returns>The 4-byte representation of the value</returns>
        public byte[] Encode(int obj)
        {
            byte[] encoded = BitConverter.GetBytes(obj);
            return encoded;
        }

        /// <summary>
        /// Deserializes an integer from the first four bytes of the array
        /// with BitConverter.ToInt32.
        /// </summary>
        /// <param name="data">The bytes to decode</param>
        /// <returns>The decoded value</returns>
        public int Decode(byte[] data)
        {
            const int startIndex = 0;
            int decoded = BitConverter.ToInt32(data, startIndex);
            return decoded;
        }
    }
}
// Source file: lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Tang.Exceptions;
using Org.Apache.Reef.Wake.Util;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Represents an open connection between remote hosts
    /// </summary>
    public class Link<T> : ILink<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));

        private IPEndPoint _localEndpoint;
        private ICodec<T> _codec;
        private Channel _channel;
        private bool _disposed;

        /// <summary>
        /// Constructs a Link object.
        /// Connects to the specified remote endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
        /// <param name="codec">The codec for serializing messages</param>
        /// <exception cref="ArgumentNullException">If remoteEndpoint or codec is null</exception>
        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            TcpClient client = new TcpClient();
            try
            {
                client.Connect(remoteEndpoint);

                Client = client;
                _codec = codec;
                _channel = new Channel(client.GetStream());
                _localEndpoint = GetLocalEndpoint();
            }
            catch
            {
                // This constructor created the client, so nobody else can
                // release it if connecting or the remaining set-up fails.
                client.Close();
                throw;
            }

            _disposed = false;
        }

        /// <summary>
        /// Constructs a Link object.
        /// Uses the already connected TcpClient.
        /// </summary>
        /// <param name="client">The already connected client</param>
        /// <param name="codec">The encoder and decoder</param>
        /// <exception cref="ArgumentNullException">If client or codec is null</exception>
        public Link(TcpClient client, ICodec<T> codec)
        {
            if (client == null)
            {
                throw new ArgumentNullException("client");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            Client = client;
            _codec = codec;
            _channel = new Channel(Client.GetStream());
            _localEndpoint = GetLocalEndpoint();
            _disposed = false;
        }

        /// <summary>
        /// Returns the local socket address
        /// </summary>
        public IPEndPoint LocalEndpoint
        {
            get { return _localEndpoint; }
        }

        /// <summary>
        /// Returns the remote socket address, read from the underlying socket
        /// on every access
        /// </summary>
        public IPEndPoint RemoteEndpoint
        {
            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
        }

        /// <summary>
        /// Gets the underlying TcpClient
        /// </summary>
        public TcpClient Client { get; private set; }

        /// <summary>
        /// Writes the message to the remote host
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <exception cref="ArgumentNullException">If value is null</exception>
        public void Write(T value)
        {
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
            }

            byte[] message = _codec.Encode(value);
            _channel.Write(message);
        }

        /// <summary>
        /// Writes the value to this link asynchronously
        /// </summary>
        /// <param name="value">The data to write</param>
        /// <param name="token">The cancellation token</param>
        /// <exception cref="ArgumentNullException">If value is null (surfaced
        /// through the returned task)</exception>
        public async Task WriteAsync(T value, CancellationToken token)
        {
            // Same argument contract as the synchronous Write
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
            }

            byte[] message = _codec.Encode(value);
            await _channel.WriteAsync(message, token).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads the value from the link synchronously
        /// </summary>
        /// <returns>The decoded value, or default(T) when the channel
        /// returned no message</returns>
        public T Read()
        {
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
            }

            byte[] message = _channel.Read();
            return (message == null) ? default(T) : _codec.Decode(message);
        }

        /// <summary>
        /// Reads the value from the link asynchronously
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The decoded value, or default(T) when the channel
        /// returned no message</returns>
        public async Task<T> ReadAsync(CancellationToken token)
        {
            if (_disposed)
            {
                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
            }

            byte[] message = await _channel.ReadAsync(token).ConfigureAwait(false);
            return (message == null) ? default(T) : _codec.Decode(message);
        }

        /// <summary>
        /// Close the client connection
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Subclasses of Links should overwrite this to handle disposing
        /// of the link
        /// </summary>
        /// <param name="disposing">True to close the stream and the client;
        /// false to only mark the link as disposed</param>
        public virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    Client.GetStream().Close();
                }
                catch (InvalidOperationException)
                {
                    // GetStream throws when the client is not connected;
                    // the client itself is still closed below.
                    LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
                }

                Client.Close();
            }
            _disposed = true;
        }

        /// <summary>
        /// Overrides Equals. Two Link objects are equal if they are connected
        /// to the same remote endpoint.
        /// </summary>
        /// <param name="obj">The object to compare</param>
        /// <returns>True if the object is equal to this Link, otherwise false</returns>
        public override bool Equals(object obj)
        {
            Link<T> other = obj as Link<T>;
            if (other == null)
            {
                return false;
            }

            return other.RemoteEndpoint.Equals(RemoteEndpoint);
        }

        /// <summary>
        /// Gets the hash code for the Link object, derived from the remote
        /// endpoint so that it agrees with Equals.
        /// </summary>
        /// <returns>The object's hash code</returns>
        public override int GetHashCode()
        {
            return RemoteEndpoint.GetHashCode();
        }

        /// <summary>
        /// Builds the local IPEndpoint from NetworkUtils.LocalIPAddress and
        /// the port the client socket is bound to.
        /// </summary>
        /// <returns>The local IPEndpoint</returns>
        private IPEndPoint GetLocalEndpoint()
        {
            IPAddress address = NetworkUtils.LocalIPAddress;
            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
            return new IPEndPoint(address, port);
        }
    }
}
using System;
using System.Collections.Generic;
using Org.Apache.Reef.Wake.Remote;
using Org.Apache.Reef.Wake.Remote.Impl;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec for a family of types derived from <typeparamref name="T"/>.
    /// Each registered codec is handed to both an internal MultiEncoder and an
    /// internal MultiDecoder, which pick the right codec per object.
    /// </summary>
    public class MultiCodec<T> : ICodec<T>
    {
        // Encoding half: looks codecs up by the runtime type of the object.
        private readonly MultiEncoder<T> _encoder = new MultiEncoder<T>();

        // Decoding half: looks codecs up by the class name carried in the message.
        private readonly MultiDecoder<T> _decoder = new MultiDecoder<T>();

        /// <summary>
        /// Creates an empty MultiCodec; codecs are added through Register.
        /// </summary>
        public MultiCodec()
        {
        }

        /// <summary>
        /// Registers the codec responsible for objects of type U.
        /// </summary>
        /// <typeparam name="U">The type handled by the codec</typeparam>
        /// <param name="codec">The codec used to encode and decode objects of type U</param>
        public void Register<U>(ICodec<U> codec) where U : T
        {
            _encoder.Register(codec);
            _decoder.Register(codec);
        }

        /// <summary>
        /// Registers the codec responsible for objects of type U under an
        /// explicit class name.
        /// </summary>
        /// <typeparam name="U">The type handled by the codec</typeparam>
        /// <param name="codec">The codec used to encode and decode objects of type U</param>
        /// <param name="name">The class name written to / matched against encoded messages</param>
        public void Register<U>(ICodec<U> codec, string name) where U : T
        {
            _encoder.Register(codec, name);
            _decoder.Register(codec, name);
        }

        /// <summary>
        /// Encodes the object by delegating to the internal MultiEncoder.
        /// </summary>
        /// <param name="obj">The object to encode</param>
        /// <returns>Whatever the internal encoder returns for the object</returns>
        public byte[] Encode(T obj)
        {
            byte[] encoded = _encoder.Encode(obj);
            return encoded;
        }

        /// <summary>
        /// Decodes the byte array by delegating to the internal MultiDecoder.
        /// </summary>
        /// <param name="data">The bytes to decode</param>
        /// <returns>Whatever the internal decoder returns for the data</returns>
        public T Decode(byte[] data)
        {
            T decoded = _decoder.Decode(data);
            return decoded;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Reflection;
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Remote;
using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Decoder for WakeTuple protocol buffer messages (class name plus payload
    /// bytes). The class name selects which registered decoder handles the payload.
    /// </summary>
    public class MultiDecoder<T> : IDecoder<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));

        // Registered decoders, keyed by the type they produce. Stored as object
        // because each entry is an IDecoder<U> for a different U.
        private Dictionary<Type, object> _decoderMap = new Dictionary<Type, object>();

        // Class name carried in the message -> type used to find the decoder.
        private Dictionary<string, Type> _nameMap = new Dictionary<string, Type>();

        /// <summary>
        /// Constructs a decoder with no registered types.
        /// </summary>
        public MultiDecoder()
        {
        }

        /// <summary>
        /// Registers the decoder for objects of type U, using the type's
        /// ToString() value as the class name.
        /// </summary>
        /// <typeparam name="U">The type produced by the decoder</typeparam>
        /// <param name="decoder">The decoder to use for objects of type U</param>
        public void Register<U>(IDecoder<U> decoder) where U : T
        {
            Register(decoder, typeof(U).ToString());
        }

        /// <summary>
        /// Registers the decoder for objects of type U under an explicit class name.
        /// </summary>
        /// <typeparam name="U">The type produced by the decoder</typeparam>
        /// <param name="decoder">The decoder to use for objects of type U</param>
        /// <param name="name">The class name that identifies U in encoded messages</param>
        public void Register<U>(IDecoder<U> decoder, string name) where U : T
        {
            Type decodedType = typeof(U);
            _decoderMap[decodedType] = decoder;
            _nameMap[name] = decodedType;
        }

        /// <summary>
        /// Deserializes the tuple, resolves its class name to a registered type
        /// and invokes that type's decoder on the payload.
        /// </summary>
        /// <param name="data">The serialized WakeTuple message</param>
        /// <returns>The decoded object, or default(T) when the tuple cannot be
        /// deserialized or its class name is not registered</returns>
        public T Decode(byte[] data)
        {
            WakeTuplePBuf tuple = WakeTuplePBuf.Deserialize(data);

            Type targetType;
            if (tuple == null || !_nameMap.TryGetValue(tuple.className, out targetType))
            {
                return default(T);
            }

            object registeredDecoder;
            bool known = _decoderMap.TryGetValue(targetType, out registeredDecoder);
            if (!known)
            {
                Exceptions.Throw(new RemoteRuntimeException("Decoder for " + targetType + " not known."), LOGGER);
            }

            // The decoder's type argument is only known at runtime, so Decode is
            // called through reflection on the closed IDecoder<> interface.
            MethodInfo decodeMethod = typeof(IDecoder<>).MakeGenericType(targetType).GetMethod("Decode");
            object decoded = decodeMethod.Invoke(registeredDecoder, new object[] { tuple.data });
            return (T) decoded;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Reflection;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Encoder using the WakeTuple protocol buffer
    /// (class name and bytes)
    /// </summary>
    public class MultiEncoder<T> : IEncoder<T>
    {
        private static readonly Logger _logger = Logger.GetLogger(typeof(MultiEncoder<>));

        // Registered encoders, keyed by the runtime type they handle. Stored as
        // object because each entry is an IEncoder<U> for a different U.
        private readonly Dictionary<Type, object> _encoderMap;

        // Class name written into the tuple for each registered type.
        private readonly Dictionary<Type, string> _nameMap;

        /// <summary>
        /// Constructs an encoder that encodes an object to bytes based on the class name
        /// </summary>
        public MultiEncoder()
        {
            _encoderMap = new Dictionary<Type, object>();
            _nameMap = new Dictionary<Type, string>();
        }

        /// <summary>
        /// Registers the encoder for objects of type U, using the type's
        /// ToString() value as the class name.
        /// </summary>
        /// <typeparam name="U">The type handled by the encoder</typeparam>
        /// <param name="encoder">The encoder to use for objects of type U</param>
        public void Register<U>(IEncoder<U> encoder) where U : T
        {
            Type type = typeof(U);
            _encoderMap[type] = encoder;
            _nameMap[type] = type.ToString();
        }

        /// <summary>
        /// Registers the encoder for objects of type U under an explicit class name.
        /// </summary>
        /// <typeparam name="U">The type handled by the encoder</typeparam>
        /// <param name="encoder">The encoder to use for objects of type U</param>
        /// <param name="name">The class name written into encoded messages</param>
        public void Register<U>(IEncoder<U> encoder, string name) where U : T
        {
            Type type = typeof(U);
            _encoderMap[type] = encoder;
            _nameMap[type] = name;
            _logger.Log(Level.Verbose, "Registering name for " + name);
        }

        /// <summary>Encodes an object to a byte array</summary>
        /// <param name="obj">The object to encode</param>
        /// <returns>The serialized WakeTuple (class name and payload), or null
        /// when no encoder is registered for the object's runtime type</returns>
        /// <exception cref="ArgumentNullException">If obj is null</exception>
        public byte[] Encode(T obj)
        {
            if (obj == null)
            {
                // The encoder is selected by runtime type, which null does not have.
                throw new ArgumentNullException("obj");
            }

            // Find encoder for object type
            Type type = obj.GetType();
            object encoder;
            if (!_encoderMap.TryGetValue(type, out encoder))
            {
                return null;
            }

            // Invoke encoder for this type. The type argument is only known at
            // runtime, so Encode is called through reflection.
            Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { type });
            MethodInfo info = handlerType.GetMethod("Encode");
            byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });

            // Serialize object type and object data into well known tuple
            // To decode, deserialize the tuple, get object type, and look up the
            // decoder for that type
            string name = _nameMap[type];
            _logger.Log(Level.Verbose, "Encoding name for " + name);
            WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
            return pbuf.Serialize();
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Wake.RX.Impl;
using Org.Apache.Reef.Wake.Util;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Stores registered IObservers for DefaultRemoteManager.
    /// Can register and look up IObservers by remote IPEndPoint.
    /// </summary>
    internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
    {
        private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
        private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;

        // Observer registered for IPAddress.Any; receives every incoming value.
        private IObserver<T> _universalObserver;

        /// <summary>
        /// Constructs a new ObserverContainer used to manage remote IObservers.
        /// </summary>
        public ObserverContainer()
        {
            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// at the specified IPEndPoint. An endpoint whose address is IPAddress.Any
        /// registers the observer as the universal observer instead.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint.Address.Equals(IPAddress.Any))
            {
                _universalObserver = observer;
                return Disposable.Create(() => { _universalObserver = null; });
            }

            _endpointMap[remoteEndpoint] = observer;
            return Disposable.Create(() =>
            {
                // Use a dedicated local for the out value rather than
                // overwriting the captured method parameter.
                IObserver<T> removed;
                _endpointMap.TryRemove(remoteEndpoint, out removed);
            });
        }

        /// <summary>
        /// Registers an IObserver to handle incoming messages from a remote host
        /// </summary>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
        {
            _typeMap[typeof(T)] = observer;
            return Disposable.Create(() =>
            {
                IObserver<IRemoteMessage<T>> removed;
                _typeMap.TryRemove(typeof(T), out removed);
            });
        }

        /// <summary>
        /// Look up the IObserver for the registered IPEndPoint or event type
        /// and execute the IObserver.
        /// </summary>
        /// <param name="transportEvent">The incoming remote event</param>
        /// <exception cref="WakeRuntimeException">If no registered observer
        /// handled the event</exception>
        public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
        {
            IRemoteEvent<T> remoteEvent = transportEvent.Data;
            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
            T value = remoteEvent.Value;
            bool handled = false;

            // Read the field once: the unregister callback may set it to null
            // between a null check and the call.
            IObserver<T> universalObserver = _universalObserver;
            if (universalObserver != null)
            {
                universalObserver.OnNext(value);
                handled = true;
            }

            IObserver<T> endpointObserver;
            IObserver<IRemoteMessage<T>> typeObserver;
            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out endpointObserver))
            {
                // IObserver was registered by IPEndpoint
                endpointObserver.OnNext(value);
                handled = true;
            }
            else if (TryGetTypeObserver(value, out typeObserver))
            {
                // IObserver was registered by event type
                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
                typeObserver.OnNext(remoteMessage);
                handled = true;
            }

            if (!handled)
            {
                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
            }
        }

        /// <summary>
        /// No-op: errors are not propagated to the registered observers.
        /// </summary>
        /// <param name="error">The error reported by the transport</param>
        public void OnError(Exception error)
        {
        }

        /// <summary>
        /// No-op: completion is not propagated to the registered observers.
        /// </summary>
        public void OnCompleted()
        {
        }

        /// <summary>
        /// Finds the observer registered by event type. The value's runtime type
        /// is tried first; typeof(T) is the fallback because that is the key
        /// RegisterObserver stores under, so values of a type derived from T
        /// (and null values) still reach the registered observer.
        /// </summary>
        private bool TryGetTypeObserver(T value, out IObserver<IRemoteMessage<T>> observer)
        {
            if (value != null && _typeMap.TryGetValue(value.GetType(), out observer))
            {
                return true;
            }

            return _typeMap.TryGetValue(typeof(T), out observer);
        }
    }
}
using System.Net;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Plain data holder for a remote event: the local and remote endpoints,
    /// source and sink names, a sequence number and the carried value.
    /// </summary>
    public class RemoteEvent<T> : IRemoteEvent<T>
    {
        /// <summary>
        /// Creates an event with every field left at its default value.
        /// </summary>
        public RemoteEvent()
        {
        }

        /// <summary>
        /// Creates an event with endpoints and a value; Source, Sink and
        /// Sequence keep their default values.
        /// </summary>
        /// <param name="localEndpoint">The local endpoint of the event</param>
        /// <param name="remoteEndpoint">The remote endpoint of the event</param>
        /// <param name="value">The carried value</param>
        public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
        {
            LocalEndPoint = localEndpoint;
            RemoteEndPoint = remoteEndpoint;
            Value = value;
        }

        /// <summary>
        /// Creates a fully populated event.
        /// </summary>
        /// <param name="localEndPoint">The local endpoint of the event</param>
        /// <param name="remoteEndPoint">The remote endpoint of the event</param>
        /// <param name="source">The source name</param>
        /// <param name="sink">The sink name</param>
        /// <param name="seq">The sequence number</param>
        /// <param name="value">The carried value</param>
        public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
            : this(localEndPoint, remoteEndPoint, value)
        {
            Source = source;
            Sink = sink;
            Sequence = seq;
        }

        public IPEndPoint LocalEndPoint { get; set; }

        public IPEndPoint RemoteEndPoint { get; set; }

        public string Source { get; set; }

        public string Sink { get; set; }

        public T Value { get; set; }

        public long Sequence { get; set; }
    }
}
namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec for IRemoteEvent&lt;T&gt; that pairs a RemoteEventEncoder with a
    /// RemoteEventDecoder, both built on the same value codec.
    /// </summary>
    internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
    {
        private readonly RemoteEventEncoder<T> _eventEncoder;
        private readonly RemoteEventDecoder<T> _eventDecoder;

        /// <summary>
        /// Builds the event encoder and decoder around the given value codec.
        /// </summary>
        /// <param name="codec">The codec used for the event's value</param>
        public RemoteEventCodec(ICodec<T> codec)
        {
            _eventEncoder = new RemoteEventEncoder<T>(codec);
            _eventDecoder = new RemoteEventDecoder<T>(codec);
        }

        /// <summary>
        /// Encodes the event by delegating to the event encoder.
        /// </summary>
        /// <param name="obj">The event to encode</param>
        /// <returns>The encoded bytes</returns>
        public byte[] Encode(IRemoteEvent<T> obj)
        {
            byte[] encoded = _eventEncoder.Encode(obj);
            return encoded;
        }

        /// <summary>
        /// Decodes the bytes by delegating to the event decoder.
        /// </summary>
        /// <param name="data">The bytes to decode</param>
        /// <returns>The decoded event</returns>
        public IRemoteEvent<T> Decode(byte[] data)
        {
            IRemoteEvent<T> decoded = _eventDecoder.Decode(data);
            return decoded;
        }
    }
}
using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Decodes a serialized WakeMessagePBuf into an IRemoteEvent&lt;T&gt;,
    /// using a wrapped decoder for the event's value.
    /// </summary>
    public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
    {
        private readonly IDecoder<T> _decoder;

        /// <summary>
        /// Creates a decoder that uses the given decoder for the event value.
        /// </summary>
        /// <param name="decoder">The decoder for the event's value</param>
        public RemoteEventDecoder(IDecoder<T> decoder)
        {
            _decoder = decoder;
        }

        /// <summary>
        /// Deserializes the message and builds a RemoteEvent from its source,
        /// sink, sequence number and decoded payload. Both endpoints are left null.
        /// </summary>
        /// <param name="data">The serialized WakeMessagePBuf</param>
        /// <returns>The decoded remote event</returns>
        public IRemoteEvent<T> Decode(byte[] data)
        {
            WakeMessagePBuf message = WakeMessagePBuf.Deserialize(data);

            string source = message.source;
            string sink = message.sink;
            long sequence = message.seq;
            T value = _decoder.Decode(message.data);

            return new RemoteEvent<T>(null, null, source, sink, sequence, value);
        }
    }
}
using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Encodes an IRemoteEvent&lt;T&gt; as a serialized WakeMessagePBuf,
    /// using a wrapped encoder for the event's value.
    /// </summary>
    public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
    {
        private readonly IEncoder<T> _encoder;

        /// <summary>
        /// Creates an encoder that uses the given encoder for the event value.
        /// </summary>
        /// <param name="encoder">The encoder for the event's value</param>
        public RemoteEventEncoder(IEncoder<T> encoder)
        {
            _encoder = encoder;
        }

        /// <summary>
        /// Copies the event's sink, source, encoded value and sequence number
        /// into a WakeMessagePBuf and serializes it.
        /// </summary>
        /// <param name="obj">The event to encode</param>
        /// <returns>The serialized message</returns>
        public byte[] Encode(IRemoteEvent<T> obj)
        {
            // Initializer members are assigned in the order written.
            WakeMessagePBuf message = new WakeMessagePBuf
            {
                sink = obj.Sink,
                source = obj.Source,
                data = _encoder.Encode(obj.Value),
                seq = obj.Sequence
            };

            return message.Serialize();
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Holds the remote identifier of an event endpoint.
    /// </summary>
    public class RemoteEventEndPoint<T>
    {
        /// <summary>
        /// Creates an endpoint for the given remote identifier.
        /// </summary>
        /// <param name="id">The remote identifier of the endpoint</param>
        public RemoteEventEndPoint(IRemoteIdentifier id)
        {
            Id = id;
        }

        /// <summary>
        /// Gets the remote identifier supplied at construction.
        /// </summary>
        public IRemoteIdentifier Id { get; private set; }
    }
}
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using System.Globalization;
using System.Net;
using System.Text;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Remote identifier based on a socket address
    /// </summary>
    public class SocketRemoteIdentifier : IRemoteIdentifier
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));
        private readonly IPEndPoint _addr;

        /// <summary>
        /// Creates an identifier for the given socket address.
        /// </summary>
        /// <param name="addr">The socket address to wrap</param>
        public SocketRemoteIdentifier(IPEndPoint addr)
        {
            _addr = addr;
        }

        /// <summary>
        /// Creates an identifier by parsing a string of the form "address:port".
        /// The text before the first ':' is parsed as an IP address and the
        /// remainder as the port number.
        /// </summary>
        /// <param name="str">The "address:port" string to parse</param>
        /// <exception cref="System.ArgumentNullException">If str is null</exception>
        public SocketRemoteIdentifier(string str)
        {
            if (str == null)
            {
                throw new System.ArgumentNullException("str");
            }

            int index = str.IndexOf(":", System.StringComparison.Ordinal);
            if (index <= 0)
            {
                // No separator, or nothing in front of it.
                Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER);
            }
            string host = str.Substring(0, index);
            int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
            _addr = new IPEndPoint(IPAddress.Parse(host), port);
        }

        /// <summary>
        /// Gets the wrapped socket address.
        /// </summary>
        public IPEndPoint Addr
        {
            get { return _addr; }
        }

        /// <summary>
        /// Returns the hash code of the wrapped socket address.
        /// </summary>
        public override int GetHashCode()
        {
            return _addr.GetHashCode();
        }

        /// <summary>
        /// Two identifiers are equal when their socket addresses are equal.
        /// </summary>
        /// <param name="obj">The object to compare</param>
        /// <returns>True if obj is a SocketRemoteIdentifier with an equal
        /// address; false for null or any other type</returns>
        public override bool Equals(object obj)
        {
            // A hard cast here would throw on null or on a foreign type;
            // Equals must answer false for those instead.
            SocketRemoteIdentifier other = obj as SocketRemoteIdentifier;
            if (other == null)
            {
                return false;
            }

            return _addr.Equals(other.Addr);
        }

        /// <summary>
        /// Returns the address prefixed with "socket://".
        /// </summary>
        public override string ToString()
        {
            return "socket://" + _addr;
        }
    }
}
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs deleted file mode 100644 index 4e25b18..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.Reef.Tang.Annotations;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Codec that converts between strings and ASCII byte arrays.
    /// </summary>
    public class StringCodec : ICodec<string>
    {
        [Inject]
        public StringCodec()
        {
        }

        /// <summary>
        /// Encodes the string as ASCII bytes.
        /// </summary>
        /// <param name="obj">The string to encode</param>
        /// <returns>The ASCII bytes of the string</returns>
        public byte[] Encode(string obj)
        {
            Encoding ascii = Encoding.ASCII;
            byte[] bytes = ascii.GetBytes(obj);
            return bytes;
        }

        /// <summary>
        /// Decodes ASCII bytes back into a string.
        /// </summary>
        /// <param name="data">The ASCII bytes to decode</param>
        /// <returns>The decoded string</returns>
        public string Decode(byte[] data)
        {
            Encoding ascii = Encoding.ASCII;
            string text = ascii.GetString(data);
            return text;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Identifier backed by a string; equality, hashing and ToString all
    /// use that string.
    /// </summary>
    public class StringIdentifier : IIdentifier
    {
        private readonly string _str;

        /// <summary>
        /// Creates an identifier wrapping the given string.
        /// </summary>
        /// <param name="s">The identifier text</param>
        public StringIdentifier(string s)
        {
            _str = s;
        }

        /// <summary>
        /// Returns the hash code of the wrapped string.
        /// </summary>
        public override int GetHashCode()
        {
            return _str.GetHashCode();
        }

        /// <summary>
        /// Two identifiers are equal when their wrapped strings are equal.
        /// </summary>
        /// <param name="o">The object to compare</param>
        /// <returns>True if o is a StringIdentifier with an equal string</returns>
        public override bool Equals(object o)
        {
            StringIdentifier that = o as StringIdentifier;
            if (that == null)
            {
                return false;
            }

            return _str.Equals(that._str);
        }

        /// <summary>
        /// Returns the wrapped string.
        /// </summary>
        public override string ToString()
        {
            return _str;
        }
    }
}
using Org.Apache.Reef.Tang.Annotations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Factory that produces StringIdentifier instances.
    /// </summary>
    public class StringIdentifierFactory : IIdentifierFactory
    {
        [Inject]
        public StringIdentifierFactory()
        {
        }

        /// <summary>
        /// Creates a StringIdentifier wrapping the given string.
        /// </summary>
        /// <param name="s">The identifier text</param>
        /// <returns>A new StringIdentifier for s</returns>
        public IIdentifier Create(string s)
        {
            IIdentifier identifier = new StringIdentifier(s);
            return identifier;
        }
    }
}
- */
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Establish connections to TransportServer for remote message passing
    /// </summary>
    /// <typeparam name="T">The type of message sent over the link</typeparam>
    public class TransportClient<T> : IDisposable
    {
        private readonly ILink<T> _link;
        private readonly IObserver<TransportEvent<T>> _observer;
        private readonly CancellationTokenSource _cancellationSource;
        private bool _disposed;

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        /// <param name="codec">Codec to encode/decode messages</param>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="remoteEndpoint"/> or <paramref name="codec"/> is null</exception>
        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            _link = new Link<T>(remoteEndpoint, codec);
            _cancellationSource = new CancellationTokenSource();
            _disposed = false;
        }

        /// <summary>
        /// Construct a TransportClient.
        /// Used to send messages to the specified remote endpoint. Also starts a
        /// background task that reads responses from the link and forwards them
        /// to the observer.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
        /// <param name="codec">Codec to encode/decode messages</param>
        /// <param name="observer">Callback used when receiving responses from remote host</param>
        public TransportClient(IPEndPoint remoteEndpoint,
                               ICodec<T> codec,
                               IObserver<TransportEvent<T>> observer)
            : this(remoteEndpoint, codec)
        {
            _observer = observer;

            // Capture the token here, before the task is scheduled: once Dispose
            // has released the token source its Token property can no longer be read.
            CancellationToken token = _cancellationSource.Token;
            Task.Run(() => ResponseLoop(token));
        }

        /// <summary>
        /// Gets the underlying transport link.
        /// </summary>
        public ILink<T> Link
        {
            get { return _link; }
        }

        /// <summary>
        /// Send the remote message.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="message"/> is null</exception>
        public void Send(T message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            _link.Write(message);
        }

        /// <summary>
        /// Close all opened connections
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Signals the response loop to stop, then releases the link and the
        /// cancellation source. Does nothing when already disposed or when
        /// <paramref name="disposing"/> is false.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/></param>
        protected void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                try
                {
                    // Request cancellation first so the response loop sees the
                    // shutdown as intentional rather than as a link failure.
                    _cancellationSource.Cancel();
                }
                finally
                {
                    // Cancel can rethrow exceptions from registered callbacks;
                    // the link must still be released in that case.
                    _link.Dispose();
                    _cancellationSource.Dispose();
                    _disposed = true;
                }
            }
        }

        /// <summary>
        /// Continually read responses from remote host and pass each one to the
        /// observer. Stops when cancellation is requested or when the link
        /// returns a null message.
        /// </summary>
        /// <param name="token">Token that is cancelled when the client is disposed</param>
        /// <returns>The task representing the running loop</returns>
        private async Task ResponseLoop(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    T message = await _link.ReadAsync(token);
                    if (message == null)
                    {
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
                    _observer.OnNext(transportEvent);
                }
            }
            catch (OperationCanceledException)
            {
                // A cancelled read is the expected way out once Dispose has run;
                // any cancellation that was not requested through our own token
                // is still surfaced on the task.
                if (!token.IsCancellationRequested)
                {
                    throw;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs deleted file mode 100644 index 6c4644c..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Pairs a piece of data with the link it is associated with.
    /// </summary>
    /// <typeparam name="T">The type of the data carried by the event</typeparam>
    public class TransportEvent<T>
    {
        /// <summary>
        /// Creates an event holding the given data and link.
        /// </summary>
        /// <param name="data">The data carried by the event</param>
        /// <param name="link">The link associated with the data</param>
        public TransportEvent(T data, ILink<T> link)
        {
            this.Link = link;
            this.Data = data;
        }

        /// <summary>
        /// Gets the data passed to the constructor.
        /// </summary>
        public T Data { get; private set; }

        /// <summary>
        /// Gets the link passed to the constructor.
        /// </summary>
        public ILink<T> Link { get; private set; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs deleted file mode 100644 index 20cdc8a..0000000 --- a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
using Org.Apache.Reef.Utilities.Diagnostics;
using Org.Apache.Reef.Utilities.Logging;
using System;
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Org.Apache.Reef.Wake.Util;

namespace Org.Apache.Reef.Wake.Remote.Impl
{
    /// <summary>
    /// Server to handle incoming remote messages.
    /// </summary>
    /// <typeparam name="T">The type of message received from clients</typeparam>
    public class TransportServer<T> : IDisposable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));

        // Upper bound on how long Dispose waits for the accept loop to finish.
        private const int ShutdownTimeoutMilliseconds = 500;

        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cancellationSource;
        private readonly IObserver<TransportEvent<T>> _remoteObserver;
        private readonly ICodec<T> _codec;
        private bool _disposed;
        private Task _serverTask;

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the given port of the local IP address. When it receives a
        /// remote event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="port">Port to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        /// <param name="codec">The codec to encode/decode</param>
        public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
        {
        }

        /// <summary>
        /// Constructs a TransportServer to listen for remote events.
        /// Listens on the specified local endpoint. When it receives a remote
        /// event, it will invoke the specified remote handler.
        /// </summary>
        /// <param name="localEndpoint">Endpoint to listen on</param>
        /// <param name="remoteHandler">The handler to invoke when receiving incoming
        /// remote messages</param>
        /// <param name="codec">The codec to encode/decode</param>
        /// <exception cref="ArgumentNullException">Thrown when
        /// <paramref name="localEndpoint"/> is null</exception>
        public TransportServer(IPEndPoint localEndpoint,
                               IObserver<TransportEvent<T>> remoteHandler,
                               ICodec<T> codec)
        {
            // Report a null endpoint by name instead of failing with a
            // NullReferenceException on the property access below.
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }

            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
            _remoteObserver = remoteHandler;
            _cancellationSource = new CancellationTokenSource();
            _codec = codec;
            _disposed = false;
        }

        /// <summary>
        /// Returns the listening endpoint for the TransportServer
        /// </summary>
        public IPEndPoint LocalEndpoint
        {
            get { return _listener.LocalEndpoint as IPEndPoint; }
        }

        /// <summary>
        /// Starts listening for incoming remote messages. The accept loop runs
        /// on a background task.
        /// </summary>
        public void Run()
        {
            _listener.Start();
            _serverTask = Task.Run(() => StartServer());
        }

        /// <summary>
        /// Close the TransportServer and all open connections
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Requests cancellation, stops the listener and gives the accept loop a
        /// bounded amount of time to finish. The server is marked as disposed
        /// regardless of the value of <paramref name="disposing"/>.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/></param>
        public void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                _cancellationSource.Cancel();
                try
                {
                    _listener.Stop();
                }
                catch (SocketException)
                {
                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
                }

                if (_serverTask != null)
                {
                    // Give the TransportServer Task a bounded time to shut down and
                    // ignore any errors it reports. The wait must be bounded: an
                    // unbounded wait here could block Dispose indefinitely or let
                    // the task's exception escape from Dispose.
                    try
                    {
                        _serverTask.Wait(ShutdownTimeoutMilliseconds);
                    }
                    catch (Exception e)
                    {
                        Console.Error.WriteLine(e);
                    }
                    finally
                    {
                        // A task may only be disposed once it has completed; if the
                        // timeout elapsed first, leave it to the garbage collector.
                        if (_serverTask.IsCompleted)
                        {
                            _serverTask.Dispose();
                        }
                    }
                }
            }

            _disposed = true;
        }

        /// <summary>
        /// Helper method to start TransportServer. This will
        /// be run in an asynchronous Task. Accepts clients until cancellation
        /// is requested and hands each one to <see cref="ProcessClient"/>
        /// without awaiting it.
        /// </summary>
        /// <returns>An asynchronous Task for the running server.</returns>
        private async Task StartServer()
        {
            try
            {
                while (!_cancellationSource.Token.IsCancellationRequested)
                {
                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
                    ProcessClient(client).Forget();
                }
            }
            catch (InvalidOperationException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
            catch (OperationCanceledException)
            {
                LOGGER.Log(Level.Info, "TransportServer has been closed.");
            }
        }

        /// <summary>
        /// Receives events from the connected TcpClient and invokes the handler
        /// on each event.
        /// </summary>
        /// <param name="client">The connected client</param>
        /// <returns>The task representing the read loop for this client</returns>
        private async Task ProcessClient(TcpClient client)
        {
            // Keep reading messages from client until they disconnect or timeout
            CancellationToken token = _cancellationSource.Token;
            using (ILink<T> link = new Link<T>(client, _codec))
            {
                while (!token.IsCancellationRequested)
                {
                    T message = await link.ReadAsync(token);

                    // A null message ends the loop; check before dispatching so the
                    // handler is never invoked with a null payload, matching the
                    // client-side response loop.
                    if (message == null)
                    {
                        break;
                    }

                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
                    _remoteObserver.OnNext(transportEvent);
                }
            }
        }
    }
}
