[REEF-422] Convert Wake Layer from Writable to Streaming

This addresses the issue by
  * Using `StreamingCodec` in `WritableLink` instead of `Writable` via
    reflection.
  * Removing the `Writable` constraint on the generic type parameter in
    `WritableLink`, `WritableTransportServer`, `WritableTransportClient`,
    and `WritableRemoteManager`.
  * Introducing `TemporaryWritableToStreamingCodec` so that the Network
    Service layer can still use the `Writable` interface.

`StreamingCodec` is assumed to take only static parameters, if any.
That is, it cannot have parameters that change from one message to
another.

JIRA:
  [REEF-422](https://issues.apache.org/jira/browse/REEF-422)

Pull Request:
  This closes #261


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ba2653d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ba2653d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ba2653d6

Branch: refs/heads/master
Commit: ba2653d6bfac3aabaf640474efbdfcb18c87baf8
Parents: ec9b497
Author: Dhruv <[email protected]>
Authored: Tue Jun 30 11:01:06 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jul 7 13:56:04 2015 -0700

----------------------------------------------------------------------
 .../NetworkService/WritableNetworkService.cs    |   2 +-
 .../Org.Apache.REEF.Wake.Tests.csproj           |   5 +-
 .../PrefixedStringWritable.cs                   | 108 -----
 .../StreamingRemoteManagerTest.cs               | 340 +++++++++++++++
 .../StreamingTransportTest.cs                   | 177 ++++++++
 .../WritableRemoteManagerTest.cs                | 437 -------------------
 .../WritableTransportTest.cs                    | 223 ----------
 .../Org.Apache.REEF.Wake.csproj                 |  15 +-
 .../Remote/IWritableRemoteEvent.cs              |  45 --
 .../Remote/Impl/RemoteEvent.cs                  |   4 +
 .../Remote/Impl/RemoteEventStreamingCodec.cs    |  82 ++++
 .../Remote/Impl/StreamingLink.cs                | 258 +++++++++++
 .../Remote/Impl/StreamingRemoteManager.cs       | 256 +++++++++++
 .../Impl/StreamingRemoteManagerFactory.cs       |  51 +++
 .../Remote/Impl/StreamingTransportClient.cs     | 125 ++++++
 .../Remote/Impl/StreamingTransportServer.cs     | 212 +++++++++
 .../Impl/TemporaryWritableToStreamingCodec.cs   |  70 +++
 .../Remote/Impl/WritableLink.cs                 | 295 -------------
 .../Remote/Impl/WritableObserverContainer.cs    | 132 ------
 .../Remote/Impl/WritableRemoteEvent.cs          | 115 -----
 .../Remote/Impl/WritableRemoteManager.cs        | 286 ------------
 .../Remote/Impl/WritableRemoteManagerFactory.cs |  59 ---
 .../Remote/Impl/WritableTransportClient.cs      | 132 ------
 .../Remote/Impl/WritableTransportServer.cs      | 244 -----------
 24 files changed, 1585 insertions(+), 2088 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
index f383697..7d9d015 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Network.NetworkService
             IObserver<WritableNsMessage<T>> messageHandler,
             IIdentifierFactory idFactory,
             INameClient nameClient,
-            WritableRemoteManagerFactory remoteManagerFactory)
+            StreamingRemoteManagerFactory remoteManagerFactory)
         {
  
             IPAddress localAddress = NetworkUtils.LocalIPAddress;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 581508b..babc26d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -46,12 +46,11 @@ under the License.
   <ItemGroup>
     <Compile Include="ClockTest.cs" />
     <Compile Include="MultiCodecTest.cs" />
-    <Compile Include="PrefixedStringWritable.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="PubSubSubjectTest.cs" />
     <Compile Include="RemoteManagerTest.cs" />
-    <Compile Include="WritableRemoteManagerTest.cs" />
-    <Compile Include="WritableTransportTest.cs" />
+    <Compile Include="StreamingRemoteManagerTest.cs" />
+    <Compile Include="StreamingTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
     <Compile Include="WritableString.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
deleted file mode 100644
index dbb8af3..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    [NamedParameter("identifier in PrefixedWritable")]
-    public class StringId : Name<int>
-    {
-    }
-       
-    /// <summary>
-    /// Writable wrapper around the string class which takes integer prefix
-    /// This class is used to test non empty injector in TransportServer and 
Client
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class PrefixedStringWritable : IWritable
-    {
-        private readonly int _id;
-        private string _data;
-
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data
-        {
-            get { return _data + "_" + _id; }
-            set { _data = value; }
-        }
-
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public PrefixedStringWritable([Parameter(typeof(StringId))] int id)
-        {
-            _id = id;
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public PrefixedStringWritable(string data)
-        {
-            _data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            _data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(_data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
-        {
-            _data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
-        {
-            await writer.WriteStringAsync(_data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
new file mode 100644
index 0000000..20f75be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    public class StreamingRemoteManagerTest
+    {
+        private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
+            
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+        private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
+        
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+        
+        /// <summary>
+        /// Tests one way communication between Remote Managers 
+        /// Remote Manager listens on any available port
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingOneWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests two way communications. Checks whether both sides are able 
to receive messages
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTwoWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new 
BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new 
BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<WritableString>(queue1.Add);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver1.OnNext(new WritableString("ghi"));
+
+                // Remote manager 2 sends 4 events to remote manager 1
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+                remoteObserver2.OnNext(new WritableString("pqr"));
+                remoteObserver2.OnNext(new WritableString("stu"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+            }
+
+            Assert.AreEqual(4, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCommunicationThreeNodesOneWay()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<WritableString>(queue.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                remoteObserver2.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("ghi"));
+                remoteObserver1.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+
+                for (int i = 0; i < 5; i++)
+                {
+                    events.Add(queue.Take().Data);
+                }
+            }
+
+            Assert.AreEqual(5, events.Count);
+        }
+
+        /// <summary>
+        /// Tests two way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCommunicationThreeNodesBothWays()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new 
BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new 
BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue3 = new 
BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+            List<string> events3 = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+                var observer = Observer.Create<WritableString>(queue1.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+                var observer3 = Observer.Create<WritableString>(queue3.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                // Observer 1 and 2 send messages to observer 3
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver2.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("def"));
+
+                // Observer 3 sends messages back to observers 1 and 2
+                var remoteObserver3A = 
remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                var remoteObserver3B = 
remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+            }
+
+            Assert.AreEqual(2, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+            Assert.AreEqual(5, events3.Count);
+        }
+
+        /// <summary>
+        /// Tests whether remote manager is able to send acknowledgement back
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRemoteSenderCallback()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register handler for when remote manager 2 receives events; 
respond
+                // with an ack
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+                var receiverObserver = Observer.Create<WritableString>(
+                    message => remoteObserver2.OnNext(new 
WritableString("received message: " + message.Data)));
+                remoteManager2.RegisterObserver(remoteEndpoint, 
receiverObserver);
+
+                // Register handler for remote manager 1 to record the ack
+                var senderObserver = 
Observer.Create<WritableString>(queue.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, 
senderObserver);
+
+                // Begin to send messages
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("hello"));
+                remoteObserver1.OnNext(new WritableString("there"));
+                remoteObserver1.OnNext(new WritableString("buddy"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("received message: hello", events[0]);
+            Assert.AreEqual("received message: there", events[1]);
+            Assert.AreEqual("received message: buddy", events[2]);
+        }
+        
+        /// <summary>
+        /// Test whether observer can be created with IRemoteMessage interface
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRegisterObserverByType()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // RemoteManager2 listens and records events of type 
IRemoteMessage<WritableString>
+                var observer = 
Observer.Create<IRemoteMessage<WritableString>>(message => 
queue.Add(message.Message));
+                remoteManager2.RegisterObserver(observer);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests whether we get the cached observer back for sending message 
without reinstantiating it
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCachedConnection()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+
+                var cachedObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                cachedObserver.OnNext(new WritableString("ghi"));
+                cachedObserver.OnNext(new WritableString("jkl"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(4, events.Count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
new file mode 100644
index 0000000..268da70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Tests the StreamingTransportServer, StreamingTransportClient and 
StreamingLink.
+    /// Basically the Wake transport layer.
+    /// </summary>
+    [TestClass]
+    public class StreamingTransportTest
+    {
+        private readonly ITcpPortProvider _tcpPortProvider = 
GetTcpProvider(8900, 8940);
+        private readonly IInjector _injector = 
TangFactory.GetTang().NewInjector();
+
+        /// <summary>
+        /// Tests whether StreamingTransportServer receives 
+        /// string messages from StreamingTransportClient
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTransportServer()
+        {
+            BlockingCollection<string> queue = new 
BlockingCollection<string>();
+            List<string> events = new List<string>();
+            IStreamingCodec<string> stringCodec = 
_injector.GetInstance<StringStreamingCodec>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> queue.Add(tEvent.Data));
+
+            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send("World!");
+
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual(events[0], "Hello");
+            Assert.AreEqual(events[1], ", ");
+            Assert.AreEqual(events[2], "World!");
+        }
+
+        /// <summary>
+        /// Checks whether StreamingTransportClient is able to receive 
messages from remote host
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTransportSenderStage()
+        {
+            
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+
+            List<string> events = new List<string>();
+            BlockingCollection<string> queue = new 
BlockingCollection<string>();
+            IStreamingCodec<string> stringCodec = 
_injector.GetInstance<StringStreamingCodec>();
+
+            // Server echoes the message back to the client
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> tEvent.Link.Write(tEvent.Data));
+
+            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                var clientHandler = 
Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send(" World");
+
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual(events[0], "Hello");
+            Assert.AreEqual(events[1], ", ");
+            Assert.AreEqual(events[2], " World");
+        }
+
+        /// <summary>
+        /// Checks whether StreamingTransportClient and 
StreamingTransportServer work 
+        /// correctly when messages are sent asynchronously 
from different 
+        /// threads
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRaceCondition()
+        {
+            BlockingCollection<string> queue = new 
BlockingCollection<string>();
+            List<string> events = new List<string>();
+            IStreamingCodec<string> stringCodec = 
_injector.GetInstance<StringStreamingCodec>();
+            int numEventsExpected = 150;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> queue.Add(tEvent.Data));
+
+            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                for (int i = 0; i < numEventsExpected / 3; i++)
+                {
+                    Task.Run(() =>
+                    {
+                        IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                        using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                        {
+                            client.Send("Hello");
+                            client.Send(", ");
+                            client.Send("World!");
+                        }
+                    });
+                }
+
+                for (int i = 0; i < numEventsExpected; i++)
+                {
+                    events.Add(queue.Take());
+                }
+            }
+
+            Assert.AreEqual(numEventsExpected, events.Count);
+
+        }
+
+        /// <summary>
+        /// Builds an ITcpPortProvider through Tang, configured with the inclusive
+        /// port range [portRangeStart, portRangeEnd].
+        /// </summary>
+        /// <param name="portRangeStart">First port of the range; bound to TcpPortRangeStart</param>
+        /// <param name="portRangeEnd">Last port of the range; TcpPortRangeCount is bound to
+        /// portRangeEnd - portRangeStart + 1</param>
+        /// <returns>The ITcpPortProvider instance resolved from the resulting injector</returns>
+        private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
+        {
+            // Configuration values are parsed back from strings, so format them culture-independently
+            var invariant = System.Globalization.CultureInfo.InvariantCulture;
+            int portRangeCount = portRangeEnd - portRangeStart + 1;
+
+            var tang = TangFactory.GetTang();
+            var configuration = tang.NewConfigurationBuilder()
+                .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+                .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString(invariant))
+                .BindIntNamedParam<TcpPortRangeCount>(portRangeCount.ToString(invariant))
+                .Build();
+            return tang.NewInjector(configuration).GetInstance<ITcpPortProvider>();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
deleted file mode 100644
index 49c0f5b..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Impl;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class WritableRemoteManagerTest
-    {
-        private const int Id = 5;
-
-        private static IConfiguration _config = 
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, 
int>(
-               GenericType<StringId>.Class, 
Id.ToString(CultureInfo.InvariantCulture)).Build();
-
-        private readonly WritableRemoteManagerFactory _remoteManagerFactory1 =
-            
TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
-
-        private readonly WritableRemoteManagerFactory _remoteManagerFactory2 =
-        
TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>();
-        
-        /// <summary>
-        /// Tests one way communication between Remote Managers 
-        /// Remote Manager listens on any available port
-        /// </summary>
-        [TestMethod]
-        public void TestWritableOneWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var observer = Observer.Create<WritableString>(queue.Add);
-                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
-                remoteManager2.RegisterObserver(endpoint1, observer);
-
-                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between Remote Managers 
-        /// Remote manager listens on a particular port
-        /// </summary>
-        [TestMethod]
-        public void TestWritableOneWayCommunicationClientOnly()
-        {
-            int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940);
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>())
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 
listeningPort))
-            {
-                IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 
0);
-                var observer = Observer.Create<WritableString>(queue.Add);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer);
-
-                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests two way communications. Checks whether both sides are able 
to receive messages
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTwoWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue1 = new 
BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new 
BlockingCollection<WritableString>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // Register observers for remote manager 1 and remote manager 2
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<WritableString>(queue1.Add);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver1.OnNext(new WritableString("ghi"));
-
-                // Remote manager 2 sends 4 events to remote manager 1
-                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-                remoteObserver2.OnNext(new WritableString("pqr"));
-                remoteObserver2.OnNext(new WritableString("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-            }
-
-            Assert.AreEqual(4, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-        }
-
-        /// <summary>
-        /// Tests two way communications where message needs an injectable 
argument 
-        /// to be passed. Checks whether both sides are able to receive 
messages
-        /// </summary>
-        [TestMethod]
-        public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");         
   
-
-            BlockingCollection<PrefixedStringWritable> queue1 = new 
BlockingCollection<PrefixedStringWritable>();
-            BlockingCollection<PrefixedStringWritable> queue2 = new 
BlockingCollection<PrefixedStringWritable>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
-            {
-                // Register observers for remote manager 1 and remote manager 2
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = 
Observer.Create<PrefixedStringWritable>(queue1.Add);
-                var observer2 = 
Observer.Create<PrefixedStringWritable>(queue2.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new PrefixedStringWritable("abc"));
-                remoteObserver1.OnNext(new PrefixedStringWritable("def"));
-                remoteObserver1.OnNext(new PrefixedStringWritable("ghi"));
-
-                // Remote manager 2 sends 4 events to remote manager 1
-                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new PrefixedStringWritable("jkl"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("mno"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("pqr"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-            }
-
-            Assert.AreEqual(4, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between 3 nodes.
-        /// nodes 1 and 2 send messages to node 3
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCommunicationThreeNodesOneWay()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer = Observer.Create<WritableString>(queue.Add);
-                remoteManager3.RegisterObserver(remoteEndpoint, observer);
-
-                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
-                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
-                remoteObserver2.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("ghi"));
-                remoteObserver1.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-
-                for (int i = 0; i < 5; i++)
-                {
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(5, events.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between 3 nodes.
-        /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCommunicationThreeNodesBothWays()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue1 = new 
BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new 
BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue3 = new 
BlockingCollection<WritableString>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-            List<string> events3 = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-
-                var observer = Observer.Create<WritableString>(queue1.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-                var observer3 = Observer.Create<WritableString>(queue3.Add);
-                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
-
-                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
-                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
-                // Observer 1 and 2 send messages to observer 3
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver2.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("def"));
-
-                // Observer 3 sends messages back to observers 1 and 2
-                var remoteObserver3A = 
remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                var remoteObserver3B = 
remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
-
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-            }
-
-            Assert.AreEqual(2, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-            Assert.AreEqual(5, events3.Count);
-        }
-
-        /// <summary>
-        /// Tests whether remote manager is able to send acknowledgement back
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRemoteSenderCallback()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // Register handler for when remote manager 2 receives events; 
respond
-                // with an ack
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-
-                var receiverObserver = Observer.Create<WritableString>(
-                    message => remoteObserver2.OnNext(new 
WritableString("received message: " + message.Data)));
-                remoteManager2.RegisterObserver(remoteEndpoint, 
receiverObserver);
-
-                // Register handler for remote manager 1 to record the ack
-                var senderObserver = 
Observer.Create<WritableString>(queue.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, 
senderObserver);
-
-                // Begin to send messages
-                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("hello"));
-                remoteObserver1.OnNext(new WritableString("there"));
-                remoteObserver1.OnNext(new WritableString("buddy"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual("received message: hello", events[0]);
-            Assert.AreEqual("received message: there", events[1]);
-            Assert.AreEqual("received message: buddy", events[2]);
-        }
-        
-        /// <summary>
-        /// Test whether observer can be created with IRemoteMessage interface
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRegisterObserverByType()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // RemoteManager2 listens and records events of type 
IRemoteEvent<WritableString>
-                var observer = 
Observer.Create<IRemoteMessage<WritableString>>(message => 
queue.Add(message.Message));
-                remoteManager2.RegisterObserver(observer);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests whether we get the cached observer back for sending message 
without reinstantiating it
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCachedConnection()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var observer = Observer.Create<WritableString>(queue.Add);
-                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
-                remoteManager2.RegisterObserver(endpoint1, observer);
-
-                var remoteObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-
-                var cachedObserver = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                cachedObserver.OnNext(new WritableString("ghi"));
-                cachedObserver.OnNext(new WritableString("jkl"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(4, events.Count);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
deleted file mode 100644
index 2255cfa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Remote.Parameters;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    /// <summary>
-    /// Tests the WritableTransportServer, WritableTransportClient and 
WritableLink.
-    /// Basically the Wake transport layer.
-    /// </summary>
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class WritableTransportTest
-    {
-        private readonly ITcpPortProvider _tcpPortProvider = 
GetTcpProvider(8900, 8940);
-        private readonly IInjector _injector = 
TangFactory.GetTang().NewInjector();
-
-        /// <summary>
-        /// Tests whether WritableTransportServer receives 
-        /// string messages from WritableTransportClient
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTransportServer()
-        {
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
-
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
-                {
-                    client.Send(new WritableString("Hello"));
-                    client.Send(new WritableString(", "));
-                    client.Send(new WritableString("World!"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                } 
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello");
-            Assert.AreEqual(events[1], ", ");
-            Assert.AreEqual(events[2], "World!");
-        }
-
-
-        /// <summary>
-        /// Tests whether WritableTransportServer receives 
-        /// string messages from WritableTransportClient with non empty 
injector
-        /// </summary>
-        [TestMethod]
-        public void TestNonEmptyInjectionTransportServer()
-        {
-            int id = 5;
-            IConfiguration config = 
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, 
int>(
-                GenericType<StringId>.Class, 
id.ToString(CultureInfo.InvariantCulture)).Build();
-
-            IInjector injector = TangFactory.GetTang().NewInjector(config);
-
-            BlockingCollection<PrefixedStringWritable> queue = new 
BlockingCollection<PrefixedStringWritable>();
-            List<string> events = new List<string>();
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = 
Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent => 
queue.Add(tEvent.Data));
-
-            using (var server = new 
WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler, 
_tcpPortProvider, injector.ForkInjector()))
-            {
-                server.Run();
-
-                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
WritableTransportClient<PrefixedStringWritable>(remoteEndpoint, 
injector.ForkInjector()))
-                {
-                    client.Send(new PrefixedStringWritable("Hello"));
-                    client.Send(new PrefixedStringWritable(", "));
-                    client.Send(new PrefixedStringWritable("World!"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello_" + id);
-            Assert.AreEqual(events[1], ", _" + id);
-            Assert.AreEqual(events[2], "World!_" + id);
-        }
-
-
-        /// <summary>
-        /// Checks whether WritableTransportClient is able to receive messages 
from remote host
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTransportSenderStage()
-        {
-            
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-
-            List<string> events = new List<string>();
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-
-            // Server echoes the message back to the client
-            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
tEvent.Link.Write(tEvent.Data));
-
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                var clientHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
-                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, clientHandler, 
_injector))
-                {
-                    client.Send(new WritableString("Hello"));
-                    client.Send(new WritableString(", "));
-                    client.Send(new WritableString(" World"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                } 
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello");
-            Assert.AreEqual(events[1], ", ");
-            Assert.AreEqual(events[2], " World");
-        }
-
-        /// <summary>
-        /// Checks whether WritableTransportClient and WritableTransportServer 
works 
-        /// in asynchronous condition while sending messages asynchronously 
from different 
-        /// threads
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRaceCondition()
-        {
-            BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-            int numEventsExpected = 150;
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
-
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                for (int i = 0; i < numEventsExpected / 3; i++)
-                {
-                    Task.Run(() =>
-                    {
-                        IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                        using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
-                        {
-                            client.Send(new WritableString("Hello"));
-                            client.Send(new WritableString(", "));
-                            client.Send(new WritableString("World!"));
-                        }
-                    });
-                }
-
-                for (int i = 0; i < numEventsExpected; i++)
-                {
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(numEventsExpected, events.Count);
-
-        }
-
-        private static ITcpPortProvider GetTcpProvider(int portRangeStart, int 
portRangeEnd)
-        {
-            var configuration = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation<ITcpPortProvider, TcpPortProvider>()
-                
.BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
-                .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - 
portRangeStart + 1).ToString())
-                .Build();
-            return 
TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index d2b2970..4069d15 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,9 @@ under the License.
     <Compile Include="IEventHandler.cs" />
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+    <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
+    <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
+    <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
     <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
     <Compile Include="Impl\LoggingEventHandler.cs" />
     <Compile Include="Impl\MissingStartHandlerHandler.cs" />
@@ -71,15 +73,12 @@ under the License.
     <Compile Include="Remote\IDataWriter.cs" />
     <Compile Include="Remote\Impl\StreamDataReader.cs" />
     <Compile Include="Remote\Impl\StreamDataWriter.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
-    <Compile Include="Remote\Impl\WritableLink.cs" />
-    <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
-    <Compile Include="Remote\Impl\WritableTransportClient.cs" />
-    <Compile Include="Remote\Impl\WritableTransportServer.cs" />
+    <Compile Include="Remote\Impl\StreamingRemoteManager.cs" />
+    <Compile Include="Remote\Impl\StreamingLink.cs" />
+    <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
+    <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
     <Compile Include="Remote\IRemoteManagerFactory.cs" />
     <Compile Include="Remote\IWritable.cs" />
-    <Compile Include="Remote\IWritableRemoteEvent.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
deleted file mode 100644
index 40222aa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq.Expressions;
-using System.Net;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
-    /// <summary>
-    /// Interface for remote event
-    /// </summary>
-    /// <typeparam name="T">Type of remote event message. It is assumed that T 
implements IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
-    {
-        /// <summary>
-        /// Local Endpoint
-        /// </summary>
-        IPEndPoint LocalEndPoint { get; set; }
-
-        /// <summary>
-        /// Remote Endpoint
-        /// </summary>
-        IPEndPoint RemoteEndPoint { get; set; }
-
-        T Value { get; }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
index b39b20f..bfed7f9 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using System.Net;
 using Org.Apache.REEF.Tang.Annotations;
 
@@ -50,12 +51,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
         public IPEndPoint RemoteEndPoint { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 
0.13. See [REEF-445]", false)]
         public string Source { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 
0.13. See [REEF-445]", false)]
         public string Sink { get; set; }
 
         public T Value { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 
0.13. See [REEF-445]", false)]
         public long Sequence { get; set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
new file mode 100644
index 0000000..01acd73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Streaming codec for remote events. It delegates the (de)serialization of the
+    /// event payload to the wrapped codec for <typeparamref name="T"/>; only the
+    /// payload (<c>Value</c>) is written to and read from the stream.
+    /// </summary>
+    /// <typeparam name="T">Type of remote event message.</typeparam>
+    internal sealed class RemoteEventStreamingCodec<T> : IStreamingCodec<IRemoteEvent<T>>
+    {
+        private readonly IStreamingCodec<T> _codec;
+
+        /// <summary>
+        /// Creates a codec for remote events on top of the codec for the payload.
+        /// </summary>
+        /// <param name="codec">The codec used to read and write the payload</param>
+        /// <exception cref="System.ArgumentNullException">If codec is null</exception>
+        internal RemoteEventStreamingCodec(IStreamingCodec<T> codec)
+        {
+            // Fail here rather than with a NullReferenceException on the first message.
+            if (codec == null)
+            {
+                throw new System.ArgumentNullException("codec");
+            }
+
+            _codec = codec;
+        }
+
+        /// <summary>
+        /// Reads the payload with the wrapped codec and wraps it in a remote event.
+        /// The first two constructor arguments of the event are passed as null
+        /// (presumably the endpoints, which are not part of the wire format -
+        /// TODO confirm against RemoteEvent).
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <returns>The remote event</returns>
+        public IRemoteEvent<T> Read(IDataReader reader)
+        {
+            return new RemoteEvent<T>(null, null, _codec.Read(reader));
+        }
+
+        /// <summary>
+        /// Writes the payload of the remote event with the wrapped codec.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="System.ArgumentNullException">If value is null</exception>
+        public void Write(IRemoteEvent<T> value, IDataWriter writer)
+        {
+            if (value == null)
+            {
+                throw new System.ArgumentNullException("value");
+            }
+
+            _codec.Write(value.Value, writer);
+        }
+
+        /// <summary>
+        /// Asynchronously reads the payload with the wrapped codec and wraps it
+        /// in a remote event (see <see cref="Read"/>).
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The remote event</returns>
+        public async Task<IRemoteEvent<T>> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            T message = await _codec.ReadAsync(reader, token);
+            return new RemoteEvent<T>(null, null, message);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the payload of the remote event with the wrapped codec.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        /// <exception cref="System.ArgumentNullException">If value is null (reported through the returned task)</exception>
+        public async Task WriteAsync(IRemoteEvent<T> value, IDataWriter writer, CancellationToken token)
+        {
+            if (value == null)
+            {
+                throw new System.ArgumentNullException("value");
+            }
+
+            await _codec.WriteAsync(value.Value, writer, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
new file mode 100644
index 0000000..4396b56
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Represents an open connection between remote hosts. Messages are written to
+    /// and read from the underlying TCP stream with the supplied streaming codec.
+    /// This class is not thread safe.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingLink<T> : ILink<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingLink<T>));
+
+        private readonly IPEndPoint _localEndpoint;
+        private bool _disposed;
+        private readonly IStreamingCodec<T> _streamingCodec;
+        private readonly TcpClient _client;
+
+        /// <summary>
+        /// Stream reader passed to the codec when reading a message
+        /// </summary>
+        private readonly StreamDataReader _reader;
+
+        /// <summary>
+        /// Stream writer passed to the codec when writing a message
+        /// </summary>
+        private readonly StreamDataWriter _writer;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or streamingCodec is null</exception>
+        internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (streamingCodec == null)
+            {
+                // Validate before connecting so that no socket is opened for an unusable link.
+                throw new ArgumentNullException("streamingCodec");
+            }
+
+            _client = new TcpClient();
+            try
+            {
+                _client.Connect(remoteEndpoint);
+            }
+            catch (Exception)
+            {
+                // The link is never handed to the caller, so nobody else can close the socket.
+                _client.Close();
+                throw;
+            }
+
+            var stream = _client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _reader = new StreamDataReader(stream);
+            _writer = new StreamDataWriter(stream);
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        /// <exception cref="ArgumentNullException">If client or streamingCodec is null</exception>
+        internal StreamingLink(TcpClient client, IStreamingCodec<T> streamingCodec)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+            if (streamingCodec == null)
+            {
+                throw new ArgumentNullException("streamingCodec");
+            }
+
+            _client = client;
+            var stream = _client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _reader = new StreamDataReader(stream);
+            _writer = new StreamDataWriter(stream);
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _localEndpoint; }
+        }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get { return (IPEndPoint) _client.Client.RemoteEndPoint; }
+        }
+
+        /// <summary>
+        /// Writes the message to the remote host
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <exception cref="ArgumentNullException">If value is null</exception>
+        public void Write(T value)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            _streamingCodec.Write(value, _writer);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        /// <exception cref="ArgumentNullException">If value is null (reported through the returned task)</exception>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            // Same argument validation as the synchronous Write.
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            await _streamingCodec.WriteAsync(value, _writer, token);
+        }
+
+        /// <summary>
+        /// Reads the value from the link synchronously
+        /// </summary>
+        /// <returns>The message decoded by the streaming codec</returns>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            try
+            {
+                T value = _streamingCodec.Read(_reader);
+                return value;
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "In Read function unable to read the message.");
+                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Reads the value from the link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The message decoded by the streaming codec</returns>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            try
+            {
+                T value = await _streamingCodec.ReadAsync(_reader, token);
+                return value;
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "In ReadAsync function unable to read the message.");
+                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Close the client connection. Calling it again after a successful
+        /// close is a no-op.
+        /// </summary>
+        public void Dispose()
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            try
+            {
+                _client.GetStream().Close();
+            }
+            catch (InvalidOperationException)
+            {
+                Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+            }
+
+            _client.Close();
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two StreamingLink objects are equal if they are connected
+        /// to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is equal to this Link, otherwise false</returns>
+        public override bool Equals(object obj)
+        {
+            // A link is always equal to itself, even when its endpoint can no longer be queried.
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            // Compare against this type: a StreamingLink is not a Link<T>, so casting
+            // to Link<T> would make every comparison between streaming links fail.
+            StreamingLink<T> other = obj as StreamingLink<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the hash code for the Link object. It is derived from the remote
+        /// endpoint, the same value Equals compares.
+        /// </summary>
+        /// <returns>The object's hash code</returns>
+        public override int GetHashCode()
+        {
+            return RemoteEndpoint.GetHashCode();
+        }
+
+        /// <summary>
+        /// Builds the local IPEndpoint from the address reported by NetworkUtils
+        /// and the local port of the client socket.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress address = NetworkUtils.LocalIPAddress;
+            int port = ((IPEndPoint) _client.Client.LocalEndPoint).Port;
+            return new IPEndPoint(address, port);
+        }
+    }
+}
\ No newline at end of file

Reply via email to