http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs new file mode 100644 index 0000000..d01e43e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Envelope for a value exchanged with a remote host: the value itself plus
    /// the two endpoints, source/sink names and a sequence number.
    /// </summary>
    /// <typeparam name="T">The type of the wrapped value</typeparam>
    public interface IRemoteEvent<T>
    {
        /// <summary>
        /// Gets or sets the local endpoint of the connection the event travels on.
        /// </summary>
        IPEndPoint LocalEndPoint { get; set; }

        /// <summary>
        /// Gets or sets the remote endpoint of the connection the event travels on.
        /// </summary>
        IPEndPoint RemoteEndPoint { get; set; }

        /// <summary>
        /// Gets the source name of the event.
        /// NOTE(review): presumably names the sending handler - confirm against the implementations.
        /// </summary>
        string Source { get; }

        /// <summary>
        /// Gets the sink name of the event.
        /// NOTE(review): presumably names the receiving handler - confirm against the implementations.
        /// </summary>
        string Sink { get; }

        /// <summary>
        /// Gets the wrapped value.
        /// </summary>
        T Value { get; }

        /// <summary>
        /// Gets the sequence number of the event.
        /// </summary>
        long Sequence { get; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs new file mode 100644 index 0000000..8d8005b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// An identifier that represents a remote source
    /// </summary>
    /// <remarks>
    /// Despite the "I" prefix this is an abstract class, not an interface. It
    /// declares no members of its own beyond what it takes from IIdentifier.
    /// </remarks>
    public abstract class IRemoteIdentifier : IIdentifier
    {
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs new file mode 100644 index 0000000..c8bbd7a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>Factory that creates a RemoteIdentifier</summary>
    /// <remarks>
    /// Marker interface: it declares no members of its own, only what it
    /// inherits from IIdentifierFactory.
    /// </remarks>
    public interface IRemoteIdentifierFactory : IIdentifierFactory
    {
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs new file mode 100644 index 0000000..4ebe131 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Net; +using Org.Apache.REEF.Wake.Remote.Impl; +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Manages incoming and outgoing messages between remote hosts.
    /// </summary>
    /// <typeparam name="T">The type of message exchanged</typeparam>
    public interface IRemoteManager<T> : IStage
    {
        /// <summary>
        /// Gets the remote identifier of this manager.
        /// </summary>
        IRemoteIdentifier Identifier { get; }

        /// <summary>
        /// Gets the local endpoint of this manager.
        /// </summary>
        IPEndPoint LocalEndpoint { get; }

        /// <summary>
        /// Returns an observer used to send messages to the given destination.
        /// </summary>
        /// <param name="dest">The destination to send to</param>
        /// <returns>An observer whose OnNext sends a message to the destination</returns>
        IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest);

        /// <summary>
        /// Returns an observer used to send messages to the remote host at the
        /// given endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host</param>
        /// <returns>An observer whose OnNext sends a message to the remote host</returns>
        IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);

        /// <summary>
        /// Registers an observer for messages arriving from the given source.
        /// </summary>
        /// <param name="source">The source whose messages should be observed</param>
        /// <param name="theObserver">The observer that handles the messages</param>
        /// <returns>A handle that unregisters the observer when disposed</returns>
        IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver);

        /// <summary>
        /// Registers an observer for messages arriving from the remote host at
        /// the given endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host</param>
        /// <param name="theObserver">The observer that handles the messages</param>
        /// <returns>A handle that unregisters the observer when disposed</returns>
        IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver);

        /// <summary>
        /// Registers an observer that is not bound to a particular endpoint and
        /// receives each message wrapped in an IRemoteMessage.
        /// </summary>
        /// <param name="theObserver">The observer that handles the messages</param>
        /// <returns>A handle that unregisters the observer when disposed</returns>
        IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs new file mode 100644 index 0000000..aacbc22 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Message received from a remote handler
    /// </summary>
    /// <typeparam name="T">The type of the message payload</typeparam>
    public interface IRemoteMessage<T>
    {
        /// <summary>
        /// Gets the remote identifier of the sender
        /// </summary>
        /// <value>The remote identifier</value>
        IRemoteIdentifier Identifier { get; }

        /// <summary>
        /// Gets the actual message
        /// </summary>
        /// <value>The remote message</value>
        T Message { get; }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs new file mode 100644 index 0000000..17fff91 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +
namespace Org.Apache.REEF.Wake.Remote
{
    /// <summary>
    /// Contract for a component that can cancel a previously made subscription.
    /// </summary>
    public interface ISubscriptionManager
    {
        /// <summary>
        /// Cancels the subscription identified by the given token.
        /// NOTE(review): the concrete token type is not visible from this
        /// interface - confirm against the implementations what callers pass.
        /// </summary>
        /// <param name="token">The token identifying the subscription</param>
        void Unsubscribe(object token);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs new file mode 100644 index 0000000..8584c67 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Annotations; +
namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Pass-through codec for byte arrays: both directions hand back the very
    /// array they were given, without copying or transforming it.
    /// </summary>
    public class ByteCodec : ICodec<byte[]>
    {
        /// <summary>
        /// Constructs a ByteCodec. The constructor is marked for injection.
        /// </summary>
        [Inject]
        public ByteCodec()
        {
        }

        /// <summary>
        /// Returns the given array unchanged.
        /// </summary>
        /// <param name="obj">The bytes to encode</param>
        /// <returns>The same array instance that was passed in</returns>
        public byte[] Encode(byte[] obj)
        {
            return obj;
        }

        /// <summary>
        /// Returns the given array unchanged.
        /// </summary>
        /// <param name="data">The bytes to decode</param>
        /// <returns>The same array instance that was passed in</returns>
        public byte[] Decode(byte[] data)
        {
            return data;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs new file mode 100644 index 0000000..f6dbca3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Annotations; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +
namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Codec factory that produces ByteCodec instances.
    /// </summary>
    public class ByteCodecFactory : ICodecFactory
    {
        /// <summary>
        /// Constructs a ByteCodecFactory. The constructor is marked for injection.
        /// </summary>
        [Inject]
        public ByteCodecFactory()
        {
        }

        /// <summary>
        /// Creates a new ByteCodec.
        /// </summary>
        /// <returns>A fresh ByteCodec instance, typed as object</returns>
        public object Create()
        {
            return new ByteCodec();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs new file mode 100644 index 0000000..a1d78a3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.IO; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +
namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Performs low level network IO operations between hosts
    /// </summary>
    /// <remarks>
    /// Every message is framed on the wire as:
    ///   4 bytes - payload length + 4, most significant byte first;
    ///   4 bytes - payload length, in the byte order BitConverter uses on the sending host;
    ///   N bytes - the payload itself.
    /// The two length fields deliberately keep their differing byte order: the
    /// read helpers below mirror the writer exactly, and changing either side
    /// would break compatibility with peers already using this framing.
    /// </remarks>
    public class Channel
    {
        private readonly NetworkStream _stream;

        /// <summary>
        /// Constructs a new Channel with the connected NetworkStream.
        /// </summary>
        /// <param name="stream">The connected stream</param>
        /// <exception cref="ArgumentNullException">If stream is null</exception>
        public Channel(NetworkStream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            _stream = stream;
        }

        /// <summary>
        /// Sends a message to the connected client synchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <exception cref="ArgumentNullException">If message is null</exception>
        public void Write(byte[] message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            _stream.Write(messageBuffer, 0, messageBuffer.Length);
        }

        /// <summary>
        /// Sends a message to the connected client asynchronously
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The awaitable write task. The task faults with an
        /// ArgumentNullException if message is null.</returns>
        public async Task WriteAsync(byte[] message, CancellationToken token)
        {
            // Same guard as Write, so a null message is reported by name
            // instead of surfacing as a NullReferenceException from the framing code.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] messageBuffer = GenerateMessageBuffer(message);
            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token);
        }

        /// <summary>
        /// Reads an incoming message as a byte array synchronously.
        /// The message length is read from the frame header first.
        /// </summary>
        /// <returns>The byte array message, or null if the header could not be
        /// read, the header announced a zero-length payload, or the stream
        /// ended before the whole payload arrived</returns>
        public byte[] Read()
        {
            int payloadLength = ReadMessageLength();
            if (payloadLength == 0)
            {
                return null;
            }

            return ReadBytes(payloadLength);
        }

        /// <summary>
        /// Reads an incoming message as a byte array asynchronously.
        /// The message length is read from the frame header first.
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The byte array message, or null if the header could not be
        /// read, the header announced a zero-length payload, or the stream
        /// ended before the whole payload arrived</returns>
        public async Task<byte[]> ReadAsync(CancellationToken token)
        {
            int payloadLength = await GetMessageLengthAsync(token);
            if (payloadLength == 0)
            {
                return null;
            }

            return await ReadBytesAsync(payloadLength, token);
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the operation failed.
        /// </returns>
        private byte[] ReadBytes(int bytesToRead)
        {
            int totalBytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            // A single Read may return fewer bytes than requested, so keep
            // reading until the buffer is full.
            while (totalBytesRead < bytesToRead)
            {
                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
                if (bytesRead == 0)
                {
                    // Read timed out or connection was closed
                    return null;
                }

                totalBytesRead += bytesRead;
            }

            return buffer;
        }

        /// <summary>
        /// Helper method to read the specified number of bytes from the network stream.
        /// </summary>
        /// <param name="bytesToRead">The number of bytes to read</param>
        /// <param name="token">The cancellation token</param>
        /// <returns>The byte[] read from the network stream with the requested
        /// number of bytes, otherwise null if the operation failed.
        /// </returns>
        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
        {
            int bytesRead = 0;
            byte[] buffer = new byte[bytesToRead];

            // A single ReadAsync may return fewer bytes than requested, so keep
            // reading until the buffer is full.
            while (bytesRead < bytesToRead)
            {
                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token);
                if (amountRead == 0)
                {
                    // Read timed out or connection was closed
                    return null;
                }

                bytesRead += amountRead;
            }

            return buffer;
        }

        /// <summary>
        /// Generates the payload buffer containing the message along
        /// with a header indicating the message length.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <returns>The payload buffer</returns>
        private byte[] GenerateMessageBuffer(byte[] message)
        {
            byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
            byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);

            // Only the first length field is normalised to most-significant-byte
            // first; the second is left as BitConverter produced it. The readers
            // rely on exactly this layout.
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(lengthBuffer1);
            }

            int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
            byte[] messageBuffer = new byte[len];

            int bytesCopied = 0;
            bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
            bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
            CopyBytes(message, messageBuffer, bytesCopied);

            return messageBuffer;
        }

        /// <summary>
        /// Reads the two four-byte length fields of the frame header from the
        /// stream and decodes the payload length in bytes
        /// </summary>
        /// <returns>The incoming message's length in bytes, or 0 if the header
        /// could not be read or its first field is zero</returns>
        private int ReadMessageLength()
        {
            byte[] lenBytes = ReadBytes(sizeof(int));
            if (lenBytes == null)
            {
                return 0;
            }
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(lenBytes);
            }
            if (BitConverter.ToInt32(lenBytes, 0) == 0)
            {
                return 0;
            }

            // The second field is not byte-swapped, matching GenerateMessageBuffer.
            byte[] msgLength = ReadBytes(sizeof(int));
            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
        }

        /// <summary>
        /// Reads the two four-byte length fields of the frame header from the
        /// stream asynchronously and decodes the payload length in bytes
        /// </summary>
        /// <param name="token">The cancellation token</param>
        /// <returns>The incoming message's length in bytes, or 0 if the header
        /// could not be read or its first field is zero</returns>
        private async Task<int> GetMessageLengthAsync(CancellationToken token)
        {
            byte[] lenBytes = await ReadBytesAsync(sizeof(int), token);
            if (lenBytes == null)
            {
                return 0;
            }
            if (BitConverter.IsLittleEndian)
            {
                Array.Reverse(lenBytes);
            }
            if (BitConverter.ToInt32(lenBytes, 0) == 0)
            {
                return 0;
            }

            // Read the second field asynchronously as well, so the caller is not
            // blocked on the socket and the cancellation token stays in effect.
            // It is not byte-swapped, matching GenerateMessageBuffer.
            byte[] msgLength = await ReadBytesAsync(sizeof(int), token);
            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
        }

        /// <summary>
        /// Copies the entire source buffer into the destination buffer at the specified
        /// destination offset.
        /// </summary>
        /// <param name="source">The source buffer to be copied</param>
        /// <param name="dest">The destination buffer to copy to</param>
        /// <param name="destOffset">The offset at the destination buffer to begin
        /// copying.</param>
        /// <returns>The number of bytes copied</returns>
        private int CopyBytes(byte[] source, byte[] dest, int destOffset)
        {
            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
            return source.Length;
        }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs new file mode 100644 index 0000000..cb9cf65 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Util; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Reactive; +using System.Threading.Tasks; +
namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Manages incoming and outgoing messages between remote hosts.
    /// Outgoing messages go through one cached sender per remote endpoint;
    /// incoming messages are dispatched to the registered observers.
    /// </summary>
    /// <typeparam name="T">The type of message exchanged</typeparam>
    public class DefaultRemoteManager<T> : IRemoteManager<T>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));

        private ObserverContainer<T> _observerContainer;
        private TransportServer<IRemoteEvent<T>> _server;
        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
        private ICodec<IRemoteEvent<T>> _codec;

        /// <summary>
        /// Creates a manager that listens on the given address, passing port 0
        /// to the (address, port, codec) constructor.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
        {
        }

        /// <summary>
        /// Creates a manager that listens on the given endpoint.
        /// </summary>
        /// <param name="localEndpoint">The endpoint to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
        {
            if (localEndpoint == null)
            {
                throw new ArgumentNullException("localEndpoint");
            }
            if (localEndpoint.Port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            _codec = new RemoteEventCodec<T>(codec);
            _observerContainer = new ObserverContainer<T>();
            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

            StartServer(localEndpoint);
        }

        /// <summary>
        /// Creates a manager that listens on the given address and port.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="port">The port to listen on</param>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
        {
            if (localAddress == null)
            {
                throw new ArgumentNullException("localAddress");
            }
            if (port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }
            if (codec == null)
            {
                throw new ArgumentNullException("codec");
            }

            _observerContainer = new ObserverContainer<T>();
            _codec = new RemoteEventCodec<T>(codec);
            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

            StartServer(new IPEndPoint(localAddress, port));
        }

        /// <summary>
        /// Creates a send-only manager: no transport server is started, so it
        /// does not listen for incoming messages.
        /// </summary>
        /// <param name="codec">The codec used for serializing messages</param>
        public DefaultRemoteManager(ICodec<T> codec)
        {
            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
            {
                if (codec == null)
                {
                    throw new ArgumentNullException("codec");
                }

                _observerContainer = new ObserverContainer<T>();
                _codec = new RemoteEventCodec<T>(codec);
                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
            }
        }

        /// <summary>
        /// Gets the remote identifier of this manager, built from its local endpoint.
        /// </summary>
        public IRemoteIdentifier Identifier { get; private set; }

        /// <summary>
        /// Gets the local endpoint of this manager.
        /// </summary>
        public IPEndPoint LocalEndpoint { get; private set; }

        /// <summary>
        /// Returns an observer used to send messages to the remote host that the
        /// given RemoteEventEndPoint identifies.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint descriptor of the remote host;
        /// its Id must be a SocketRemoteIdentifier</param>
        /// <returns>An observer whose OnNext sends a message to the remote host</returns>
        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (socketId == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return GetRemoteObserver(socketId.Addr);
        }

        /// <summary>
        /// Returns an observer used to send messages to the remote host at the
        /// given endpoint. The observer is created on first request and the
        /// same instance is returned for later requests to that endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host</param>
        /// <returns>An observer whose OnNext sends a message to the remote host</returns>
        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            ProxyObserver proxy;
            if (_cachedClients.TryGetValue(remoteEndpoint, out proxy))
            {
                return proxy;
            }

            // First request for this endpoint: create the client and cache its proxy.
            TransportClient<IRemoteEvent<T>> transport =
                new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);

            proxy = new ProxyObserver(transport);
            _cachedClients[remoteEndpoint] = proxy;
            return proxy;
        }

        /// <summary>
        /// Registers an observer for messages arriving from the remote host that
        /// the given RemoteEventEndPoint identifies.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint descriptor of the remote host;
        /// its Id must be a SocketRemoteIdentifier</param>
        /// <param name="observer">The observer that handles incoming messages</param>
        /// <returns>A handle used to unregister the observer</returns>
        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (socketId == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return RegisterObserver(socketId.Addr, observer);
        }

        /// <summary>
        /// Registers an observer for messages arriving from the remote host at
        /// the given endpoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host</param>
        /// <param name="observer">The observer that handles incoming messages</param>
        /// <returns>A handle used to unregister the observer</returns>
        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
        }

        /// <summary>
        /// Registers an observer that is not tied to a particular endpoint and
        /// receives incoming messages wrapped in an IRemoteMessage.
        /// </summary>
        /// <param name="observer">The observer that handles incoming messages</param>
        /// <returns>A handle used to unregister the observer</returns>
        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(observer);
        }

        /// <summary>
        /// Releases every cached sender and, if one was started, the transport server.
        /// </summary>
        public void Dispose()
        {
            foreach (ProxyObserver proxy in _cachedClients.Values)
            {
                proxy.Dispose();
            }

            if (_server != null)
            {
                _server.Dispose();
            }
        }

        /// <summary>
        /// Starts the transport server on the given endpoint and publishes the
        /// endpoint it reports, together with the matching identifier.
        /// Expects the codec and observer container fields to be set already.
        /// </summary>
        /// <param name="endpoint">The endpoint to listen on</param>
        private void StartServer(IPEndPoint endpoint)
        {
            // Begin to listen for incoming messages
            _server = new TransportServer<IRemoteEvent<T>>(endpoint, _observerContainer, _codec);
            _server.Run();

            LocalEndpoint = _server.LocalEndpoint;
            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
        }

        /// <summary>
        /// Observer that forwards every message it is given to one connected remote host.
        /// </summary>
        private class ProxyObserver : IObserver<T>, IDisposable
        {
            private TransportClient<IRemoteEvent<T>> _client;
            private int _messageCount;

            /// <summary>
            /// Creates a ProxyObserver on top of the given transport client.
            /// </summary>
            /// <param name="client">The connected transport client used to send
            /// messages to the remote host</param>
            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
            {
                _client = client;
                _messageCount = 0;
            }

            /// <summary>
            /// Wraps the message in a remote event, stamped with the "default"
            /// sink and the running message count, and sends it.
            /// </summary>
            /// <param name="message">The message to send</param>
            public void OnNext(T message)
            {
                RemoteEvent<T> outgoing =
                    new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message);
                outgoing.Sink = "default";
                outgoing.Sequence = _messageCount;

                _messageCount++;

                IRemoteEvent<T> remoteEvent = outgoing;
                _client.Send(remoteEvent);
            }

            /// <summary>
            /// Closes the underlying transport client.
            /// </summary>
            public void Dispose()
            {
                _client.Dispose();
            }

            /// <summary>
            /// Not supported; always throws NotImplementedException.
            /// </summary>
            /// <param name="error">Ignored</param>
            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Not supported; always throws NotImplementedException.
            /// </summary>
            public void OnCompleted()
            {
                throw new NotImplementedException();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs new file mode 100644 index 0000000..225db30 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
/// <summary>
/// Pairs a message with a remote identifier. Both values are fixed at
/// construction time.
/// </summary>
class DefaultRemoteMessage<T> : IRemoteMessage<T>
{
    private readonly IRemoteIdentifier _identifier;
    private readonly T _message;

    /// <summary>
    /// Creates a new DefaultRemoteMessage.
    /// </summary>
    /// <param name="id">The identifier exposed through Identifier</param>
    /// <param name="message">The payload exposed through Message</param>
    public DefaultRemoteMessage(IRemoteIdentifier id, T message)
    {
        _identifier = id;
        _message = message;
    }

    /// <summary>
    /// Gets the identifier supplied to the constructor.
    /// </summary>
    public IRemoteIdentifier Identifier
    {
        get { return _identifier; }
    }

    /// <summary>
    /// Gets the message supplied to the constructor.
    /// </summary>
    public T Message
    {
        get { return _message; }
    }
}
/// <summary>
/// Class to compare two IPEndPoint objects.
/// A port of 0 on either side acts as a wildcard: only the addresses are
/// compared in that case.
/// </summary>
internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
{
    /// <summary>
    /// Determines whether two endpoints are considered the same host.
    /// </summary>
    /// <param name="x">The first endpoint, may be null</param>
    /// <param name="y">The second endpoint, may be null</param>
    /// <returns>True when both references are the same (including both null),
    /// when either port is 0 and the addresses match, or when address and
    /// port both match; otherwise false</returns>
    public bool Equals(IPEndPoint x, IPEndPoint y)
    {
        if (ReferenceEquals(x, y))
        {
            return true;
        }
        if (x == null || y == null)
        {
            return false;
        }

        // If either port is 0, don't check port
        if (x.Port == 0 || y.Port == 0)
        {
            return x.Address.Equals(y.Address);
        }

        return x.Equals(y);
    }

    /// <summary>
    /// Returns a hash code derived from the address only. The port is left
    /// out on purpose so that endpoints which Equals treats as equal through
    /// the port-0 wildcard always land in the same bucket.
    /// </summary>
    /// <param name="obj">The endpoint to hash</param>
    /// <returns>The hash code of the endpoint's address</returns>
    /// <exception cref="ArgumentNullException">obj is null</exception>
    public int GetHashCode(IPEndPoint obj)
    {
        if (obj == null)
        {
            throw new ArgumentNullException("obj");
        }

        return obj.Address.GetHashCode();
    }
}
/// <summary>
/// Codec that converts an int to and from its 4-byte BitConverter
/// representation.
/// </summary>
public class IntCodec : ICodec<int>
{
    /// <summary>
    /// Creates a new IntCodec. The constructor is marked for injection.
    /// </summary>
    [Inject]
    public IntCodec()
    {
    }

    /// <summary>
    /// Encodes the integer with BitConverter.GetBytes.
    /// </summary>
    /// <param name="obj">The integer to encode</param>
    /// <returns>The bytes produced by BitConverter for the value</returns>
    public byte[] Encode(int obj)
    {
        byte[] encoded = BitConverter.GetBytes(obj);
        return encoded;
    }

    /// <summary>
    /// Decodes an integer from the start of the byte array with
    /// BitConverter.ToInt32.
    /// </summary>
    /// <param name="data">The bytes to decode; read from index 0</param>
    /// <returns>The decoded integer</returns>
    public int Decode(byte[] data)
    {
        const int startIndex = 0;
        return BitConverter.ToInt32(data, startIndex);
    }
}
/// <summary>
/// Represents an open connection between remote hosts
/// </summary>
public class Link<T> : ILink<T>
{
    private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));

    private readonly IPEndPoint _localEndpoint;
    private readonly ICodec<T> _codec;
    private readonly Channel _channel;
    private bool _disposed;

    /// <summary>
    /// Constructs a Link object.
    /// Connects to the specified remote endpoint.
    /// </summary>
    /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
    /// <param name="codec">The codec for serializing messages</param>
    /// <exception cref="ArgumentNullException">remoteEndpoint or codec is null</exception>
    public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
    {
        if (remoteEndpoint == null)
        {
            throw new ArgumentNullException("remoteEndpoint");
        }
        if (codec == null)
        {
            throw new ArgumentNullException("codec");
        }

        TcpClient client = new TcpClient();
        try
        {
            client.Connect(remoteEndpoint);

            Client = client;
            _codec = codec;
            _channel = new Channel(client.GetStream());
            _localEndpoint = GetLocalEndpoint();
        }
        catch
        {
            // This constructor created the client and the caller never receives
            // the half-built Link, so the socket must be released here.
            client.Close();
            throw;
        }

        _disposed = false;
    }

    /// <summary>
    /// Constructs a Link object.
    /// Uses the already connected TcpClient.
    /// </summary>
    /// <param name="client">The already connected client</param>
    /// <param name="codec">The encoder and decoder</param>
    /// <exception cref="ArgumentNullException">client or codec is null</exception>
    public Link(TcpClient client, ICodec<T> codec)
    {
        if (client == null)
        {
            throw new ArgumentNullException("client");
        }
        if (codec == null)
        {
            throw new ArgumentNullException("codec");
        }

        // The client was supplied by the caller, so it is not closed here if setup fails.
        Client = client;
        _codec = codec;
        _channel = new Channel(Client.GetStream());
        _localEndpoint = GetLocalEndpoint();
        _disposed = false;
    }

    /// <summary>
    /// Returns the local socket address
    /// </summary>
    public IPEndPoint LocalEndpoint
    {
        get { return _localEndpoint; }
    }

    /// <summary>
    /// Returns the remote socket address, read from the underlying socket
    /// on every access.
    /// </summary>
    public IPEndPoint RemoteEndpoint
    {
        get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
    }

    /// <summary>
    /// Gets the underlying TcpClient
    /// </summary>
    public TcpClient Client { get; private set; }

    /// <summary>
    /// Writes the message to the remote host
    /// </summary>
    /// <param name="value">The data to write</param>
    /// <exception cref="ArgumentNullException">value is null</exception>
    public void Write(T value)
    {
        if (value == null)
        {
            throw new ArgumentNullException("value");
        }
        if (_disposed)
        {
            Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
        }

        byte[] message = _codec.Encode(value);
        _channel.Write(message);
    }

    /// <summary>
    /// Writes the value to this link asynchronously
    /// </summary>
    /// <param name="value">The data to write</param>
    /// <param name="token">The cancellation token</param>
    /// <exception cref="ArgumentNullException">value is null (surfaced
    /// through the returned task)</exception>
    public async Task WriteAsync(T value, CancellationToken token)
    {
        // Same argument validation as the synchronous Write.
        if (value == null)
        {
            throw new ArgumentNullException("value");
        }
        if (_disposed)
        {
            Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
        }

        byte[] message = _codec.Encode(value);
        await _channel.WriteAsync(message, token);
    }

    /// <summary>
    /// Reads the value from the link synchronously
    /// </summary>
    /// <returns>The decoded value, or default(T) when the channel returns
    /// no message</returns>
    public T Read()
    {
        if (_disposed)
        {
            Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
        }

        byte[] message = _channel.Read();
        return (message == null) ? default(T) : _codec.Decode(message);
    }

    /// <summary>
    /// Reads the value from the link asynchronously
    /// </summary>
    /// <param name="token">The cancellation token</param>
    /// <returns>The decoded value, or default(T) when the channel returns
    /// no message</returns>
    public async Task<T> ReadAsync(CancellationToken token)
    {
        if (_disposed)
        {
            Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
        }

        byte[] message = await _channel.ReadAsync(token);
        return (message == null) ? default(T) : _codec.Decode(message);
    }

    /// <summary>
    /// Close the client connection
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Subclasses of Links should overwrite this to handle disposing
    /// of the link
    /// </summary>
    /// <param name="disposing">To dispose or not</param>
    public virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            try
            {
                Client.GetStream().Close();
            }
            catch (InvalidOperationException)
            {
                // GetStream throws when the socket is not connected; the client is still closed below.
                LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
            }

            Client.Close();
        }
        _disposed = true;
    }

    /// <summary>
    /// Overrides Equals. Two Link objects are equal if they are connected
    /// to the same remote endpoint.
    /// </summary>
    /// <param name="obj">The object to compare</param>
    /// <returns>True if the object is equal to this Link, otherwise false</returns>
    public override bool Equals(object obj)
    {
        Link<T> other = obj as Link<T>;
        if (other == null)
        {
            return false;
        }

        return other.RemoteEndpoint.Equals(RemoteEndpoint);
    }

    /// <summary>
    /// Gets the hash code for the Link object, derived from the remote
    /// endpoint so that it agrees with Equals.
    /// </summary>
    /// <returns>The object's hash code</returns>
    public override int GetHashCode()
    {
        return RemoteEndpoint.GetHashCode();
    }

    /// <summary>
    /// Discovers the IPEndpoint for the current machine by combining
    /// NetworkUtils.LocalIPAddress with the local port of the socket.
    /// </summary>
    /// <returns>The local IPEndpoint</returns>
    private IPEndPoint GetLocalEndpoint()
    {
        IPAddress address = NetworkUtils.LocalIPAddress;
        int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
        return new IPEndPoint(address, port);
    }
}
/// <summary>
/// Codec that encodes and decodes objects by delegating to a MultiEncoder
/// and a MultiDecoder, which select a registered codec per type.
/// </summary>
public class MultiCodec<T> : ICodec<T>
{
    private readonly MultiEncoder<T> _encoder = new MultiEncoder<T>();

