Repository: incubator-reef
Updated Branches:
  refs/heads/master c1b7b0528 -> af4a19807


[REEF-320] Change reflection to tang instantiation in Wake Layer for Writables

This addressed the issue by
  * Taking Iinjector as argument in RemoteManager and Transport Layer
  * Instantiating messages by tang injection than by reflection

JIRA:
  [REEF-320](https://issues.apache.org/jira/browse/REEF-320)

Pull Request:
  This closes #181


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/af4a1980
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/af4a1980
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/af4a1980

Branch: refs/heads/master
Commit: af4a19807f7b15368718c5b449f3dcfebf890207
Parents: c1b7b05
Author: dkm2110 <[email protected]>
Authored: Tue May 12 14:21:17 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed May 13 16:53:24 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Wake.Tests.csproj           |   1 +
 .../PrefixedStringWritable.cs                   | 108 +++++++++++++++++++
 .../WritableRemoteManagerTest.cs                | 100 +++++++++++++----
 .../WritableString.cs                           |   2 +
 .../WritableTransportTest.cs                    |  60 +++++++++--
 .../Org.Apache.REEF.Wake.csproj                 |   1 -
 .../Remote/Impl/RemoteEvent.cs                  |   2 +
 .../Remote/Impl/WritableLink.cs                 |  44 ++++----
 .../Remote/Impl/WritableRemoteEvent.cs          |  12 ++-
 .../Remote/Impl/WritableRemoteManager.cs        |  17 ++-
 .../Remote/Impl/WritableRemoteManagerFactory.cs |  16 +--
 .../Remote/Impl/WritableTransportClient.cs      |  12 ++-
 .../Remote/Impl/WritableTransportServer.cs      |  23 ++--
 .../cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs | 104 ------------------
 14 files changed, 321 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index f947422..581508b 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -46,6 +46,7 @@ under the License.
   <ItemGroup>
     <Compile Include="ClockTest.cs" />
     <Compile Include="MultiCodecTest.cs" />
+    <Compile Include="PrefixedStringWritable.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="PubSubSubjectTest.cs" />
     <Compile Include="RemoteManagerTest.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
new file mode 100644
index 0000000..dbb8af3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [NamedParameter("identifier in PrefixedWritable")]
+    public class StringId : Name<int>
+    {
+    }
+       
+    /// <summary>
+    /// Writable wrapper around the string class which takes integer prefix
+    /// This class is used to test non empty injector in TransportServer and 
Client
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
+    public class PrefixedStringWritable : IWritable
+    {
+        private readonly int _id;
+        private string _data;
+
+        /// <summary>
+        /// Returns the actual string data
+        /// </summary>
+        public string Data
+        {
+            get { return _data + "_" + _id; }
+            set { _data = value; }
+        }
+
+        /// <summary>
+        /// Empty constructor for instantiation with reflection
+        /// </summary>
+        [Inject]
+        public PrefixedStringWritable([Parameter(typeof(StringId))] int id)
+        {
+            _id = id;
+        }
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="data">The string data</param>
+        public PrefixedStringWritable(string data)
+        {
+            _data = data;
+        }
+
+        /// <summary>
+        /// Reads the string
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        public void Read(IDataReader reader)
+        {
+            _data = reader.ReadString();
+        }
+
+        /// <summary>
+        /// Writes the string
+        /// </summary>
+        /// <param name="writer">Writer to write</param>
+        public void Write(IDataWriter writer)
+        {
+            writer.WriteString(_data);
+        }
+
+        /// <summary>
+        /// Reads the string
+        /// </summary>
+        /// <param name="reader">reader to read from</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
+        {
+            _data = await reader.ReadStringAsync(token);
+        }
+
+        /// <summary>
+        /// Writes the string
+        /// </summary>
+        /// <param name="writer">Writer to write</param>
+        /// <param name="token">the cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
+        {
+            await writer.WriteStringAsync(_data, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
index 80bb78b..49c0f5b 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
@@ -20,12 +20,15 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Globalization;
 using System.IO;
 using System.Net;
 using System.Reactive;
 using System.Threading.Tasks;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Impl;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
@@ -37,8 +40,16 @@ namespace Org.Apache.REEF.Wake.Tests
     [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
     public class WritableRemoteManagerTest
     {
-        private readonly WritableRemoteManagerFactory _remoteManagerFactory =
+        private const int Id = 5;
+
+        private static IConfiguration _config = 
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, 
int>(
+               GenericType<StringId>.Class, 
Id.ToString(CultureInfo.InvariantCulture)).Build();
+
+        private readonly WritableRemoteManagerFactory _remoteManagerFactory1 =
             
TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
+
+        private readonly WritableRemoteManagerFactory _remoteManagerFactory2 =
+        
TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>();
         
         /// <summary>
         /// Tests one way communication between Remote Managers 
@@ -52,8 +63,8 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 var observer = Observer.Create<WritableString>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
@@ -85,8 +96,8 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>())
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 
listeningPort))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>())
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 
listeningPort))
             {
                 IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 
0);
                 var observer = Observer.Create<WritableString>(queue.Add);
