Repository: incubator-reef Updated Branches: refs/heads/master c1b7b0528 -> af4a19807
[REEF-320] Change reflection to tang instantiation in Wake Layer for Writables This addressed the issue by * Taking Iinjector as argument in RemoteManager and Transport Layer * Instantiating messages by tang injection than by reflection JIRA: [REEF-320](https://issues.apache.org/jira/browse/REEF-320) Pull Request: This closes #181 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/af4a1980 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/af4a1980 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/af4a1980 Branch: refs/heads/master Commit: af4a19807f7b15368718c5b449f3dcfebf890207 Parents: c1b7b05 Author: dkm2110 <[email protected]> Authored: Tue May 12 14:21:17 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed May 13 16:53:24 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Wake.Tests.csproj | 1 + .../PrefixedStringWritable.cs | 108 +++++++++++++++++++ .../WritableRemoteManagerTest.cs | 100 +++++++++++++---- .../WritableString.cs | 2 + .../WritableTransportTest.cs | 60 +++++++++-- .../Org.Apache.REEF.Wake.csproj | 1 - .../Remote/Impl/RemoteEvent.cs | 2 + .../Remote/Impl/WritableLink.cs | 44 ++++---- .../Remote/Impl/WritableRemoteEvent.cs | 12 ++- .../Remote/Impl/WritableRemoteManager.cs | 17 ++- .../Remote/Impl/WritableRemoteManagerFactory.cs | 16 +-- .../Remote/Impl/WritableTransportClient.cs | 12 ++- .../Remote/Impl/WritableTransportServer.cs | 23 ++-- .../cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs | 104 ------------------ 14 files changed, 321 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index f947422..581508b 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -46,6 +46,7 @@ under the License. <ItemGroup> <Compile Include="ClockTest.cs" /> <Compile Include="MultiCodecTest.cs" /> + <Compile Include="PrefixedStringWritable.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="PubSubSubjectTest.cs" /> <Compile Include="RemoteManagerTest.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs new file mode 100644 index 0000000..dbb8af3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Wake.Tests +{ + [NamedParameter("identifier in PrefixedWritable")] + public class StringId : Name<int> + { + } + + /// <summary> + /// Writable wrapper around the string class which takes integer prefix + /// This class is used to test non empty injector in TransportServer and Client + /// </summary> + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + public class PrefixedStringWritable : IWritable + { + private readonly int _id; + private string _data; + + /// <summary> + /// Returns the actual string data + /// </summary> + public string Data + { + get { return _data + "_" + _id; } + set { _data = value; } + } + + /// <summary> + /// Empty constructor for instantiation with reflection + /// </summary> + [Inject] + public PrefixedStringWritable([Parameter(typeof(StringId))] int id) + { + _id = id; + } + + /// <summary> + /// Constructor + /// </summary> + /// <param name="data">The string data</param> + public PrefixedStringWritable(string data) + { + _data = data; + } + + /// <summary> + /// Reads the string + /// </summary> + /// <param name="reader">reader to read from</param> + public void Read(IDataReader reader) + { + _data = reader.ReadString(); + } + + /// <summary> + /// Writes the string + /// </summary> + /// <param name="writer">Writer to write</param> + public void Write(IDataWriter writer) + { + writer.WriteString(_data); + } + + /// <summary> + /// Reads the string + /// </summary> + /// <param name="reader">reader to read from</param> + /// <param name="token">the cancellation token</param> + public async Task ReadAsync(IDataReader reader, CancellationToken token) + { + _data = await reader.ReadStringAsync(token); + } + + /// <summary> + /// Writes the string + /// </summary> + /// <param name="writer">Writer to write</param> + /// <param name="token">the cancellation token</param> + public async Task WriteAsync(IDataWriter writer, CancellationToken token) + { + await writer.WriteStringAsync(_data, token); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs index 80bb78b..49c0f5b 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs @@ -20,12 +20,15 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Globalization; using System.IO; using System.Net; using System.Reactive; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Impl; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; @@ -37,8 +40,16 @@ namespace Org.Apache.REEF.Wake.Tests [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] public class WritableRemoteManagerTest { - private readonly WritableRemoteManagerFactory _remoteManagerFactory = + private const int Id = 5; + + private static IConfiguration _config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>( + GenericType<StringId>.Class, Id.ToString(CultureInfo.InvariantCulture)).Build(); + + private readonly WritableRemoteManagerFactory _remoteManagerFactory1 = TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>(); + + private readonly WritableRemoteManagerFactory _remoteManagerFactory2 = + TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>(); /// <summary> /// Tests one way communication between Remote Managers @@ -52,8 +63,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { var observer = Observer.Create<WritableString>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); @@ -85,8 +96,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>()) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, listeningPort)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>()) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, listeningPort)) { IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0); var observer = Observer.Create<WritableString>(queue.Add); @@ -118,8 +129,8 @@ namespace Org.Apache.REEF.Wake.Tests List<string> events1 = new List<string>(); List<string> events2 = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { // Register observers for remote manager 1 and remote manager 2 var remoteEndpoint = new IPEndPoint(listeningAddress, 0); @@ -156,6 +167,57 @@ namespace Org.Apache.REEF.Wake.Tests } /// <summary> + /// Tests two way communications where message needs an injectable argument + /// to be passed. Checks whether both sides are able to receive messages + /// </summary> + [TestMethod] + public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<PrefixedStringWritable> queue1 = new BlockingCollection<PrefixedStringWritable>(); + BlockingCollection<PrefixedStringWritable> queue2 = new BlockingCollection<PrefixedStringWritable>(); + List<string> events1 = new List<string>(); + List<string> events2 = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0)) + { + // Register observers for remote manager 1 and remote manager 2 + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer1 = Observer.Create<PrefixedStringWritable>(queue1.Add); + var observer2 = Observer.Create<PrefixedStringWritable>(queue2.Add); + remoteManager1.RegisterObserver(remoteEndpoint, observer1); + remoteManager2.RegisterObserver(remoteEndpoint, observer2); + + // Remote manager 1 sends 3 events to remote manager 2 + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver1.OnNext(new PrefixedStringWritable("abc")); + remoteObserver1.OnNext(new PrefixedStringWritable("def")); + remoteObserver1.OnNext(new PrefixedStringWritable("ghi")); + + // Remote manager 2 sends 4 events to remote manager 1 + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); + remoteObserver2.OnNext(new PrefixedStringWritable("jkl")); + remoteObserver2.OnNext(new PrefixedStringWritable("mno")); + remoteObserver2.OnNext(new PrefixedStringWritable("pqr")); + remoteObserver2.OnNext(new PrefixedStringWritable("stu")); + + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + } + + Assert.AreEqual(4, events1.Count); + Assert.AreEqual(3, events2.Count); + } + + /// <summary> /// Tests one way communication between 3 nodes. /// nodes 1 and 2 send messages to node 3 /// </summary> @@ -167,9 +229,9 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); var observer = Observer.Create<WritableString>(queue.Add); @@ -209,9 +271,9 @@ namespace Org.Apache.REEF.Wake.Tests List<string> events2 = new List<string>(); List<string> events3 = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); @@ -272,8 +334,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { // Register handler for when remote manager 2 receives events; respond // with an ack @@ -316,8 +378,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { // RemoteManager2 listens and records events of type IRemoteEvent<WritableString> var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message)); @@ -348,8 +410,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) - using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0)) { var observer = Observer.Create<WritableString>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs index 8a6d041..30ff487 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs @@ -21,6 +21,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Wake.Tests @@ -39,6 +40,7 @@ namespace Org.Apache.REEF.Wake.Tests /// <summary> /// Empty constructor for instantiation with reflection /// </summary> + [Inject] public WritableString() { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs index 914a2aa..2255cfa 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs @@ -20,6 +20,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Globalization; using System.IO; using System.Net; using System.Reactive; @@ -28,6 +29,7 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.Remote.Parameters; @@ -44,6 +46,7 @@ namespace Org.Apache.REEF.Wake.Tests public class WritableTransportTest { private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940); + private readonly IInjector _injector = TangFactory.GetTang().NewInjector(); /// <summary> /// Tests whether WritableTransportServer receives @@ -58,12 +61,12 @@ namespace Org.Apache.REEF.Wake.Tests IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) + using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector)) { server.Run(); IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new WritableTransportClient<WritableString>(remoteEndpoint)) + using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector)) { client.Send(new WritableString("Hello")); client.Send(new WritableString(", ")); @@ -81,7 +84,50 @@ namespace Org.Apache.REEF.Wake.Tests Assert.AreEqual(events[2], "World!"); } - + + /// <summary> + /// Tests whether WritableTransportServer receives + /// string messages from WritableTransportClient with non empty injector + /// </summary> + [TestMethod] + public void TestNonEmptyInjectionTransportServer() + { + int id = 5; + IConfiguration config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>( + GenericType<StringId>.Class, id.ToString(CultureInfo.InvariantCulture)).Build(); + + IInjector injector = TangFactory.GetTang().NewInjector(config); + + BlockingCollection<PrefixedStringWritable> queue = new BlockingCollection<PrefixedStringWritable>(); + List<string> events = new List<string>(); + + IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); + var remoteHandler = Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent => queue.Add(tEvent.Data)); + + using (var server = new WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler, _tcpPortProvider, injector.ForkInjector())) + { + server.Run(); + + IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); + using (var client = new WritableTransportClient<PrefixedStringWritable>(remoteEndpoint, injector.ForkInjector())) + { + client.Send(new PrefixedStringWritable("Hello")); + client.Send(new PrefixedStringWritable(", ")); + client.Send(new PrefixedStringWritable("World!")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + } + + Assert.AreEqual(3, events.Count); + Assert.AreEqual(events[0], "Hello_" + id); + Assert.AreEqual(events[1], ", _" + id); + Assert.AreEqual(events[2], "World!_" + id); + } + + /// <summary> /// Checks whether WritableTransportClient is able to receive messages from remote host /// </summary> @@ -97,13 +143,13 @@ namespace Org.Apache.REEF.Wake.Tests // Server echoes the message back to the client var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data)); - using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) + using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector)) { server.Run(); var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler)) + using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler, _injector)) { client.Send(new WritableString("Hello")); client.Send(new WritableString(", ")); @@ -136,7 +182,7 @@ namespace Org.Apache.REEF.Wake.Tests IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) + using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector)) { server.Run(); @@ -145,7 +191,7 @@ namespace Org.Apache.REEF.Wake.Tests Task.Run(() => { IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new WritableTransportClient<WritableString>(remoteEndpoint)) + using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector)) { client.Send(new WritableString("Hello")); client.Send(new WritableString(", ")); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 53ffd65..170c967 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -120,7 +120,6 @@ under the License. <Compile Include="Remote\RemoteConfiguration.cs" /> <Compile Include="Remote\RemoteRuntimeException.cs" /> <Compile Include="Remote\TcpPortProvider.cs" /> - <Compile Include="Remote\TypeCache.cs" /> <Compile Include="RX\AbstractObserver.cs" /> <Compile Include="RX\AbstractRxStage.cs" /> <Compile Include="RX\Impl\PubSubSubject.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs index bf50325..b39b20f 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs @@ -18,6 +18,7 @@ */ using System.Net; +using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Wake.Remote.Impl { @@ -40,6 +41,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl Value = value; } + [Inject] public RemoteEvent() { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs index f867240..9859338 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs @@ -23,6 +23,7 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -41,11 +42,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private readonly IPEndPoint _localEndpoint; private bool _disposed; private readonly NetworkStream _stream; - - /// <summary> - /// Cache structure to store the constructor functions for various types. - /// </summary> - private readonly TypeCache<T> _cache; + private readonly IInjector _injector; + /// <summary> /// Stream reader to be passed to IWritable @@ -62,7 +60,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// Connects to the specified remote endpoint. /// </summary> /// <param name="remoteEndpoint">The remote endpoint to connect to</param> - public WritableLink(IPEndPoint remoteEndpoint) + /// <param name="injector">The injector to pass arguments to incoming messages</param> + public WritableLink(IPEndPoint remoteEndpoint, IInjector injector) { if (remoteEndpoint == null) { @@ -75,9 +74,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl _stream = Client.GetStream(); _localEndpoint = GetLocalEndpoint(); _disposed = false; - _cache = new TypeCache<T>(); _reader = new StreamDataReader(_stream); _writer = new StreamDataWriter(_stream); + _injector = injector; } /// <summary> @@ -85,7 +84,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// Uses the already connected TcpClient. /// </summary> /// <param name="client">The already connected client</param> - public WritableLink(TcpClient client) + /// <param name="injector">The injector to pass arguments to incoming messages</param> + public WritableLink(TcpClient client, IInjector injector) { if (client == null) { @@ -96,9 +96,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl _stream = Client.GetStream(); _localEndpoint = GetLocalEndpoint(); _disposed = false; - _cache = new TypeCache<T>(); _reader = new StreamDataReader(_stream); _writer = new StreamDataWriter(_stream); + _injector = injector; } /// <summary> @@ -174,15 +174,16 @@ namespace Org.Apache.REEF.Wake.Remote.Impl return default(T); } - T value = _cache.GetInstance(dataType); - - if (value == null) + try + { + T value = (T) _injector.ForkInjector().GetInstance(dataType); + value.Read(_reader); + return value; + } + catch (InjectionException) { return default(T); } - - value.Read(_reader); - return value; } /// <summary> @@ -205,15 +206,16 @@ namespace Org.Apache.REEF.Wake.Remote.Impl return default(T); } - T value = _cache.GetInstance(dataType); - - if(value==null) + try + { + T value = (T) _injector.ForkInjector().GetInstance(dataType); + await value.ReadAsync(_reader, token); + return value; + } + catch (InjectionException) { return default(T); } - - await value.ReadAsync(_reader, token); - return value; } /// <summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs index b9702d6..b3664d0 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs @@ -21,6 +21,8 @@ using System; using System.Net; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; namespace Org.Apache.REEF.Wake.Remote.Impl { @@ -31,6 +33,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable { + private readonly IInjector _injector; + /// <summary> /// Creates the Remote Event /// </summary> @@ -47,8 +51,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <summary> /// Creates empty Remote Event /// </summary> - public WritableRemoteEvent() + [Inject] + public WritableRemoteEvent(IInjector injector) { + _injector = injector; } /// <summary> @@ -72,7 +78,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="reader">The reader from which to read </param> public void Read(IDataReader reader) { - Value = Activator.CreateInstance<T>(); + Value = (T)_injector.ForkInjector().GetInstance(typeof(T)); Value.Read(reader); } @@ -92,7 +98,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="token">The cancellation token</param> public async Task ReadAsync(IDataReader reader, CancellationToken token) { - Value = Activator.CreateInstance<T>(); + Value = (T)_injector.ForkInjector().GetInstance(typeof(T)); await Value.ReadAsync(reader, token); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs index 0a9ead3..73d8bb6 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs @@ -20,6 +20,7 @@ using System; using System.Collections.Generic; using System.Net; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -37,6 +38,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private readonly WritableObserverContainer<T> _observerContainer; private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server; private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients; + private readonly IInjector _injector; /// <summary> /// Constructs a DefaultRemoteManager listening on the specified address and @@ -44,8 +46,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="localAddress">The address to listen on</param> /// <param name="port">The port to listen on</param> + /// <param name="tcpPortProvider">Tcp port provider</param> + /// <param name="injector">The injector to pass arguments to incoming messages</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider) + public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider, IInjector injector) { if (localAddress == null) { @@ -58,11 +62,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl _observerContainer = new WritableObserverContainer<T>(); _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); + _injector = injector; IPEndPoint localEndpoint = new IPEndPoint(localAddress, port); // Begin to listen for incoming messages - _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider); + _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider, injector); _server.Run(); LocalEndpoint = _server.LocalEndpoint; @@ -72,13 +77,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <summary> /// Constructs a DefaultRemoteManager. Does not listen for incoming messages. /// </summary> + /// <param name="injector">The injector to pass arguments to incoming messages</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public WritableRemoteManager() + public WritableRemoteManager(IInjector injector) { using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager")) { _observerContainer = new WritableObserverContainer<T>(); _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); + _injector = injector; LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0); Identifier = new SocketRemoteIdentifier(LocalEndpoint); @@ -134,7 +141,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver)) { WritableTransportClient<IWritableRemoteEvent<T>> client = - new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer); + new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer, _injector); remoteObserver = new ProxyObserver(client); _cachedClients[remoteEndpoint] = remoteObserver; @@ -210,7 +217,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl return _observerContainer.RegisterObserver(observer); } - + /// <summary> /// Release all resources for the DefaultRemoteManager. /// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs index 4beb844..52fef8d 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs @@ -20,10 +20,9 @@ using System; using System.Net; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Tang.Interface; -namespace Org.Apache.REEF.Wake.Impl +namespace Org.Apache.REEF.Wake.Remote.Impl { /// <summary> /// WritableRemoteManagerFactory for WritableRemoteManager. @@ -32,17 +31,20 @@ namespace Org.Apache.REEF.Wake.Impl public sealed class WritableRemoteManagerFactory { private readonly ITcpPortProvider _tcpPortProvider; - [Inject] - private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider) + private readonly IInjector _injector; + + [Inject] + private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector) { _tcpPortProvider = tcpPortProvider; + _injector = injector; } public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable { #pragma warning disable 618 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider); + return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider, _injector); #pragma warning disable 618 } @@ -50,7 +52,7 @@ namespace Org.Apache.REEF.Wake.Impl { #pragma warning disable 618 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - return new WritableRemoteManager<T>(); + return new WritableRemoteManager<T>(_injector); #pragma warning disable 618 } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs index 5e42fb7..b245f0f 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs @@ -21,6 +21,7 @@ using System; using System.Net; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; @@ -44,11 +45,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// Used to send messages to the specified remote endpoint. /// </summary> /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> - public WritableTransportClient(IPEndPoint remoteEndpoint) + /// <param name="injector">The injector to pass arguments to incoming messages</param> + public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector injector) { Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger); - _link = new WritableLink<T>(remoteEndpoint); + _link = new WritableLink<T>(remoteEndpoint, injector); _cancellationSource = new CancellationTokenSource(); _disposed = false; } @@ -59,9 +61,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> /// <param name="observer">Callback used when receiving responses from remote host</param> + /// <param name="injector">The injector to pass arguments to incoming messages</param> public WritableTransportClient(IPEndPoint remoteEndpoint, - IObserver<TransportEvent<T>> observer) - : this(remoteEndpoint) + IObserver<TransportEvent<T>> observer, + IInjector injector) + : this(remoteEndpoint, injector) { _observer = observer; Task.Run(() => ResponseLoop()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs index 90cfdd7..6b5961f 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs @@ -18,10 +18,13 @@ */ using System; +using System.Collections.Generic; +using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -41,9 +44,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private readonly CancellationTokenSource _cancellationSource; private readonly IObserver<TransportEvent<T>> _remoteObserver; private readonly ITcpPortProvider _tcpPortProvider; + private readonly IInjector _injector; private bool _disposed; private Task _serverTask; - /// <summary> /// Constructs a TransportServer to listen for remote events. /// Listens on the specified remote endpoint. When it recieves a remote @@ -53,8 +56,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="remoteHandler">The handler to invoke when receiving incoming /// remote messages</param> /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> - public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider) - : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider) + /// <param name="injector">The injector to pass arguments to incoming messages</param> + public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider, IInjector injector) + : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider, injector) { } @@ -67,16 +71,19 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="remoteHandler">The handler to invoke when receiving incoming /// remote messages</param> /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> + /// <param name="injector">The injector to pass arguments to incoming messages</param> public WritableTransportServer( IPEndPoint localEndpoint, IObserver<TransportEvent<T>> remoteHandler, - ITcpPortProvider tcpPortProvider) + ITcpPortProvider tcpPortProvider, + IInjector injector) { _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port); _remoteObserver = remoteHandler; _tcpPortProvider = tcpPortProvider; _cancellationSource = new CancellationTokenSource(); _cancellationSource.Token.ThrowIfCancellationRequested(); + _injector = injector; _disposed = false; } @@ -212,25 +219,21 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="client">The connected client</param> private async Task ProcessClient(TcpClient client) { + // Keep reading messages from client until they disconnect or timeout CancellationToken token = _cancellationSource.Token; - using (ILink<T> link = new WritableLink<T>(client)) + using (ILink<T> link = new WritableLink<T>(client, _injector)) { while (!token.IsCancellationRequested) { - //T message = link.Read(); T message = await link.ReadAsync(token); if (message == null) { - //LOGGER.Log(Level.Error, - // "ProcessClient, no message received, break." + link.RemoteEndpoint + " - " + - // link.LocalEndpoint); break; } TransportEvent<T> transportEvent = new TransportEvent<T>(message, link); - _remoteObserver.OnNext(transportEvent); } LOGGER.Log(Level.Error, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/af4a1980/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs deleted file mode 100644 index 9bbe549..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/TypeCache.cs +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; - -namespace Org.Apache.REEF.Wake.Remote -{ - /// <summary> - /// Cache used to store the constructor functions to instantiate various Types. - /// It is assumed that all types are inherited from the base type T - /// </summary> - /// <typeparam name="T"></typeparam> - public class TypeCache<T> - { - private const BindingFlags ConstructorFlags = - BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance; - - /// <summary> - /// Cache that stores the constructors for already used types using the assmebly name - /// </summary> - private readonly Dictionary<string, Func<T>> _typeConstructorMapping = new Dictionary<string, Func<T>>(); - - public T GetInstance(string typeString) - { - if (!_typeConstructorMapping.ContainsKey(typeString)) - { - var type = Type.GetType(typeString); - - if (type != null) - { - _typeConstructorMapping[typeString] = GetActivator(type); - } - } - - return _typeConstructorMapping[typeString](); - } - - /// <summary> - /// Returns the constructor for type T given actual type. Type can be - /// that of inherited class. - /// <param name="actualType">The actual type for which we want to create the constructor.</param> - /// <returns>The constructor function</returns> - /// </summary> - private Func<T> GetActivator(Type actualType) - { - ConstructorInfo constructor; - if (actualType.IsValueType) - { - // For struct types, there is an implicit default constructor. - constructor = null; - } - else if (!TryGetDefaultConstructor(actualType, out constructor)) - { - throw new Exception("could not get default constructor"); - } - NewExpression nex = constructor == null ? Expression.New(actualType) : Expression.New(constructor); - var body = Expression.Convert(nex, typeof (T)); - Expression<Func<T>> lambda = Expression.Lambda<Func<T>>(body); - - return lambda.Compile(); - } - - /// <summary> - /// Fills the constructor information and meta-data - /// </summary> - /// <param name="type">The type for which constructor needs to be created</param> - /// <param name="constructor">The information and meta data for the constructor creation</param> - /// <returns></returns> - private bool TryGetDefaultConstructor(Type type, out ConstructorInfo constructor) - { - // first, determine if there is a suitable constructor - if (type.IsAbstract || type.IsInterface) - { - constructor = null; - return false; - } - - constructor = type.GetConstructor(ConstructorFlags, null, Type.EmptyTypes, null); - return null != constructor; - } - } -}