    private readonly MultiDecoder<T> _decoder = new MultiDecoder<T>();

    /// <summary>
    /// Constructs a new MultiCodec with no registered codecs.
    /// </summary>
    public MultiCodec()
    {
    }

    /// <summary>
    /// Registers a codec for objects of type U with both the encoder and
    /// the decoder.
    /// </summary>
    /// <typeparam name="U">The type handled by the codec</typeparam>
    /// <param name="codec">The codec to use when encoding/decoding
    /// objects of this type</param>
    public void Register<U>(ICodec<U> codec) where U : T
    {
        _encoder.Register<U>(codec);
        _decoder.Register<U>(codec);
    }

    /// <summary>
    /// Registers a codec for objects of type U under an explicit name with
    /// both the encoder and the decoder.
    /// </summary>
    /// <typeparam name="U">The type handled by the codec</typeparam>
    /// <param name="codec">The codec to use when encoding/decoding
    /// objects of this type</param>
    /// <param name="name">The name of the class to encode/decode</param>
    public void Register<U>(ICodec<U> codec, string name) where U : T
    {
        _encoder.Register<U>(codec, name);
        _decoder.Register<U>(codec, name);
    }

    /// <summary>
    /// Encodes an object by forwarding it to the underlying MultiEncoder.
    /// </summary>
    /// <param name="obj">Data to encode</param>
    /// <returns>Whatever the MultiEncoder returns for the object</returns>
    public byte[] Encode(T obj)
    {
        byte[] encoded = _encoder.Encode(obj);
        return encoded;
    }

