http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs
new file mode 100644
index 0000000..d01e43e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// An event exchanged between two endpoints, wrapping a value of type T
+    /// together with addressing and ordering information.
+    /// </summary>
+    /// <typeparam name="T">The type of the wrapped value</typeparam>
+    public interface IRemoteEvent<T>
+    {
+        /// <summary>
+        /// Gets or sets the local endpoint associated with the event.
+        /// </summary>
+        IPEndPoint LocalEndPoint { get; set; }
+
+        /// <summary>
+        /// Gets or sets the remote endpoint associated with the event.
+        /// </summary>
+        IPEndPoint RemoteEndPoint { get; set; }
+
+        /// <summary>
+        /// Gets the source of the event as a string.
+        /// NOTE(review): the exact format is defined by implementers - confirm.
+        /// </summary>
+        string Source { get; }
+
+        /// <summary>
+        /// Gets the sink of the event as a string.
+        /// NOTE(review): the exact format is defined by implementers - confirm.
+        /// </summary>
+        string Sink { get; }
+
+        /// <summary>
+        /// Gets the value carried by the event.
+        /// </summary>
+        T Value { get; }
+
+        /// <summary>
+        /// Gets the sequence number of the event.
+        /// </summary>
+        long Sequence { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs
new file mode 100644
index 0000000..8d8005b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifier.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Base type for identifiers that represent a remote source.
+    /// Declared as an abstract class (despite the interface-style name)
+    /// that adds no members of its own to IIdentifier.
+    /// </summary>
+    public abstract class IRemoteIdentifier : IIdentifier
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs
new file mode 100644
index 0000000..c8bbd7a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteIdentifierFactory.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Factory that creates a RemoteIdentifier.
+    /// Adds no members of its own to IIdentifierFactory.
+    /// </summary>
+    public interface IRemoteIdentifierFactory : IIdentifierFactory
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
new file mode 100644
index 0000000..4ebe131
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    /// <typeparam name="T">The type of message exchanged</typeparam>
+    public interface IRemoteManager<T> : IStage
+    {
+        /// <summary>
+        /// Gets the remote identifier of this manager.
+        /// </summary>
+        IRemoteIdentifier Identifier { get; }
+
+        /// <summary>
+        /// Gets the local endpoint of this manager.
+        /// </summary>
+        IPEndPoint LocalEndpoint { get; }
+
+        /// <summary>
+        /// Returns an observer used to send messages to the given destination.
+        /// </summary>
+        /// <param name="dest">The destination endpoint</param>
+        /// <returns>An observer used to send messages to the remote host</returns>
+        IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest);
+
+        /// <summary>
+        /// Returns an observer used to send messages to the remote host at
+        /// the given IPEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <returns>An observer used to send messages to the remote host</returns>
+        IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);
+
+        /// <summary>
+        /// Registers an observer that handles incoming messages from the given source.
+        /// </summary>
+        /// <param name="source">The endpoint the messages come from</param>
+        /// <param name="theObserver">The observer that handles incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer</returns>
+        IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver);
+
+        /// <summary>
+        /// Registers an observer that handles incoming messages from the remote
+        /// host at the given IPEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="theObserver">The observer that handles incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer</returns>
+        IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver);
+
+        /// <summary>
+        /// Registers an observer that receives IRemoteMessage instances.
+        /// NOTE(review): presumably invoked for messages from any remote host - confirm
+        /// against the implementation.
+        /// </summary>
+        /// <param name="theObserver">The observer that handles incoming remote messages</param>
+        /// <returns>An IDisposable used to unregister the observer</returns>
+        IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs
new file mode 100644
index 0000000..aacbc22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteMessage.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Message received from a remote handler, paired with the identifier
+    /// of its sender.
+    /// </summary>
+    /// <typeparam name="T">The type of the message payload</typeparam>
+    public interface IRemoteMessage<T>
+    {
+        /// <summary>
+        /// Gets the remote identifier of the sender.
+        /// </summary>
+        /// <value>The remote identifier</value>
+        IRemoteIdentifier Identifier { get; }
+
+        /// <summary>
+        /// Gets the actual message.
+        /// </summary>
+        /// <value>The remote message</value>
+        T Message { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs
new file mode 100644
index 0000000..17fff91
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ISubscriptionManager.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Implemented by types that allow a subscription to be cancelled.
+    /// </summary>
+    public interface ISubscriptionManager
+    {
+        /// <summary>
+        /// Removes the subscription identified by the given token.
+        /// NOTE(review): the concrete token type is chosen by the implementer - confirm.
+        /// </summary>
+        /// <param name="token">The token identifying the subscription</param>
+        void Unsubscribe(object token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs
new file mode 100644
index 0000000..8584c67
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Pass-through codec for byte arrays: both directions hand back the
+    /// very array they were given, without copying.
+    /// </summary>
+    public class ByteCodec : ICodec<byte[]>
+    {
+        /// <summary>
+        /// Constructs a ByteCodec. Marked for injection.
+        /// </summary>
+        [Inject]
+        public ByteCodec()
+        {
+        }
+
+        /// <summary>
+        /// Returns the supplied array unchanged.
+        /// </summary>
+        /// <param name="obj">The bytes to encode</param>
+        /// <returns>The same array instance that was passed in</returns>
+        public byte[] Encode(byte[] obj)
+        {
+            return obj;
+        }
+
+        /// <summary>
+        /// Returns the supplied array unchanged.
+        /// </summary>
+        /// <param name="data">The bytes to decode</param>
+        /// <returns>The same array instance that was passed in</returns>
+        public byte[] Decode(byte[] data)
+        {
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs
new file mode 100644
index 0000000..f6dbca3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ByteCodecFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec factory that produces ByteCodec instances.
+    /// </summary>
+    public class ByteCodecFactory : ICodecFactory
+    {
+        /// <summary>
+        /// Constructs a ByteCodecFactory. Marked for injection.
+        /// </summary>
+        [Inject]
+        public ByteCodecFactory()
+        {
+        }
+
+        /// <summary>
+        /// Creates a new ByteCodec.
+        /// </summary>
+        /// <returns>A freshly constructed ByteCodec, typed as object</returns>
+        public object Create()
+        {
+            ByteCodec codec = new ByteCodec();
+            return codec;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs
new file mode 100644
index 0000000..a1d78a3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Channel.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Linq;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Performs low level network IO operations between hosts.
+    /// </summary>
+    /// <remarks>
+    /// Each message travels as one frame: a four byte big-endian value
+    /// holding the payload length plus four, then the four byte payload
+    /// length in the byte order BitConverter produces on the sending
+    /// machine, then the payload itself.
+    /// </remarks>
+    public class Channel
+    {
+        private readonly NetworkStream _stream;
+
+        /// <summary>
+        /// Constructs a new Channel with the connected NetworkStream.
+        /// </summary>
+        /// <param name="stream">The connected stream</param>
+        /// <exception cref="ArgumentNullException">stream is null</exception>
+        public Channel(NetworkStream stream)
+        {
+            if (stream == null)
+            {
+                throw new ArgumentNullException("stream");
+            }
+
+            _stream = stream;
+        }
+
+        /// <summary>
+        /// Sends a message to the connected client synchronously
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="ArgumentNullException">message is null</exception>
+        public void Write(byte[] message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            byte[] messageBuffer = GenerateMessageBuffer(message);
+            _stream.Write(messageBuffer, 0, messageBuffer.Length);
+        }
+
+        /// <summary>
+        /// Sends a message to the connected client asynchronously
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The awaitable write task</returns>
+        /// <exception cref="ArgumentNullException">message is null</exception>
+        public async Task WriteAsync(byte[] message, CancellationToken token)
+        {
+            // Same guard as Write, so a null message is reported by name
+            // instead of failing while the frame is being built.
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            byte[] messageBuffer = GenerateMessageBuffer(message);
+            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token);
+        }
+
+        /// <summary>
+        /// Reads an incoming message as a byte array synchronously.
+        /// The eight byte frame header is read first to learn the payload length.
+        /// </summary>
+        /// <returns>The byte array message, or null if the header could not be
+        /// read, the header announces a zero length, or the stream ended before
+        /// the whole payload arrived</returns>
+        public byte[] Read()
+        {
+            int payloadLength = ReadMessageLength();
+            if (payloadLength == 0)
+            {
+                return null;
+            }
+
+            return ReadBytes(payloadLength);
+        }
+
+        /// <summary>
+        /// Reads an incoming message as a byte array asynchronously.
+        /// The eight byte frame header is read first to learn the payload length.
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The byte array message, or null if the header could not be
+        /// read, the header announces a zero length, or the stream ended before
+        /// the whole payload arrived</returns>
+        public async Task<byte[]> ReadAsync(CancellationToken token)
+        {
+            int payloadLength = await GetMessageLengthAsync(token);
+            if (payloadLength == 0)
+            {
+                return null;
+            }
+
+            return await ReadBytesAsync(payloadLength, token);
+        }
+
+        /// <summary>
+        /// Helper method to read the specified number of bytes from the network stream.
+        /// </summary>
+        /// <param name="bytesToRead">The number of bytes to read</param>
+        /// <returns>The byte[] read from the network stream with the requested
+        /// number of bytes, otherwise null if the operation failed.
+        /// </returns>
+        private byte[] ReadBytes(int bytesToRead)
+        {
+            int totalBytesRead = 0;
+            byte[] buffer = new byte[bytesToRead];
+
+            // A single Read may deliver fewer bytes than requested, so keep
+            // reading until the buffer is full.
+            while (totalBytesRead < bytesToRead)
+            {
+                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
+                if (bytesRead == 0)
+                {
+                    // Zero bytes: the stream ended before the buffer was filled
+                    return null;
+                }
+
+                totalBytesRead += bytesRead;
+            }
+
+            return buffer;
+        }
+
+        /// <summary>
+        /// Helper method to read the specified number of bytes from the network stream.
+        /// </summary>
+        /// <param name="bytesToRead">The number of bytes to read</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The byte[] read from the network stream with the requested
+        /// number of bytes, otherwise null if the operation failed.
+        /// </returns>
+        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
+        {
+            int bytesRead = 0;
+            byte[] buffer = new byte[bytesToRead];
+
+            // A single ReadAsync may deliver fewer bytes than requested, so
+            // keep reading until the buffer is full.
+            while (bytesRead < bytesToRead)
+            {
+                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token);
+                if (amountRead == 0)
+                {
+                    // Zero bytes: the stream ended before the buffer was filled
+                    return null;
+                }
+
+                bytesRead += amountRead;
+            }
+
+            return buffer;
+        }
+
+        /// <summary>
+        /// Generates the payload buffer containing the message along
+        /// with a header indicating the message length.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <returns>The payload buffer</returns>
+        private static byte[] GenerateMessageBuffer(byte[] message)
+        {
+            // First header field: payload length plus the four bytes of the
+            // second field, forced into big-endian order.
+            byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(lengthBuffer1);
+            }
+
+            // Second header field: payload length, left exactly as BitConverter
+            // produced it (the readers decode it without reversing).
+            byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);
+
+            int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
+            byte[] messageBuffer = new byte[len];
+
+            int bytesCopied = 0;
+            bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
+            bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
+            CopyBytes(message, messageBuffer, bytesCopied);
+
+            return messageBuffer;
+        }
+
+        /// <summary>
+        /// Decodes the first header field (written in big-endian order) into an int.
+        /// The supplied buffer is reversed in place on little-endian machines.
+        /// </summary>
+        /// <param name="lenBytes">The four bytes of the first header field</param>
+        /// <returns>The decoded value</returns>
+        private static int DecodeTotalLength(byte[] lenBytes)
+        {
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(lenBytes);
+            }
+
+            return BitConverter.ToInt32(lenBytes, 0);
+        }
+
+        /// <summary>
+        /// Reads the eight byte frame header from the stream and decodes
+        /// it to get the message length in bytes
+        /// </summary>
+        /// <returns>The incoming message's length in bytes, or 0 if either
+        /// header field could not be read or the first field is zero</returns>
+        private int ReadMessageLength()
+        {
+            byte[] lenBytes = ReadBytes(sizeof(int));
+            if (lenBytes == null || DecodeTotalLength(lenBytes) == 0)
+            {
+                return 0;
+            }
+
+            byte[] msgLength = ReadBytes(sizeof(int));
+            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+        }
+
+        /// <summary>
+        /// Reads the eight byte frame header from the stream and decodes
+        /// it to get the message length in bytes
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The incoming message's length in bytes, or 0 if either
+        /// header field could not be read or the first field is zero</returns>
+        private async Task<int> GetMessageLengthAsync(CancellationToken token)
+        {
+            byte[] lenBytes = await ReadBytesAsync(sizeof(int), token);
+            if (lenBytes == null || DecodeTotalLength(lenBytes) == 0)
+            {
+                return 0;
+            }
+
+            // Read the second field asynchronously as well, so the calling
+            // thread is never blocked and the cancellation token is honored.
+            byte[] msgLength = await ReadBytesAsync(sizeof(int), token);
+            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+        }
+
+        /// <summary>
+        /// Copies the entire source buffer into the destination buffer at the
+        /// specified destination offset.
+        /// </summary>
+        /// <param name="source">The source buffer to be copied</param>
+        /// <param name="dest">The destination buffer to copy to</param>
+        /// <param name="destOffset">The offset at the destination buffer to begin
+        /// copying.</param>
+        /// <returns>The number of bytes copied</returns>
+        private static int CopyBytes(byte[] source, byte[] dest, int destOffset)
+        {
+            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
+            return source.Length;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
new file mode 100644
index 0000000..cb9cf65
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Reactive;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    public class DefaultRemoteManager<T> : IRemoteManager<T>
+    {
+        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(DefaultRemoteManager<T>));
+
+        private ObserverContainer<T> _observerContainer;
+        private TransportServer<IRemoteEvent<T>> _server; 
+        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private ICodec<IRemoteEvent<T>> _codec;
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified address.
+        /// Port 0 is handed to the (address, port, codec) constructor.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec)
+            : this(localAddress, 0, codec)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager and starts a TransportServer
+        /// on the specified IPEndPoint.
+        /// </summary>
+        /// <param name="localEndpoint">The endpoint to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        /// <exception cref="ArgumentNullException">localEndpoint or codec is null</exception>
+        /// <exception cref="ArgumentException">The endpoint's port is negative</exception>
+        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
+        {
+            if (localEndpoint == null)
+            {
+                throw new ArgumentNullException("localEndpoint");
+            }
+
+            if (localEndpoint.Port < 0)
+            {
+                throw new ArgumentException("Listening port must be greater than or equal to zero");
+            }
+
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            // Wrap the user codec so that whole remote events can be serialized
+            _codec = new RemoteEventCodec<T>(codec);
+            _observerContainer = new ObserverContainer<T>();
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+            // Begin to listen for incoming messages
+            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+            _server.Run();
+
+            // Publish the endpoint reported by the server, not the requested one
+            IPEndPoint boundEndpoint = _server.LocalEndpoint;
+            LocalEndpoint = boundEndpoint;
+            Identifier = new SocketRemoteIdentifier(boundEndpoint);
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager and starts a TransportServer
+        /// on the specified address and port.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="port">The port to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        /// <exception cref="ArgumentNullException">localAddress or codec is null</exception>
+        /// <exception cref="ArgumentException">port is negative</exception>
+        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+
+            if (port < 0)
+            {
+                throw new ArgumentException("Listening port must be greater than or equal to zero");
+            }
+
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _observerContainer = new ObserverContainer<T>();
+            // Wrap the user codec so that whole remote events can be serialized
+            _codec = new RemoteEventCodec<T>(codec);
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+            // Begin to listen for incoming messages
+            _server = new TransportServer<IRemoteEvent<T>>(
+                new IPEndPoint(localAddress, port), _observerContainer, _codec);
+            _server.Run();
+
+            // Publish the endpoint reported by the server, not the requested one
+            IPEndPoint boundEndpoint = _server.LocalEndpoint;
+            LocalEndpoint = boundEndpoint;
+            Identifier = new SocketRemoteIdentifier(boundEndpoint);
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager without starting a server, so it
+        /// does not listen for incoming messages.
+        /// </summary>
+        /// <param name="codec">The codec used for serializing messages</param>
+        /// <exception cref="ArgumentNullException">codec is null</exception>
+        public DefaultRemoteManager(ICodec<T> codec)
+        {
+            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
+            {
+                if (codec == null)
+                {
+                    throw new ArgumentNullException("codec");
+                }
+
+                _observerContainer = new ObserverContainer<T>();
+                _codec = new RemoteEventCodec<T>(codec);
+                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+                // No server is started: advertise the local IP address with port 0
+                IPEndPoint advertised = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
+                LocalEndpoint = advertised;
+                Identifier = new SocketRemoteIdentifier(advertised);
+            }
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier for the DefaultRemoteManager. Set by the constructors.
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint for the DefaultRemoteManager. Set by the constructors.
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host
+        /// named by the specified RemoteEventEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The RemoteEventEndPoint of the remote host;
+        /// its Id must be a SocketRemoteIdentifier</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
+        /// <exception cref="ArgumentException">The Id is not a SocketRemoteIdentifier</exception>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            // Only socket based identifiers carry an address we can connect to
+            SocketRemoteIdentifier socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (socketId != null)
+            {
+                return GetRemoteObserver(socketId.Addr);
+            }
+
+            throw new ArgumentException("ID not supported");
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint. The observer is created on the first
+        /// request for an endpoint and served from a cache afterwards.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver cached;
+            if (_cachedClients.TryGetValue(remoteEndpoint, out cached))
+            {
+                return cached;
+            }
+
+            // First request for this endpoint: create a client and remember its observer
+            ProxyObserver created = new ProxyObserver(
+                new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer));
+            _cachedClients[remoteEndpoint] = created;
+
+            return created;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the
+        /// remote host described by the given RemoteEventEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
+        /// <exception cref="ArgumentException">The endpoint's Id is not a SocketRemoteIdentifier</exception>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            // The observer itself is validated by the IPEndPoint overload called below.
+            var socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (socketId != null)
+            {
+                return RegisterObserver(socketId.Addr, observer);
+            }
+
+            throw new ArgumentException("ID not supported");
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the
+        /// remote host at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            // The observer container owns the endpoint-to-observer bookkeeping.
+            IDisposable registration = _observerContainer.RegisterObserver(remoteEndpoint, observer);
+            return registration;
+        }
+
+        /// <summary>
+        /// Registers an IObserver that receives incoming messages wrapped as
+        /// IRemoteMessage. Unlike the other overloads, no remote endpoint is
+        /// given; the registration is passed straight to the observer container.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        /// <exception cref="ArgumentNullException">observer is null</exception>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer != null)
+            {
+                return _observerContainer.RegisterObserver(observer);
+            }
+
+            throw new ArgumentNullException("observer");
+        }
+
+        /// <summary>
+        /// Release all resources for the DefaultRemoteManager: every cached
+        /// client observer is disposed, then the server if one exists.
+        /// </summary>
+        public void Dispose()
+        {
+            try
+            {
+                foreach (ProxyObserver cachedClient in _cachedClients.Values)
+                {
+                    cachedClient.Dispose();
+                }
+            }
+            finally
+            {
+                // Runs even when disposing a client throws, so a failing
+                // client cannot keep the server from being released.
+                if (_server != null)
+                {
+                    _server.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Observer that forwards each message to a remote host through a
+        /// TransportClient, stamping it with a per-observer sequence number.
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private TransportClient<IRemoteEvent<T>> _client;
+            private int _messageCount;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The connected transport client used to send
+            /// messages to remote host</param>
+            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
+            {
+                _messageCount = 0;
+                _client = client;
+            }
+
+            /// <summary>
+            /// Wraps the message in a RemoteEvent carrying the link's local and
+            /// remote endpoints and sends it to the remote host
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                var remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message);
+                remoteEvent.Sink = "default";
+                remoteEvent.Sequence = _messageCount;
+
+                // The counter advances before the send, so it moves on even if Send throws.
+                _messageCount++;
+                _client.Send(remoteEvent);
+            }
+
+            /// <summary>
+            /// Not supported by this observer
+            /// </summary>
+            /// <exception cref="NotImplementedException">Always thrown</exception>
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            /// <summary>
+            /// Not supported by this observer
+            /// </summary>
+            /// <exception cref="NotImplementedException">Always thrown</exception>
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            /// <summary>
+            /// Close underlying transport client
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs
new file mode 100644
index 0000000..225db30
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteMessage.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// IRemoteMessage implementation that pairs a message with the
+    /// identifier it was constructed with. Both values are read-only
+    /// after construction.
+    /// </summary>
+    class DefaultRemoteMessage<T> : IRemoteMessage<T>
+    {
+        private readonly IRemoteIdentifier _identifier;
+        private readonly T _message;
+
+        /// <summary>
+        /// Creates a remote message from an identifier and a payload.
+        /// </summary>
+        /// <param name="id">The identifier to expose through Identifier</param>
+        /// <param name="message">The payload to expose through Message</param>
+        public DefaultRemoteMessage(IRemoteIdentifier id, T message)
+        {
+            _identifier = id;
+            _message = message;
+        }
+
+        /// <summary>
+        /// Gets the identifier supplied at construction.
+        /// </summary>
+        public IRemoteIdentifier Identifier
+        {
+            get { return _identifier; }
+        }
+
+        /// <summary>
+        /// Gets the message supplied at construction.
+        /// </summary>
+        public T Message
+        {
+            get { return _message; }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IPEndpointComparer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IPEndpointComparer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IPEndpointComparer.cs
new file mode 100644
index 0000000..80415c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IPEndpointComparer.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Equality comparer for IPEndPoint objects in which port 0 acts as a
+    /// wildcard: when either endpoint has port 0 only the addresses are
+    /// compared, otherwise address and port must both match.
+    /// </summary>
+    /// <remarks>
+    /// Because of the wildcard the relation is not transitive: A:0 equals
+    /// both A:1 and A:2, while A:1 and A:2 are not equal to each other.
+    /// The hash code is derived from the address alone so that endpoints
+    /// which compare equal through the wildcard also hash equal.
+    /// </remarks>
+    internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
+    {
+        /// <summary>
+        /// Compares two endpoints, ignoring the port when either port is 0.
+        /// </summary>
+        /// <param name="x">The first endpoint, may be null</param>
+        /// <param name="y">The second endpoint, may be null</param>
+        /// <returns>True if both are the same reference (including both null)
+        /// or match under the wildcard rule, otherwise false</returns>
+        public bool Equals(IPEndPoint x, IPEndPoint y)
+        {
+            if (ReferenceEquals(x, y))
+            {
+                return true;
+            }
+            if (x == null || y == null)
+            {
+                return false;
+            }
+
+            // If either port is 0, don't check port
+            if (x.Port == 0 || y.Port == 0)
+            {
+                return x.Address.Equals(y.Address);
+            }
+
+            return x.Equals(y);
+        }
+
+        /// <summary>
+        /// Returns a hash code based only on the endpoint's address.
+        /// </summary>
+        /// <param name="obj">The endpoint to hash</param>
+        /// <returns>The hash code of the endpoint's address</returns>
+        /// <exception cref="ArgumentNullException">obj is null</exception>
+        public int GetHashCode(IPEndPoint obj)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj");
+            }
+
+            // The port must not contribute: endpoints differing only by a
+            // wildcard port compare equal and therefore must hash equal.
+            return obj.Address.GetHashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IntCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IntCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IntCodec.cs
new file mode 100644
index 0000000..ddbce2a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/IntCodec.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec that converts a 32-bit integer to and from the byte array
+    /// produced by BitConverter.
+    /// </summary>
+    public class IntCodec : ICodec<int>
+    {
+        /// <summary>
+        /// Constructs an IntCodec; marked for injection.
+        /// </summary>
+        [Inject]
+        public IntCodec()
+        {
+        }
+
+        /// <summary>
+        /// Encodes the integer as bytes.
+        /// </summary>
+        /// <param name="obj">The integer to encode</param>
+        /// <returns>The bytes returned by BitConverter.GetBytes</returns>
+        public byte[] Encode(int obj)
+        {
+            byte[] encoded = BitConverter.GetBytes(obj);
+            return encoded;
+        }
+
+        /// <summary>
+        /// Decodes an integer from the first four bytes of the array.
+        /// </summary>
+        /// <param name="data">The bytes to decode, read from index 0</param>
+        /// <returns>The decoded integer</returns>
+        public int Decode(byte[] data)
+        {
+            const int startIndex = 0;
+            return BitConverter.ToInt32(data, startIndex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
new file mode 100644
index 0000000..b4369e0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Represents an open connection between remote hosts.
+    /// Values are encoded and decoded with the supplied codec and moved
+    /// over a Channel wrapped around the TcpClient's network stream.
+    /// </summary>
+    public class Link<T> : ILink<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));
+
+        private IPEndPoint _localEndpoint;
+        private IPEndPoint _remoteEndpoint;
+        private ICodec<T> _codec;
+        private Channel _channel;
+        private bool _disposed;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+        /// <param name="codec">The codec for serializing messages</param>
+        /// <exception cref="ArgumentNullException">remoteEndpoint or codec is null</exception>
+        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            // Create the socket for the endpoint's own address family so that
+            // IPv6 endpoints can be reached as well as IPv4 ones.
+            TcpClient client = new TcpClient(remoteEndpoint.AddressFamily);
+            try
+            {
+                client.Connect(remoteEndpoint);
+                Initialize(client, codec);
+            }
+            catch (Exception)
+            {
+                // This constructor created the client, so it must release the
+                // socket itself when connecting or setting up the link fails.
+                client.Close();
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        /// <param name="codec">The encoder and decoder</param>
+        /// <exception cref="ArgumentNullException">client or codec is null</exception>
+        public Link(TcpClient client, ICodec<T> codec)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            Initialize(client, codec);
+        }
+
+        /// <summary>
+        /// Returns the local socket address: the address reported by
+        /// NetworkUtils combined with the port the socket is bound to
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _localEndpoint; }
+        }
+
+        /// <summary>
+        /// Returns the remote socket address, captured when the link was
+        /// constructed so it remains available after the link is disposed
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get { return _remoteEndpoint; }
+        }
+
+        /// <summary>
+        /// Gets the underlying TcpClient
+        /// </summary>
+        public TcpClient Client { get; private set; }
+
+        /// <summary>
+        /// Writes the message to the remote host
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <exception cref="ArgumentNullException">value is null</exception>
+        public void Write(T value)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
+            }
+
+            byte[] message = _codec.Encode(value);
+            _channel.Write(message);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        /// <exception cref="ArgumentNullException">value is null</exception>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            // Same argument validation as the synchronous Write.
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
+            }
+
+            byte[] message = _codec.Encode(value);
+            await _channel.WriteAsync(message, token);
+        }
+
+        /// <summary>
+        /// Reads the value from the link synchronously
+        /// </summary>
+        /// <returns>The decoded value, or default(T) when the channel
+        /// returns no message</returns>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+            }
+
+            byte[] message = _channel.Read();
+            return (message == null) ? default(T) : _codec.Decode(message);
+        }
+
+        /// <summary>
+        /// Reads the value from the link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The decoded value, or default(T) when the channel
+        /// returns no message</returns>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+            }
+
+            byte[] message = await _channel.ReadAsync(token);
+            return (message == null) ? default(T) : _codec.Decode(message);
+        }
+
+        /// <summary>
+        /// Close the client connection
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Subclasses of Links should overwrite this to handle disposing
+        /// of the link. Calling it again after the link is disposed does nothing.
+        /// </summary>
+        /// <param name="disposing">True to close the stream and the client;
+        /// false to only mark the link as disposed</param>
+        public virtual void Dispose(bool disposing)
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            if (disposing)
+            {
+                try
+                {
+                    Client.GetStream().Close();
+                }
+                catch (InvalidOperationException)
+                {
+                    // GetStream fails when the socket is not connected; the
+                    // client itself is still closed below.
+                    LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+                }
+
+                Client.Close();
+            }
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two Link objects are equal if they are connected
+        /// to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is equal to this Link, otherwise false</returns>
+        public override bool Equals(object obj)
+        {
+            Link<T> other = obj as Link<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the hash code for the Link object, derived from the remote endpoint.
+        /// </summary>
+        /// <returns>The object's hash code</returns>
+        public override int GetHashCode()
+        {
+            return RemoteEndpoint.GetHashCode();
+        }
+
+        /// <summary>
+        /// Stores the client and codec, wraps the client's stream in a
+        /// Channel and captures the local and remote endpoints.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        /// <param name="codec">The encoder and decoder</param>
+        private void Initialize(TcpClient client, ICodec<T> codec)
+        {
+            Client = client;
+            _codec = codec;
+            _channel = new Channel(client.GetStream());
+            _localEndpoint = GetLocalEndpoint();
+
+            // Captured now because the socket is no longer reachable through
+            // the TcpClient once the client has been closed.
+            _remoteEndpoint = (IPEndPoint) client.Client.RemoteEndPoint;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Discovers the IPEndpoint for the current machine.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress address = NetworkUtils.LocalIPAddress;
+            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
+            return new IPEndPoint(address, port);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiCodec.cs
new file mode 100644
index 0000000..1d64f04
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiCodec.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec that encodes and decodes objects by delegating to a
+    /// MultiEncoder and a MultiDecoder, which pick the codec registered
+    /// for the object's type.
+    /// </summary>
+    public class MultiCodec<T> : ICodec<T>
+    {
+        private readonly MultiEncoder<T> _encoder = new MultiEncoder<T>();
+
+        private readonly MultiDecoder<T> _decoder = new MultiDecoder<T>();
+
+        /// <summary>
+        /// Constructs a new MultiCodec object with no codecs registered.
+        /// </summary>
+        public MultiCodec()
+        {
+        }
+
+        /// <summary>
+        /// Register a codec to be used when encoding/decoding objects of this type.
+        /// The codec is registered with both the encoder and the decoder.
+        /// </summary>
+        /// <typeparam name="U">The type the codec handles</typeparam>
+        /// <param name="codec">The codec to use when encoding/decoding
+        /// objects of this type</param>
+        public void Register<U>(ICodec<U> codec) where U : T
+        {
+            _encoder.Register(codec);
+            _decoder.Register(codec);
+        }
+
+        /// <summary>
+        /// Register a codec to be used when encoding/decoding objects of this type,
+        /// using an explicit name for the type.
+        /// </summary>
+        /// <typeparam name="U">The type the codec handles</typeparam>
+        /// <param name="codec">The codec to use when encoding/decoding
+        /// objects of this type</param>
+        /// <param name="name">The name of the class to encode/decode</param>
+        public void Register<U>(ICodec<U> codec, string name) where U : T
+        {
+            _encoder.Register(codec, name);
+            _decoder.Register(codec, name);
+        }
+
+        /// <summary>
+        /// Encodes an object with the appropriate encoding or null if it cannot
+        /// be encoded.
+        /// </summary>
+        /// <param name="obj">Data to encode</param>
+        /// <returns>The bytes produced by the encoder</returns>
+        public byte[] Encode(T obj)
+        {
+            byte[] encoded = _encoder.Encode(obj);
+            return encoded;
+        }
+
+        /// <summary>
+        /// Decodes byte array into the appropriate object type.
+        /// </summary>
+        /// <param name="data">Data to be decoded</param>
+        /// <returns>The object produced by the decoder</returns>
+        public T Decode(byte[] data)
+        {
+            T decoded = _decoder.Decode(data);
+            return decoded;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiDecoder.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiDecoder.cs
new file mode 100644
index 0000000..0aeb3d4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiDecoder.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Decoder using the WakeTuple protocol buffer
+    /// (class name and bytes). The class name selects the registered
+    /// decoder that turns the bytes back into an object.
+    /// </summary>
+    public class MultiDecoder<T> : IDecoder<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));
+        private Dictionary<Type, object> _decoderMap;
+        private Dictionary<string, Type> _nameMap;
+
+        /// <summary>
+        /// Constructs a decoder that decodes bytes based on the class type
+        /// </summary>
+        public MultiDecoder()
+        {
+            _nameMap = new Dictionary<string, Type>();
+            _decoderMap = new Dictionary<Type, object>();
+        }
+
+        /// <summary>
+        /// Register the decoder for objects of type U under the name
+        /// returned by the type's ToString()
+        /// </summary>
+        /// <typeparam name="U">The type of object the decoder produces</typeparam>
+        /// <param name="decoder">The decoder to use when decoding
+        /// objects of this type</param>
+        public void Register<U>(IDecoder<U> decoder) where U : T
+        {
+            Register(decoder, typeof(U).ToString());
+        }
+
+        /// <summary>
+        /// Register the decoder for objects of type U under an explicit name
+        /// </summary>
+        /// <typeparam name="U">The type of object the decoder produces</typeparam>
+        /// <param name="decoder">The decoder to use when decoding
+        /// objects of this type</param>
+        /// <param name="name">The name of the class to decode</param>
+        public void Register<U>(IDecoder<U> decoder, string name) where U : T
+        {
+            Type registeredType = typeof(U);
+            _decoderMap[registeredType] = decoder;
+            _nameMap[name] = registeredType;
+        }
+
+        /// <summary>
+        /// Decodes byte array according to the underlying object type.
+        /// </summary>
+        /// <param name="data">The data to decode</param>
+        /// <returns>The decoded object, or default(T) when the tuple cannot
+        /// be deserialized or its class name was never registered</returns>
+        public T Decode(byte[] data)
+        {
+            WakeTuplePBuf tuple = WakeTuplePBuf.Deserialize(data);
+
+            // No tuple, or a class name nobody registered: nothing to decode
+            Type targetType;
+            if (tuple == null || !_nameMap.TryGetValue(tuple.className, out targetType))
+            {
+                return default(T);
+            }
+
+            // Look up the decoder registered for that type
+            object registered;
+            bool known = _decoderMap.TryGetValue(targetType, out registered);
+            if (!known)
+            {
+                Exceptions.Throw(new RemoteRuntimeException("Decoder for " + targetType + " not known."), LOGGER);
+            }
+
+            // The decoder is stored as object, so Decode is invoked through reflection
+            MethodInfo decode = typeof(IDecoder<>).MakeGenericType(targetType).GetMethod("Decode");
+            object[] arguments = { tuple.data };
+            return (T) decode.Invoke(registered, arguments);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiEncoder.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiEncoder.cs
new file mode 100644
index 0000000..3ce695b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/MultiEncoder.cs
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Encoder using the WakeTuple protocol buffer
+    /// (class name and bytes). The object's runtime type selects the
+    /// registered encoder and the name written into the tuple.
+    /// </summary>
+    public class MultiEncoder<T> : IEncoder<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEncoder<>));
+        private Dictionary<Type, object> _encoderMap;
+        private Dictionary<Type, string> _nameMap;
+
+        /// <summary>
+        /// Constructs an encoder that encodes an object to bytes based on the class name
+        /// </summary>
+        public MultiEncoder()
+        {
+            _encoderMap = new Dictionary<Type, object>();
+            _nameMap = new Dictionary<Type, string>();
+        }
+
+        /// <summary>
+        /// Register the encoder for objects of type U under the name
+        /// returned by the type's ToString()
+        /// </summary>
+        /// <typeparam name="U">The type of object the encoder accepts</typeparam>
+        /// <param name="encoder">The encoder to use when encoding
+        /// objects of this type</param>
+        public void Register<U>(IEncoder<U> encoder) where U : T
+        {
+            _encoderMap[typeof(U)] = encoder;
+            _nameMap[typeof(U)] = typeof(U).ToString();
+        }
+
+        /// <summary>
+        /// Register the encoder for objects of type U under an explicit name
+        /// </summary>
+        /// <typeparam name="U">The type of object the encoder accepts</typeparam>
+        /// <param name="encoder">The encoder to use when encoding
+        /// objects of this type</param>
+        /// <param name="name">The name written into the tuple for this type</param>
+        public void Register<U>(IEncoder<U> encoder, string name) where U : T
+        {
+            _encoderMap[typeof(U)] = encoder;
+            _nameMap[typeof(U)] = name;
+            LOGGER.Log(Level.Verbose, "Registering name for " + name);
+        }
+
+        /// <summary>Encodes an object to a byte array</summary>
+        /// <param name="obj">The object to encode</param>
+        /// <returns>The serialized tuple of class name and encoded bytes, or
+        /// null when no encoder is registered for the object's exact
+        /// runtime type</returns>
+        /// <exception cref="ArgumentNullException">obj is null</exception>
+        public byte[] Encode(T obj)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj");
+            }
+
+            // Find encoder for object type
+            Type type = obj.GetType();
+            object encoder;
+            if (!_encoderMap.TryGetValue(type, out encoder))
+            {
+                return null;
+            }
+
+            // The encoder is stored as object, so Encode is invoked through reflection
+            Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { type });
+            MethodInfo info = handlerType.GetMethod("Encode");
+            byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });
+
+            // Serialize object type and object data into well known tuple
+            // To decode, deserialize the tuple, get object type, and look up the
+            // decoder for that type
+            string name = _nameMap[type];
+            LOGGER.Log(Level.Verbose, "Encoding name for " + name);
+            WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
+            return pbuf.Serialize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
new file mode 100644
index 0000000..725e9ce
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/ObserverContainer.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.RX.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Stores registered IObservers for DefaultRemoteManager.
+    /// Observers can be registered for one remote IPEndPoint, for every remote
+    /// endpoint (by passing an endpoint whose address is IPAddress.Any), or as
+    /// IRemoteMessage observers keyed by typeof(T).
+    /// </summary>
+    /// <typeparam name="T">The type of the values carried by the remote events</typeparam>
+    internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
+    {
+        private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
+        private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
+
+        // Observer registered with IPAddress.Any; null while none is registered
+        private IObserver<T> _universalObserver;
+
+        /// <summary>
+        /// Constructs a new ObserverContainer used to manage remote IObservers.
+        /// </summary>
+        public ObserverContainer()
+        {
+            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
+            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint. If the address of the endpoint is IPAddress.Any,
+        /// the observer is stored as the universal observer, which OnNext invokes for
+        /// every incoming event. A later registration for the same endpoint replaces
+        /// the earlier one.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint.Address.Equals(IPAddress.Any))
+            {
+                _universalObserver = observer;
+                return Disposable.Create(() => { _universalObserver = null; });
+            }
+
+            _endpointMap[remoteEndpoint] = observer;
+            return Disposable.Create(() =>
+            {
+                // The removed value is not needed; use a scratch local instead of
+                // overwriting the captured parameter.
+                IObserver<T> removed;
+                _endpointMap.TryRemove(remoteEndpoint, out removed);
+            });
+        }
+
+        /// <summary>
+        /// Registers an IObserver to handle incoming messages from a remote host.
+        /// The observer is stored under typeof(T), replacing any previous one.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            _typeMap[typeof(T)] = observer;
+            return Disposable.Create(() =>
+            {
+                IObserver<IRemoteMessage<T>> removed;
+                _typeMap.TryRemove(typeof(T), out removed);
+            });
+        }
+
+        /// <summary>
+        /// Look up the IObserver for the registered IPEndPoint or event type
+        /// and execute the IObserver. The universal observer, if registered, is
+        /// invoked first; after that the observer registered for the remote endpoint
+        /// is invoked, or, if there is none, the observer registered by type.
+        /// </summary>
+        /// <param name="transportEvent">The incoming remote event</param>
+        /// <exception cref="WakeRuntimeException">No registered observer handled the event</exception>
+        public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
+        {
+            // Stamp the event with the endpoints of the link it arrived on
+            IRemoteEvent<T> remoteEvent = transportEvent.Data;
+            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
+            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
+            T value = remoteEvent.Value;
+            bool handled = false;
+
+            // Read the field once: the unregister disposable may set it to null
+            // between the null check and the call.
+            IObserver<T> universalObserver = _universalObserver;
+            if (universalObserver != null)
+            {
+                universalObserver.OnNext(value);
+                handled = true;
+            }
+
+            IObserver<T> endpointObserver;
+            IObserver<IRemoteMessage<T>> typeObserver;
+            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out endpointObserver))
+            {
+                // IObserver was registered by IPEndpoint
+                endpointObserver.OnNext(value);
+                handled = true;
+            }
+            else if (_typeMap.TryGetValue(value.GetType(), out typeObserver))
+            {
+                // IObserver was registered by event type.
+                // NOTE: registration keys on typeof(T) while this lookup keys on the
+                // runtime type of the value, so only values whose runtime type is
+                // exactly T are matched here.
+                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
+                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
+                typeObserver.OnNext(remoteMessage);
+                handled = true;
+            }
+
+            if (!handled)
+            {
+                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
+            }
+        }
+
+        /// <summary>
+        /// Does nothing; the error is not forwarded to the registered observers.
+        /// </summary>
+        /// <param name="error">The reported error (ignored)</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Does nothing; completion is not forwarded to the registered observers.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
new file mode 100644
index 0000000..bf50325
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// IRemoteEvent implementation whose members are all plain read/write properties.
+    /// </summary>
+    /// <typeparam name="T">The type of the value carried by the event</typeparam>
+    public class RemoteEvent<T> : IRemoteEvent<T>
+    {
+        /// <summary>The local endpoint associated with the event</summary>
+        public IPEndPoint LocalEndPoint { get; set; }
+
+        /// <summary>The remote endpoint associated with the event</summary>
+        public IPEndPoint RemoteEndPoint { get; set; }
+
+        /// <summary>The source of the event</summary>
+        public string Source { get; set; }
+
+        /// <summary>The sink of the event</summary>
+        public string Sink { get; set; }
+
+        /// <summary>The value carried by the event</summary>
+        public T Value { get; set; }
+
+        /// <summary>The sequence number of the event</summary>
+        public long Sequence { get; set; }
+
+        /// <summary>
+        /// Creates an event with every property left at its default value.
+        /// </summary>
+        public RemoteEvent()
+        {
+        }
+
+        /// <summary>
+        /// Creates an event with the given endpoints and value; Source and Sink
+        /// are null and Sequence is 0.
+        /// </summary>
+        public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
+            : this(localEndpoint, remoteEndpoint, null, null, 0, value)
+        {
+        }
+
+        /// <summary>
+        /// Creates an event with every property set explicitly.
+        /// </summary>
+        public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
+        {
+            LocalEndPoint = localEndPoint;
+            RemoteEndPoint = remoteEndPoint;
+            Source = source;
+            Sink = sink;
+            Value = value;
+            Sequence = seq;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventCodec.cs
new file mode 100644
index 0000000..bcc5730
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventCodec.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec for IRemoteEvent that delegates encoding to a RemoteEventEncoder and
+    /// decoding to a RemoteEventDecoder, both built around the same value codec.
+    /// </summary>
+    /// <typeparam name="T">The type of the value carried by the events</typeparam>
+    internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
+    {
+        private readonly RemoteEventEncoder<T> _encoder;
+        private readonly RemoteEventDecoder<T> _decoder;
+
+        /// <summary>
+        /// Creates the codec.
+        /// </summary>
+        /// <param name="codec">The codec handed to both the encoder and the decoder</param>
+        public RemoteEventCodec(ICodec<T> codec)
+        {
+            _encoder = new RemoteEventEncoder<T>(codec);
+            _decoder = new RemoteEventDecoder<T>(codec);
+        }
+
+        /// <summary>Encodes a remote event through the wrapped encoder</summary>
+        /// <param name="obj">The event to encode</param>
+        /// <returns>The bytes produced by the encoder</returns>
+        public byte[] Encode(IRemoteEvent<T> obj)
+        {
+            byte[] encoded = _encoder.Encode(obj);
+            return encoded;
+        }
+
+        /// <summary>Decodes a remote event through the wrapped decoder</summary>
+        /// <param name="data">The bytes to decode</param>
+        /// <returns>The event produced by the decoder</returns>
+        public IRemoteEvent<T> Decode(byte[] data)
+        {
+            IRemoteEvent<T> decoded = _decoder.Decode(data);
+            return decoded;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs
new file mode 100644
index 0000000..19378d7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventDecoder.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Decodes a serialized WakeMessagePBuf into an IRemoteEvent, using a wrapped
+    /// decoder for the value payload.
+    /// </summary>
+    /// <typeparam name="T">The type of the value carried by the events</typeparam>
+    public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
+    {
+        // Assigned once in the constructor; readonly to match RemoteEventEncoder
+        private readonly IDecoder<T> _decoder;
+
+        /// <summary>
+        /// Creates the decoder.
+        /// </summary>
+        /// <param name="decoder">The decoder applied to the data field of each message</param>
+        public RemoteEventDecoder(IDecoder<T> decoder)
+        {
+            _decoder = decoder;
+        }
+
+        /// <summary>
+        /// Deserializes the message and builds a RemoteEvent from its source, sink,
+        /// sequence number and decoded payload.
+        /// </summary>
+        /// <param name="data">The serialized WakeMessagePBuf</param>
+        /// <returns>
+        /// The decoded event; its local and remote endpoints are null because the
+        /// serialized message does not carry them
+        /// </returns>
+        public IRemoteEvent<T> Decode(byte[] data)
+        {
+            WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data);
+            T value = _decoder.Decode(pbuf.data);
+            return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs
new file mode 100644
index 0000000..59e1d6f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEncoder.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Encodes an IRemoteEvent as a serialized WakeMessagePBuf, using a wrapped
+    /// encoder for the value payload.
+    /// </summary>
+    /// <typeparam name="T">The type of the value carried by the events</typeparam>
+    public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
+    {
+        private readonly IEncoder<T> _encoder;
+
+        /// <summary>
+        /// Creates the encoder.
+        /// </summary>
+        /// <param name="encoder">The encoder applied to the value of each event</param>
+        public RemoteEventEncoder(IEncoder<T> encoder)
+        {
+            _encoder = encoder;
+        }
+
+        /// <summary>
+        /// Copies sink, source, encoded value and sequence number of the event into
+        /// a WakeMessagePBuf and serializes it. The endpoints of the event are not
+        /// written.
+        /// </summary>
+        /// <param name="obj">The event to encode</param>
+        /// <returns>The serialized message</returns>
+        public byte[] Encode(IRemoteEvent<T> obj)
+        {
+            // Initializer members are evaluated top to bottom
+            WakeMessagePBuf message = new WakeMessagePBuf
+            {
+                sink = obj.Sink,
+                source = obj.Source,
+                data = _encoder.Encode(obj.Value),
+                seq = obj.Sequence
+            };
+
+            return message.Serialize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEndpoint.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEndpoint.cs
new file mode 100644
index 0000000..7d3e116
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventEndpoint.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Holds the remote identifier it was constructed with.
+    /// </summary>
+    /// <typeparam name="T">Not used by any member of this class</typeparam>
+    public class RemoteEventEndPoint<T>
+    {
+        /// <summary>
+        /// Creates the endpoint.
+        /// </summary>
+        /// <param name="id">The identifier exposed through Id</param>
+        public RemoteEventEndPoint(IRemoteIdentifier id)
+        {
+            Id = id;
+        }
+
+        /// <summary>The identifier passed to the constructor</summary>
+        public IRemoteIdentifier Id { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/SocketRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/SocketRemoteIdentifier.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/SocketRemoteIdentifier.cs
new file mode 100644
index 0000000..ecb7711
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/SocketRemoteIdentifier.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using System.Globalization;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Remote identifier based on a socket address
+    /// </summary>
+    public class SocketRemoteIdentifier : IRemoteIdentifier
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));
+        private readonly IPEndPoint _addr;
+
+        /// <summary>
+        /// Creates an identifier for the given endpoint.
+        /// </summary>
+        /// <param name="addr">The endpoint this identifier wraps</param>
+        public SocketRemoteIdentifier(IPEndPoint addr)
+        {
+            _addr = addr;
+        }
+
+        /// <summary>
+        /// Creates an identifier from a string of the form "address:port". The string
+        /// is split at its first ':'; the part before it is parsed as an IP address
+        /// and the part after it as the port number.
+        /// NOTE: the "socket://..." string returned by ToString is not in this
+        /// format, because its first ':' belongs to the scheme prefix.
+        /// </summary>
+        /// <param name="str">The address and port, separated by ':'</param>
+        public SocketRemoteIdentifier(string str)
+        {
+            int index = str.IndexOf(":", System.StringComparison.Ordinal);
+            if (index <= 0)
+            {
+                // Missing ':' or empty host part.
+                // NOTE(review): presumably Exceptions.Throw does not return - confirm.
+                Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER);
+            }
+            string host = str.Substring(0, index);
+            int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
+            _addr = new IPEndPoint(IPAddress.Parse(host), port);
+        }
+
+        /// <summary>The endpoint wrapped by this identifier</summary>
+        public IPEndPoint Addr
+        {
+            get { return _addr; }
+        }
+
+        /// <summary>Returns the hash code of the wrapped endpoint</summary>
+        public override int GetHashCode()
+        {
+            return _addr.GetHashCode();
+        }
+
+        /// <summary>
+        /// Two identifiers are equal when their endpoints are equal.
+        /// </summary>
+        /// <param name="obj">The object to compare with</param>
+        /// <returns>
+        /// True if obj is a SocketRemoteIdentifier with an equal endpoint; false
+        /// otherwise, including when obj is null or of another type
+        /// </returns>
+        public override bool Equals(object obj)
+        {
+            // Equals must return false, not throw, for null and for foreign types
+            SocketRemoteIdentifier other = obj as SocketRemoteIdentifier;
+            return other != null && _addr.Equals(other.Addr);
+        }
+
+        /// <summary>Returns "socket://" followed by the wrapped endpoint</summary>
+        public override string ToString()
+        {
+            return "socket://" + _addr;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs
new file mode 100644
index 0000000..f96caa7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringCodec.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec that converts strings to and from their UTF-8 byte representation.
+    /// For strings made up of ASCII characters only, the bytes are the same as
+    /// the ASCII encoding; other characters are preserved instead of being
+    /// replaced by '?'.
+    /// </summary>
+    public class StringCodec : ICodec<string>
+    {
+        [Inject]
+        public StringCodec()
+        {
+        }
+
+        /// <summary>Encodes a string as UTF-8 bytes</summary>
+        /// <param name="obj">The string to encode</param>
+        /// <returns>The UTF-8 bytes of the string</returns>
+        public byte[] Encode(string obj)
+        {
+            return Encoding.UTF8.GetBytes(obj);
+        }
+
+        /// <summary>Decodes UTF-8 bytes into a string</summary>
+        /// <param name="data">The bytes to decode</param>
+        /// <returns>The decoded string</returns>
+        public string Decode(byte[] data)
+        {
+            return Encoding.UTF8.GetString(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifier.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifier.cs
new file mode 100644
index 0000000..1e7243b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifier.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Identifier backed by a string. Equality, hash code and string form are all
+    /// those of the wrapped string.
+    /// </summary>
+    public class StringIdentifier : IIdentifier
+    {
+        private readonly string _str;
+
+        /// <summary>
+        /// Creates the identifier.
+        /// </summary>
+        /// <param name="s">The string this identifier wraps</param>
+        public StringIdentifier(string s)
+        {
+            _str = s;
+        }
+
+        /// <summary>Returns the hash code of the wrapped string</summary>
+        public override int GetHashCode()
+        {
+            return _str.GetHashCode();
+        }
+
+        /// <summary>
+        /// Compares this identifier with another object.
+        /// </summary>
+        /// <param name="o">The object to compare with</param>
+        /// <returns>True if o is a StringIdentifier wrapping an equal string</returns>
+        public override bool Equals(object o)
+        {
+            StringIdentifier that = o as StringIdentifier;
+            if (that == null)
+            {
+                // null, or not a StringIdentifier
+                return false;
+            }
+
+            return _str.Equals(that._str);
+        }
+
+        /// <summary>Returns the wrapped string</summary>
+        public override string ToString()
+        {
+            return _str;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifierFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifierFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifierFactory.cs
new file mode 100644
index 0000000..12657aa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StringIdentifierFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// IIdentifierFactory that produces StringIdentifier instances.
+    /// </summary>
+    public class StringIdentifierFactory : IIdentifierFactory
+    {
+        [Inject]
+        public StringIdentifierFactory()
+        {
+        }
+
+        /// <summary>Wraps the given string in a new StringIdentifier</summary>
+        /// <param name="s">The string to wrap</param>
+        /// <returns>A new StringIdentifier for s</returns>
+        public IIdentifier Create(string s)
+        {
+            IIdentifier identifier = new StringIdentifier(s);
+            return identifier;
+        }
+    }
+}

Reply via email to