@@ -118,8 +129,8 @@ namespace Org.Apache.REEF.Wake.Tests
             List<string> events1 = new List<string>();
             List<string> events2 = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 // Register observers for remote manager 1 and remote manager 2
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
@@ -156,6 +167,57 @@ namespace Org.Apache.REEF.Wake.Tests
         }
 
         /// <summary>
+        /// Tests two way communications where message needs an injectable 
argument 
+        /// to be passed. Checks whether both sides are able to receive 
messages
+        /// </summary>
+        [TestMethod]
+        public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");         
   
+
+            BlockingCollection<PrefixedStringWritable> queue1 = new 
BlockingCollection<PrefixedStringWritable>();
+            BlockingCollection<PrefixedStringWritable> queue2 = new 
BlockingCollection<PrefixedStringWritable>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = 
Observer.Create<PrefixedStringWritable>(queue1.Add);
+                var observer2 = 
Observer.Create<PrefixedStringWritable>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new PrefixedStringWritable("abc"));
+                remoteObserver1.OnNext(new PrefixedStringWritable("def"));
+                remoteObserver1.OnNext(new PrefixedStringWritable("ghi"));
+
+                // Remote manager 2 sends 4 events to remote manager 1
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(new PrefixedStringWritable("jkl"));
+                remoteObserver2.OnNext(new PrefixedStringWritable("mno"));
+                remoteObserver2.OnNext(new PrefixedStringWritable("pqr"));
+                remoteObserver2.OnNext(new PrefixedStringWritable("stu"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+            }
+
+            Assert.AreEqual(4, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+        }
+
+        /// <summary>
         /// Tests one way communication between 3 nodes.
         /// nodes 1 and 2 send messages to node 3
         /// </summary>
@@ -167,9 +229,9 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
                 var observer = Observer.Create<WritableString>(queue.Add);
@@ -209,9 +271,9 @@ namespace Org.Apache.REEF.Wake.Tests
             List<string> events2 = new List<string>();
             List<string> events3 = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
 
@@ -272,8 +334,8 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 // Register handler for when remote manager 2 receives events; 
respond
                 // with an ack
@@ -316,8 +378,8 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 // RemoteManager2 listens and records events of type 
IRemoteEvent<WritableString>
                 var observer = 
Observer.Create<IRemoteMessage<WritableString>>(message => 
queue.Add(message.Message));
@@ -348,8 +410,8 @@ namespace Org.Apache.REEF.Wake.Tests
             BlockingCollection<WritableString> queue = new 
BlockingCollection<WritableString>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = 
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
             {
                 var observer = Observer.Create<WritableString>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
index 8a6d041..30ff487 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
@@ -21,6 +21,7 @@ using System;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Wake.Remote;
 
 namespace Org.Apache.REEF.Wake.Tests
@@ -39,6 +40,7 @@ namespace Org.Apache.REEF.Wake.Tests
         /// <summary>
         /// Empty constructor for instantiation with reflection
         /// </summary>
+        [Inject]
         public WritableString()
         {
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
index 914a2aa..2255cfa 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
@@ -20,6 +20,7 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Globalization;
 using System.IO;
 using System.Net;
 using System.Reactive;
@@ -28,6 +29,7 @@ using System.Threading.Tasks;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Wake.Remote.Parameters;
@@ -44,6 +46,7 @@ namespace Org.Apache.REEF.Wake.Tests
     public class WritableTransportTest
     {
         private readonly ITcpPortProvider _tcpPortProvider = 
GetTcpProvider(8900, 8940);
+        private readonly IInjector _injector = 
TangFactory.GetTang().NewInjector();
 
         /// <summary>
         /// Tests whether WritableTransportServer receives 
@@ -58,12 +61,12 @@ namespace Org.Apache.REEF.Wake.Tests
             IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
             var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
 
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider))
+            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
             {
                 server.Run();
 
                 IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint))
+                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
                 {
                     client.Send(new WritableString("Hello"));
                     client.Send(new WritableString(", "));
@@ -81,7 +84,50 @@ namespace Org.Apache.REEF.Wake.Tests
             Assert.AreEqual(events[2], "World!");
         }
 
-       
+
+        /// <summary>
+        /// Tests whether WritableTransportServer receives 
+        /// string messages from WritableTransportClient with non empty 
injector
+        /// </summary>
+        [TestMethod]
+        public void TestNonEmptyInjectionTransportServer()
+        {
+            int id = 5;
+            IConfiguration config = 
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, 
int>(
+                GenericType<StringId>.Class, 
id.ToString(CultureInfo.InvariantCulture)).Build();
+
+            IInjector injector = TangFactory.GetTang().NewInjector(config);
+
+            BlockingCollection<PrefixedStringWritable> queue = new 
BlockingCollection<PrefixedStringWritable>();
+            List<string> events = new List<string>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+            var remoteHandler = 
Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent => 
queue.Add(tEvent.Data));
+
+            using (var server = new 
WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler, 
_tcpPortProvider, injector.ForkInjector()))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                using (var client = new 
WritableTransportClient<PrefixedStringWritable>(remoteEndpoint, 
injector.ForkInjector()))
+                {
+                    client.Send(new PrefixedStringWritable("Hello"));
+                    client.Send(new PrefixedStringWritable(", "));
+                    client.Send(new PrefixedStringWritable("World!"));
+
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take().Data);
+                }
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual(events[0], "Hello_" + id);
+            Assert.AreEqual(events[1], ", _" + id);
+            Assert.AreEqual(events[2], "World!_" + id);
+        }
+
+
         /// <summary>
         /// Checks whether WritableTransportClient is able to receive messages 