    /// <summary>
    /// Decodes a byte array into the appropriate object type by forwarding
    /// it to the underlying MultiDecoder.
    /// </summary>
    /// <param name="data">Data to be decoded</param>
    /// <returns>Whatever the MultiDecoder returns for the data</returns>
    public T Decode(byte[] data)
    {
        T decoded = _decoder.Decode(data);
        return decoded;
    }
}
/// <summary>
/// Decoder using the WakeTuple protocol buffer
/// (class name and bytes)
/// </summary>
public class MultiDecoder<T> : IDecoder<T>
{
    private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));
    private Dictionary<Type, object> _decoderMap;
    private Dictionary<string, Type> _nameMap;

    /// <summary>
    /// Constructs a decoder with empty type and name registries.
    /// </summary>
    public MultiDecoder()
    {
        _nameMap = new Dictionary<string, Type>();
        _decoderMap = new Dictionary<Type, object>();
    }

    /// <summary>
    /// Registers the decoder for objects of type U under the name
    /// produced by typeof(U).ToString().
    /// </summary>
    /// <typeparam name="U">The type the decoder produces</typeparam>
    /// <param name="decoder">The decoder to use when decoding
    /// objects of this type</param>
    public void Register<U>(IDecoder<U> decoder) where U : T
    {
        Register<U>(decoder, typeof(U).ToString());
    }

    /// <summary>
    /// Registers the decoder for objects of type U under the given name.
    /// An existing registration for the same type or name is replaced.
    /// </summary>
    /// <typeparam name="U">The type the decoder produces</typeparam>
    /// <param name="decoder">The decoder to use when decoding
    /// objects of this type</param>
    /// <param name="name">The name of the class to decode</param>
    public void Register<U>(IDecoder<U> decoder, string name) where U : T
    {
        Type registeredType = typeof(U);
        _decoderMap[registeredType] = decoder;
        _nameMap[name] = registeredType;
    }

    /// <summary>
    /// Deserializes the WakeTuple, resolves its class name to a registered
    /// type and invokes that type's decoder on the tuple's data.
    /// </summary>
    /// <param name="data">The data to decode</param>
    /// <returns>The decoded object, or default(T) when the tuple cannot be
    /// deserialized or its class name is not registered</returns>
    public T Decode(byte[] data)
    {
        WakeTuplePBuf tuple = WakeTuplePBuf.Deserialize(data);

        // No tuple, or a class name nobody registered: nothing to decode.
        Type payloadType;
        if (tuple == null || !_nameMap.TryGetValue(tuple.className, out payloadType))
        {
            return default(T);
        }

        object registered;
        bool known = _decoderMap.TryGetValue(payloadType, out registered);
        if (!known)
        {
            Exceptions.Throw(new RemoteRuntimeException("Decoder for " + payloadType + " not known."), LOGGER);
        }

        // The decoder is stored as object, so IDecoder<payloadType>.Decode is called via reflection.
        MethodInfo decodeMethod = typeof(IDecoder<>).MakeGenericType(payloadType).GetMethod("Decode");
        object[] arguments = { tuple.data };
        return (T) decodeMethod.Invoke(registered, arguments);
    }
}
/// <summary>
/// Encoder using the WakeTuple protocol buffer
/// (class name and bytes)
/// </summary>
public class MultiEncoder<T> : IEncoder<T>
{
    private static readonly Logger _logger = Logger.GetLogger(typeof(MultiEncoder<>));
    private readonly Dictionary<Type, object> _encoderMap;
    private readonly Dictionary<Type, string> _nameMap;

    /// <summary>
    /// Constructs an encoder that encodes an object to bytes based on the class name
    /// </summary>
    public MultiEncoder()
    {
        _encoderMap = new Dictionary<Type, object>();
        _nameMap = new Dictionary<Type, string>();
    }

    /// <summary>
    /// Registers the encoder for objects of type U under the name
    /// produced by typeof(U).ToString().
    /// </summary>
    /// <typeparam name="U">The type the encoder accepts</typeparam>
    /// <param name="encoder">The encoder to use when encoding
    /// objects of this type</param>
    public void Register<U>(IEncoder<U> encoder) where U : T
    {
        _encoderMap[typeof(U)] = encoder;
        _nameMap[typeof(U)] = typeof(U).ToString();
    }

    /// <summary>
    /// Registers the encoder for objects of type U under the given name.
    /// </summary>
    /// <typeparam name="U">The type the encoder accepts</typeparam>
    /// <param name="encoder">The encoder to use when encoding
    /// objects of this type</param>
    /// <param name="name">The class name written into the encoded tuple</param>
    public void Register<U>(IEncoder<U> encoder, string name) where U : T
    {
        _encoderMap[typeof(U)] = encoder;
        _nameMap[typeof(U)] = name;
        _logger.Log(Level.Verbose, "Registering name for " + name);
    }