from remote host
         /// </summary>
@@ -97,13 +143,13 @@ namespace Org.Apache.REEF.Wake.Tests
             // Server echoes the message back to the client
             var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
tEvent.Link.Write(tEvent.Data));
 
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider))
+            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
             {
                 server.Run();
 
                 var clientHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
                 IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, clientHandler))
+                using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, clientHandler, 
_injector))
                 {
                     client.Send(new WritableString("Hello"));
                     client.Send(new WritableString(", "));
@@ -136,7 +182,7 @@ namespace Org.Apache.REEF.Wake.Tests
             IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
             var remoteHandler = 
Observer.Create<TransportEvent<WritableString>>(tEvent => 
queue.Add(tEvent.Data));
 
-            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider))
+            using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, 
_tcpPortProvider, _injector))
             {
                 server.Run();
 
@@ -145,7 +191,7 @@ namespace Org.Apache.REEF.Wake.Tests
                     Task.Run(() =>
                     {
                         IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                        using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint))
+                        using (var client = new 
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
                         {
                             client.Send(new WritableString("Hello"));
                             client.Send(new WritableString(", "));

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 53ffd65..170c967 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -120,7 +120,6 @@ under the License.
     <Compile Include="Remote\RemoteConfiguration.cs" />
     <Compile Include="Remote\RemoteRuntimeException.cs" />
     <Compile Include="Remote\TcpPortProvider.cs" />
-    <Compile Include="Remote\TypeCache.cs" />
     <Compile Include="RX\AbstractObserver.cs" />
     <Compile Include="RX\AbstractRxStage.cs" />
     <Compile Include="RX\Impl\PubSubSubject.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
index bf50325..b39b20f 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -18,6 +18,7 @@
  */
 
 using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Wake.Remote.Impl
 {
@@ -40,6 +41,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             Value = value;
         }
 
+        [Inject]
         public RemoteEvent()
         {
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
index f867240..9859338 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
@@ -23,6 +23,7 @@ using System.Net.Sockets;
 using System.Threading;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Util;
@@ -41,11 +42,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         private readonly IPEndPoint _localEndpoint;
         private bool _disposed;
         private readonly NetworkStream _stream;
-        
-        /// <summary>
-        /// Cache structure to store the constructor functions for various 
types.
-        /// </summary>
-        private readonly TypeCache<T> _cache;
+        private readonly IInjector _injector;
+       
 
         /// <summary>
         /// Stream reader to be passed to IWritable
@@ -62,7 +60,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// Connects to the specified remote endpoint.
         /// </summary>
         /// <param name="remoteEndpoint">The remote endpoint to connect 
to</param>
-        public WritableLink(IPEndPoint remoteEndpoint)
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
+        public WritableLink(IPEndPoint remoteEndpoint, IInjector injector)
         {
             if (remoteEndpoint == null)
             {
@@ -75,9 +74,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             _stream = Client.GetStream();
             _localEndpoint = GetLocalEndpoint();
             _disposed = false;
-            _cache = new TypeCache<T>();
             _reader = new StreamDataReader(_stream);
             _writer = new StreamDataWriter(_stream);
+            _injector = injector;
         }
 
         /// <summary>
@@ -85,7 +84,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// Uses the already connected TcpClient.
         /// </summary>
         /// <param name="client">The already connected client</param>
-        public WritableLink(TcpClient client)
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
+        public WritableLink(TcpClient client, IInjector injector)
         {
             if (client == null)
             {
@@ -96,9 +96,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             _stream = Client.GetStream();
             _localEndpoint = GetLocalEndpoint();
             _disposed = false;
-            _cache = new TypeCache<T>();
             _reader = new StreamDataReader(_stream);
             _writer = new StreamDataWriter(_stream);
+            _injector = injector;
         }
 
         /// <summary>
@@ -174,15 +174,16 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 return default(T);
             }
 
-            T value = _cache.GetInstance(dataType);
-
-            if (value == null)
+            try
+            {
+                T value = (T) _injector.ForkInjector().GetInstance(dataType);
+                value.Read(_reader);
+                return value;
+            }
+            catch (InjectionException)
             {
                 return default(T);
             }
-            
-            value.Read(_reader);
-            return value;
         }
 
         /// <summary>
@@ -205,15 +206,16 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 return default(T);
             }
 
-            T value = _cache.GetInstance(dataType);
-
-            if(value==null)
+            try
+            {
+                T value = (T) _injector.ForkInjector().GetInstance(dataType);
+                await value.ReadAsync(_reader, token);
+                return value;
+            }
+            catch (InjectionException)
             {
                 return default(T);
             }
-
-            await value.ReadAsync(_reader, token);
-            return value;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
index b9702d6..b3664d0 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
@@ -21,6 +21,8 @@ using System;
 using System.Net;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Wake.Remote.Impl
 {
@@ -31,6 +33,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
     [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
     internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> 
where T : IWritable
     {
+        private readonly IInjector _injector;
+
         /// <summary>
         /// Creates the Remote Event
         /// </summary>
@@ -47,8 +51,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <summary>
         /// Creates empty Remote Event
         /// </summary>
-        public WritableRemoteEvent()
+        [Inject]
+        public WritableRemoteEvent(IInjector injector)
         {
+            _injector = injector;
         }
 
         /// <summary>
@@ -72,7 +78,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="reader">The reader from which to read </param>
         public void Read(IDataReader reader)
         {
-            Value = Activator.CreateInstance<T>();
+            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
             Value.Read(reader);         
         }
 
@@ -92,7 +98,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="token">The cancellation token</param>
         public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
         {
-            Value = Activator.CreateInstance<T>();
+            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
             await Value.ReadAsync(reader, token);      
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
index 0a9ead3..73d8bb6 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
@@ -20,6 +20,7 @@
 using System;
 using System.Collections.Generic;
 using System.Net;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Util;
 
@@ -37,6 +38,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         private readonly WritableObserverContainer<T> _observerContainer;
         private readonly WritableTransportServer<IWritableRemoteEvent<T>> 
_server;
         private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private readonly IInjector _injector;
 
         /// <summary>
         /// Constructs a DefaultRemoteManager listening on the specified 
address and
@@ -44,8 +46,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="localAddress">The address to listen on</param>
         /// <param name="port">The port to listen on</param>
+        /// <param name="tcpPortProvider">Tcp port provider</param>
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
         [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager(IPAddress localAddress, int port, 
ITcpPortProvider tcpPortProvider)
+        public WritableRemoteManager(IPAddress localAddress, int port, 
ITcpPortProvider tcpPortProvider, IInjector injector)
         {
             if (localAddress == null)
             {
@@ -58,11 +62,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
             _observerContainer = new WritableObserverContainer<T>();
             _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+            _injector = injector;
 
             IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
 
             // Begin to listen for incoming messages
-            _server = new 
WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, 
_observerContainer, tcpPortProvider);
+            _server = new 
WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, 
_observerContainer, tcpPortProvider, injector);
             _server.Run();
 
             LocalEndpoint = _server.LocalEndpoint;  
@@ -72,13 +77,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <summary>
         /// Constructs a DefaultRemoteManager. Does not listen for incoming 
messages.
         /// </summary>
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
         [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager()
+        public WritableRemoteManager(IInjector injector)
         {
             using 
(LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
             {
                 _observerContainer = new WritableObserverContainer<T>();
                 _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+                _injector = injector;
 
                 LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
                 Identifier = new SocketRemoteIdentifier(LocalEndpoint);
@@ -134,7 +141,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             if (!_cachedClients.TryGetValue(remoteEndpoint, out 
remoteObserver))
             {
                 WritableTransportClient<IWritableRemoteEvent<T>> client =
-                    new 
WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, 
_observerContainer);
+                    new 
WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, 
_observerContainer, _injector);
 
                 remoteObserver = new ProxyObserver(client);
                 _cachedClients[remoteEndpoint] = remoteObserver;
@@ -210,7 +217,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
             return _observerContainer.RegisterObserver(observer);
         }
-
+        
         /// <summary>
         /// Release all resources for the DefaultRemoteManager.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
index 4beb844..52fef8d 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
@@ -20,10 +20,9 @@
 using System;
 using System.Net;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Tang.Interface;
 
-namespace Org.Apache.REEF.Wake.Impl
+namespace Org.Apache.REEF.Wake.Remote.Impl
 {
     /// <summary>
     /// WritableRemoteManagerFactory for WritableRemoteManager.
@@ -32,17 +31,20 @@ namespace Org.Apache.REEF.Wake.Impl
     public sealed class WritableRemoteManagerFactory
     {
         private readonly ITcpPortProvider _tcpPortProvider;
-        [Inject]
-        private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider)
+        private readonly IInjector _injector;
+
+        [Inject]  
+        private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, 
IInjector injector)
         {
             _tcpPortProvider = tcpPortProvider;
+            _injector = injector;
         }
 
         public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int 
port) where T : IWritable
         {
 #pragma warning disable 618
 // This is the one place allowed to call this constructor. Hence, disabling 
the warning is OK.
-            return new WritableRemoteManager<T>(localAddress, port, 
_tcpPortProvider);
+            return new WritableRemoteManager<T>(localAddress, port, 
_tcpPortProvider, _injector);
 #pragma warning disable 618
         }
 
@@ -50,7 +52,7 @@ namespace Org.Apache.REEF.Wake.Impl
         {
 #pragma warning disable 618
 // This is the one place allowed to call this constructor. Hence, disabling 
the warning is OK.
-            return new WritableRemoteManager<T>();
+            return new WritableRemoteManager<T>(_injector);
 #pragma warning disable 618
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
index 5e42fb7..b245f0f 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
@@ -21,6 +21,7 @@ using System;
 using System.Net;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
@@ -44,11 +45,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// Used to send messages to the specified remote endpoint.
         /// </summary>
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
-        public WritableTransportClient(IPEndPoint remoteEndpoint)
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
+        public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector 
injector)
         {
             Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", 
Logger);
 
-            _link = new WritableLink<T>(remoteEndpoint);
+            _link = new WritableLink<T>(remoteEndpoint, injector);
             _cancellationSource = new CancellationTokenSource();
             _disposed = false;
         }
@@ -59,9 +61,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
         /// <param name="observer">Callback used when receiving responses from 
remote host</param>
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
         public WritableTransportClient(IPEndPoint remoteEndpoint,
-            IObserver<TransportEvent<T>> observer)
-            : this(remoteEndpoint)
+            IObserver<TransportEvent<T>> observer,
+            IInjector injector)
+            : this(remoteEndpoint, injector)
         {
             _observer = observer;
             Task.Run(() => ResponseLoop());

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
index 90cfdd7..6b5961f 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
@@ -18,10 +18,13 @@
  */
 
 using System;
+using System.Collections.Generic;
+using System.IO;
 using System.Net;
 using System.Net.Sockets;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Util;
@@ -41,9 +44,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         private readonly CancellationTokenSource _cancellationSource;
         private readonly IObserver<TransportEvent<T>> _remoteObserver;
         private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IInjector _injector;
         private bool _disposed;
         private Task _serverTask;
-
         /// <summary>
         /// Constructs a TransportServer to listen for remote events.  
         /// Listens on the specified remote endpoint.  When it recieves a 
remote
@@ -53,8 +56,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="remoteHandler">The handler to invoke when receiving 
incoming
         /// remote messages</param>
         /// <param name="tcpPortProvider">Find port numbers if listenport is 
0</param>
-        public WritableTransportServer(int port, IObserver<TransportEvent<T>> 
remoteHandler, ITcpPortProvider tcpPortProvider)
-            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), 
remoteHandler, tcpPortProvider)
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
+        public WritableTransportServer(int port, IObserver<TransportEvent<T>> 
remoteHandler, ITcpPortProvider tcpPortProvider, IInjector injector)
+            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), 
remoteHandler, tcpPortProvider, injector)
         {
         }
 
@@ -67,16 +71,19 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="remoteHandler">The handler to invoke when receiving 
incoming
         /// remote messages</param>
         /// <param name="tcpPortProvider">Find port numbers if listenport is 
0</param>
+        /// <param name="injector">The injector to pass arguments to incoming 
messages</param>
         public WritableTransportServer(
             IPEndPoint localEndpoint,
             IObserver<TransportEvent<T>> remoteHandler,
-            ITcpPortProvider tcpPortProvider)
+            ITcpPortProvider tcpPortProvider,
+            IInjector injector)
         {
             _listener = new TcpListener(localEndpoint.Address, 
localEndpoint.Port);
             _remoteObserver = remoteHandler;
             _tcpPortProvider = tcpPortProvider;
             _cancellationSource = new CancellationTokenSource();
             _cancellationSource.Token.ThrowIfCancellationRequested();
+            _injector = injector;
             _disposed = false;
         }
 
@@ -212,25 +219,21 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="client">The connected client</param>
         private async Task ProcessClient(TcpClient client)
         {
+            
             // Keep reading messages from client until they disconnect or 
timeout
             CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new WritableLink<T>(client))
+            using (ILink<T> link = new WritableLink<T>(client, _injector))
             {
                 while (!token.IsCancellationRequested)
                 {
-                    //T message = link.Read();
                     T message = await link.ReadAsync(token);
 
                     if (message == null)
                     {
-                        //LOGGER.Log(Level.Error,
-                   //         "ProcessClient, no message received, break." + 
link.RemoteEndpoint + " - " +
-                      //      link.LocalEndpoint);
                         break;
                     }
 
                     TransportEvent<T> transportEvent = new 
TransportEvent<T>(message, link);
-
                     _remoteObserver.OnNext(transportEvent);
                 }
                 LOGGER.Log(Level.Error,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
deleted file mode 100644
index 9bbe549..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Linq.Expressions;
-using System.Reflection;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
-    /// <summary>
-    /// Cache used to store the constructor functions to instantiate various 
Types.
-    /// It is assumed that all types are inherited from the base type T
-    /// </summary>
-    /// <typeparam name="T"></typeparam>
-    public class TypeCache<T>
-    {
-        private const BindingFlags ConstructorFlags =
-            BindingFlags.Public | BindingFlags.NonPublic | 
BindingFlags.Instance;
-
-        /// <summary>
-        /// Cache that stores the constructors for already used types using 
the assmebly name
-        /// </summary>
-        private readonly Dictionary<string, Func<T>> _typeConstructorMapping = 
new Dictionary<string, Func<T>>();
-
-        public T GetInstance(string typeString)
-        {
-            if (!_typeConstructorMapping.ContainsKey(typeString))
-            {
-                var type = Type.GetType(typeString);
-
-                if (type != null)
-                {
-                    _typeConstructorMapping[typeString] = GetActivator(type);
-                }
-            }
-
-            return _typeConstructorMapping[typeString]();
-        }
-
-        /// <summary>
-        /// Returns the constructor for type T given actual type. Type can be
-        /// that of inherited class.
-        /// <param name="actualType">The actual type for which we want to 
create the constructor.</param>
-        /// <returns>The constructor function</returns>
-        /// </summary>
-        private Func<T> GetActivator(Type actualType)
-        {
-            ConstructorInfo constructor;
-            if (actualType.IsValueType)
-            {
-                // For struct types, there is an implicit default constructor.
-                constructor = null;
-            }
-            else if (!TryGetDefaultConstructor(actualType, out constructor))
-            {
-                throw new Exception("could not get default constructor");
-            }
-            NewExpression nex = constructor == null ? 
Expression.New(actualType) : Expression.New(constructor);
-            var body = Expression.Convert(nex, typeof (T));
-            Expression<Func<T>> lambda = Expression.Lambda<Func<T>>(body);
-
-            return lambda.Compile();
-        }
-
-        /// <summary>
-        /// Fills the constructor information and meta-data
-        /// </summary>
-        /// <param name="type">The type for which constructor needs to be 
created</param>
-        /// <param name="constructor">The information and meta data for the 
constructor creation</param>
-        /// <returns></returns>
-        private bool TryGetDefaultConstructor(Type type, out ConstructorInfo 
constructor)
-        {
-            // first, determine if there is a suitable constructor
-            if (type.IsAbstract || type.IsInterface)
-            {
-                constructor = null;
-                return false;
-            }
-
-            constructor = type.GetConstructor(ConstructorFlags, null, 
Type.EmptyTypes, null);
-            return null != constructor;
-        }
-    }
-}

Reply via email to