    /// <summary>Encodes an object to a byte array</summary>
    /// <param name="obj">The object to encode; its runtime type selects the encoder</param>
    /// <returns>The serialized WakeTuple (registered name plus encoded bytes),
    /// or null when no encoder is registered for the object's runtime type</returns>
    /// <exception cref="ArgumentNullException">obj is null</exception>
    public byte[] Encode(T obj)
    {
        if (obj == null)
        {
            throw new ArgumentNullException("obj");
        }

        // Find encoder for object type
        Type objType = obj.GetType();
        object encoder;
        if (!_encoderMap.TryGetValue(objType, out encoder))
        {
            return null;
        }

        // Invoke encoder for this type
        Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { objType });
        MethodInfo info = handlerType.GetMethod("Encode");
        byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });

        // Serialize object type and object data into well known tuple
        // To decode, deserialize the tuple, get object type, and look up the
        // decoder for that type
        string name = _nameMap[objType];
        _logger.Log(Level.Verbose, "Encoding name for " + name);
        WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
        return pbuf.Serialize();
    }
}
/// <summary>
/// Stores registered IObservers for DefaultRemoteManager.
/// Can register and look up IObservers by remote IPEndPoint.
/// </summary>
internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
{
    private ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
    private ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
    private IObserver<T> _universalObserver;

    /// <summary>
    /// Constructs a new ObserverContainer used to manage remote IObservers.
    /// </summary>
    public ObserverContainer()
    {
        _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
        _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
    }

    /// <summary>
    /// Registers an IObserver used to handle incoming messages from the remote host
    /// at the specified IPEndPoint. An endpoint whose address is IPAddress.Any
    /// registers the observer for messages from every host.
    /// </summary>
    /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
    /// <param name="observer">The IObserver to handle incoming messages</param>
    /// <returns>An IDisposable used to unregister the observer with</returns>
    /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
    public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
    {
        if (remoteEndpoint == null)
        {
            throw new ArgumentNullException("remoteEndpoint");
        }

        if (remoteEndpoint.Address.Equals(IPAddress.Any))
        {
            _universalObserver = observer;
            return Disposable.Create(() => { _universalObserver = null; });
        }

        _endpointMap[remoteEndpoint] = observer;
        return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
    }

    /// <summary>
    /// Registers an IObserver to handle incoming messages from a remote host.
    /// The observer is stored under typeof(T).
    /// </summary>
    /// <param name="observer">The IObserver to handle incoming messages</param>
    /// <returns>An IDisposable used to unregister the observer with</returns>
    public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
    {
        _typeMap[typeof(T)] = observer;
        return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
    }

    /// <summary>
    /// Look up the IObserver for the registered IPEndPoint or event type
    /// and execute the IObserver.
    /// </summary>
    /// <param name="transportEvent">The incoming remote event</param>
    /// <exception cref="WakeRuntimeException">No registered observer
    /// handled the event</exception>
    public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
    {
        IRemoteEvent<T> remoteEvent = transportEvent.Data;
        remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
        remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
        T value = remoteEvent.Value;
        bool handled = false;

        // Read the field once: the unregister callback may null it out
        // between a null check and the call.
        IObserver<T> universalObserver = _universalObserver;
        if (universalObserver != null)
        {
            universalObserver.OnNext(value);
            handled = true;
        }

        IObserver<T> endpointObserver;
        IObserver<IRemoteMessage<T>> typeObserver;
        if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out endpointObserver))
        {
            // IObserver was registered by IPEndpoint
            endpointObserver.OnNext(value);
            handled = true;
        }
        else
        {
            // Observers are stored under typeof(T). A payload may be null or an
            // instance of a type derived from T, so fall back to typeof(T) when
            // the exact runtime type has no entry.
            Type valueType = value == null ? typeof(T) : value.GetType();
            if (_typeMap.TryGetValue(valueType, out typeObserver)
                || _typeMap.TryGetValue(typeof(T), out typeObserver))
            {
                // IObserver was registered by event type
                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
                typeObserver.OnNext(remoteMessage);
                handled = true;
            }
        }

        if (!handled)
        {
            throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
        }
    }

    /// <summary>
    /// Ignores the error; this container takes no action on it.
    /// </summary>
    /// <param name="error">The reported error (unused)</param>
    public void OnError(Exception error)
    {
    }

    /// <summary>
    /// Takes no action on completion.
    /// </summary>
    public void OnCompleted()
    {
    }
}
/// <summary>
/// Mutable container for a value exchanged with a remote host, together
/// with its endpoints, source/sink names and sequence number.
/// </summary>
public class RemoteEvent<T> : IRemoteEvent<T>
{
    /// <summary>
    /// Creates a RemoteEvent with every property populated.
    /// </summary>
    /// <param name="localEndPoint">Value for LocalEndPoint</param>
    /// <param name="remoteEndPoint">Value for RemoteEndPoint</param>
    /// <param name="source">Value for Source</param>
    /// <param name="sink">Value for Sink</param>
    /// <param name="seq">Value for Sequence</param>
    /// <param name="value">Value for Value</param>
    public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
        : this(localEndPoint, remoteEndPoint, value)
    {
        Source = source;
        Sink = sink;
        Sequence = seq;
    }

    /// <summary>
    /// Creates a RemoteEvent with only the endpoints and the value set;
    /// Source, Sink and Sequence keep their default values.
    /// </summary>
    /// <param name="localEndpoint">Value for LocalEndPoint</param>
    /// <param name="remoteEndpoint">Value for RemoteEndPoint</param>
    /// <param name="value">Value for Value</param>
    public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
    {
        this.LocalEndPoint = localEndpoint;
        this.RemoteEndPoint = remoteEndpoint;
        this.Value = value;
    }

    /// <summary>
    /// Creates a RemoteEvent with every property left at its default value.
    /// </summary>
    public RemoteEvent()
    {
    }

    /// <summary>Gets or sets the local endpoint.</summary>
    public IPEndPoint LocalEndPoint { get; set; }

    /// <summary>Gets or sets the remote endpoint.</summary>
    public IPEndPoint RemoteEndPoint { get; set; }

    /// <summary>Gets or sets the source name.</summary>
    public string Source { get; set; }

    /// <summary>Gets or sets the sink name.</summary>
    public string Sink { get; set; }

    /// <summary>Gets or sets the carried value.</summary>
    public T Value { get; set; }

    /// <summary>Gets or sets the sequence number.</summary>
    public long Sequence { get; set; }
}
/// <summary>
/// Codec for IRemoteEvent that delegates to a RemoteEventEncoder and a
/// RemoteEventDecoder, both built around the same value codec.
/// </summary>
internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
{
    private readonly RemoteEventEncoder<T> _encoder;
    private readonly RemoteEventDecoder<T> _decoder;

    /// <summary>
    /// Creates a RemoteEventCodec.
    /// </summary>
    /// <param name="codec">The codec handed to both the encoder and the
    /// decoder for the event's value</param>
    public RemoteEventCodec(ICodec<T> codec)
    {
        RemoteEventEncoder<T> eventEncoder = new RemoteEventEncoder<T>(codec);
        RemoteEventDecoder<T> eventDecoder = new RemoteEventDecoder<T>(codec);

        _encoder = eventEncoder;
        _decoder = eventDecoder;
    }

    /// <summary>
    /// Encodes the event with the underlying RemoteEventEncoder.
    /// </summary>
    /// <param name="obj">The event to encode</param>
    /// <returns>The bytes produced by the encoder</returns>
    public byte[] Encode(IRemoteEvent<T> obj)
    {
        byte[] encoded = _encoder.Encode(obj);
        return encoded;
    }

    /// <summary>
    /// Decodes the bytes with the underlying RemoteEventDecoder.
    /// </summary>
    /// <param name="data">The bytes to decode</param>
    /// <returns>The event produced by the decoder</returns>
    public IRemoteEvent<T> Decode(byte[] data)
    {
        IRemoteEvent<T> decoded = _decoder.Decode(data);
        return decoded;
    }
}
using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Decodes a serialized <see cref="WakeMessagePBuf"/> into an <see cref="IRemoteEvent{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the value carried by the remote event</typeparam>
    public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
    {
        // Assigned once in the constructor; readonly to match RemoteEventEncoder.
        private readonly IDecoder<T> _decoder;

        /// <summary>
        /// Creates a decoder for remote events.
        /// </summary>
        /// <param name="decoder">Decoder applied to the data field of each message to obtain the event value</param>
        public RemoteEventDecoder(IDecoder<T> decoder)
        {
            _decoder = decoder;
        }

        /// <summary>
        /// Deserializes the bytes into a WakeMessagePBuf and wraps its contents in a RemoteEvent.
        /// </summary>
        /// <param name="data">Serialized WakeMessagePBuf</param>
        /// <returns>
        /// A remote event whose Source, Sink and Sequence come from the message and whose
        /// Value is the decoded message data. Both endpoints are passed as null here.
        /// </returns>
        public IRemoteEvent<T> Decode(byte[] data)
        {
            WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data);
            T value = _decoder.Decode(pbuf.data);

            // The message carries no endpoint information, so both endpoints are null.
            return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, value);
        }
    }
}
using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Encodes an <see cref="IRemoteEvent{T}"/> as a serialized <see cref="WakeMessagePBuf"/>.
    /// </summary>
    /// <typeparam name="T">Type of the value carried by the remote event</typeparam>
    public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
    {
        private readonly IEncoder<T> _valueEncoder;

        /// <summary>
        /// Creates an encoder for remote events.
        /// </summary>
        /// <param name="encoder">Encoder used to turn the event value into the message data</param>
        public RemoteEventEncoder(IEncoder<T> encoder)
        {
            _valueEncoder = encoder;
        }

        /// <summary>
        /// Copies Sink, Source and Sequence of the event into a WakeMessagePBuf, stores the
        /// encoded Value as its data, and returns the serialized message.
        /// </summary>
        /// <param name="obj">The remote event to encode</param>
        /// <returns>The serialized WakeMessagePBuf</returns>
        public byte[] Encode(IRemoteEvent<T> obj)
        {
            WakeMessagePBuf message = new WakeMessagePBuf
            {
                sink = obj.Sink,
                source = obj.Source,
                data = _valueEncoder.Encode(obj.Value),
                seq = obj.Sequence
            };

            return message.Serialize();
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Holds the remote identifier associated with an endpoint.
    /// </summary>
    /// <typeparam name="T">Type parameter of the endpoint; not used by any member of this class</typeparam>
    public class RemoteEventEndPoint<T>
    {
        // Set once at construction and never reassigned.
        private readonly IRemoteIdentifier _id;

        /// <summary>
        /// Creates an endpoint for the given identifier.
        /// </summary>
        /// <param name="id">The identifier exposed through <see cref="Id"/></param>
        public RemoteEventEndPoint(IRemoteIdentifier id)
        {
            _id = id;
        }

        /// <summary>
        /// Gets the identifier supplied to the constructor.
        /// </summary>
        public IRemoteIdentifier Id
        {
            get { return _id; }
        }
    }
}
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using System.Globalization;
using System.Net;
using System.Text;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Remote identifier based on a socket address
    /// </summary>
    public class SocketRemoteIdentifier : IRemoteIdentifier
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));

        // Assigned exactly once by whichever constructor runs.
        private readonly IPEndPoint _addr;

        /// <summary>
        /// Creates an identifier wrapping the given socket address.
        /// </summary>
        /// <param name="addr">The socket address exposed through <see cref="Addr"/></param>
        public SocketRemoteIdentifier(IPEndPoint addr)
        {
            _addr = addr;
        }

        /// <summary>
        /// Creates an identifier from a string of the form "host:port", where host is
        /// an IP address accepted by IPAddress.Parse and port is an invariant-culture integer.
        /// The string is split at the first ':'.
        /// </summary>
        /// <param name="str">The "host:port" string to parse</param>
        /// <exception cref="System.ArgumentNullException">If str is null</exception>
        public SocketRemoteIdentifier(string str)
        {
            if (str == null)
            {
                // Fail with a descriptive exception instead of a NullReferenceException from IndexOf.
                throw new System.ArgumentNullException("str");
            }

            int index = str.IndexOf(":", System.StringComparison.Ordinal);
            if (index <= 0)
            {
                // No separator, or an empty host part before it.
                Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER);
            }
            string host = str.Substring(0, index);
            int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
            _addr = new IPEndPoint(IPAddress.Parse(host), port);
        }

        /// <summary>
        /// Gets the socket address of this identifier.
        /// </summary>
        public IPEndPoint Addr
        {
            get { return _addr; }
        }

        /// <summary>
        /// Returns the hash code of the wrapped address, or 0 when the address is null.
        /// </summary>
        public override int GetHashCode()
        {
            return _addr == null ? 0 : _addr.GetHashCode();
        }

        /// <summary>
        /// Two identifiers are equal when their addresses are equal.
        /// Returns false for null or for objects that are not SocketRemoteIdentifiers.
        /// </summary>
        /// <param name="obj">The object to compare with</param>
        public override bool Equals(object obj)
        {
            SocketRemoteIdentifier other = obj as SocketRemoteIdentifier;
            if (other == null)
            {
                return false;
            }

            // Static object.Equals tolerates a null address on either side.
            return object.Equals(_addr, other.Addr);
        }

        /// <summary>
        /// Returns "socket://" followed by the string form of the address.
        /// </summary>
        public override string ToString()
        {
            StringBuilder builder = new StringBuilder();
            builder.Append("socket://");
            builder.Append(_addr);
            return builder.ToString();
        }
    }
}
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs new file mode 100644 index 0000000..f96caa7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Codec that converts strings to and from their UTF-8 byte representation.
    /// </summary>
    /// <remarks>
    /// UTF-8 yields the same bytes as ASCII for pure-ASCII strings, but also
    /// round-trips characters outside the ASCII range, which an ASCII encoding
    /// would replace with '?'.
    /// </remarks>
    public class StringCodec : ICodec<string>
    {
        [Inject]
        public StringCodec()
        {
        }

        /// <summary>
        /// Encodes the string as UTF-8 bytes.
        /// </summary>
        /// <param name="obj">The string to encode</param>
        /// <returns>The UTF-8 bytes of the string</returns>
        public byte[] Encode(string obj)
        {
            return Encoding.UTF8.GetBytes(obj);
        }

        /// <summary>
        /// Decodes UTF-8 bytes into a string.
        /// </summary>
        /// <param name="data">The UTF-8 bytes to decode</param>
        /// <returns>The decoded string</returns>
        public string Decode(byte[] data)
        {
            return Encoding.UTF8.GetString(data);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Identifier backed by a single string. Equality, hash code and string
    /// representation are all taken from that string.
    /// </summary>
    public class StringIdentifier : IIdentifier
    {
        private readonly string _value;

        /// <summary>
        /// Creates an identifier for the given string.
        /// </summary>
        /// <param name="s">The string that identifies this instance</param>
        public StringIdentifier(string s)
        {
            this._value = s;
        }

        /// <summary>
        /// Returns the hash code of the underlying string.
        /// </summary>
        public override int GetHashCode()
        {
            return this._value.GetHashCode();
        }

        /// <summary>
        /// Returns true only when the other object is a StringIdentifier
        /// whose underlying string equals this one's.
        /// </summary>
        /// <param name="o">The object to compare with</param>
        public override bool Equals(object o)
        {
            StringIdentifier that = o as StringIdentifier;
            if (that == null)
            {
                return false;
            }

            return this._value.Equals(that._value);
        }

        /// <summary>
        /// Returns the underlying string.
        /// </summary>
        public override string ToString()
        {
            return this._value;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Identifier factory that wraps each given string in a <see cref="StringIdentifier"/>.
    /// </summary>
    public class StringIdentifierFactory : IIdentifierFactory
    {
        [Inject]
        public StringIdentifierFactory()
        {
        }

        /// <summary>
        /// Creates a new StringIdentifier for the given string.
        /// </summary>
        /// <param name="s">The string to wrap</param>
        /// <returns>A new StringIdentifier built from s</returns>
        public IIdentifier Create(string s)
        {
            IIdentifier identifier = new StringIdentifier(s);
            return identifier;
        }
    }
}
