Repository: incubator-reef Updated Branches: refs/heads/master 31d41f886 -> a8ebe24e5
[REEF-258] Improve RemoteManager memory efficiency by using Writables This change adds the `WritableRemoteManager` implementation of `IRemoteManager`. It is more memory efficient as it only creates one serialized copy of each message instead of requiring it to be pre-serialized into a `byte[]`. JIRA: [REEF-258](https://issues.apache.org/jira/browse/REEF-258) Pull Request: This closes #170 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/a8ebe24e Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/a8ebe24e Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/a8ebe24e Branch: refs/heads/master Commit: a8ebe24e5e626f916dfb6276184284725228361d Parents: 31d41f8 Author: dkm2110 <[email protected]> Authored: Fri May 1 15:49:10 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue May 5 17:09:56 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Wake.Tests.csproj | 1 + .../WritableRemoteManagerTest.cs | 375 +++++++++++++++++++ .../Impl/DefaultRemoteManagerFactory.cs | 53 --- .../Org.Apache.REEF.Wake.csproj | 7 +- .../Remote/IWritableRemoteEvent.cs | 45 +++ .../Remote/Impl/DefaultRemoteManagerFactory.cs | 53 +++ .../Remote/Impl/WritableObserverContainer.cs | 132 +++++++ .../Remote/Impl/WritableRemoteEvent.cs | 109 ++++++ .../Remote/Impl/WritableRemoteManager.cs | 279 ++++++++++++++ .../Remote/Impl/WritableRemoteManagerFactory.cs | 55 +++ 10 files changed, 1055 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index d709933..f947422 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -49,6 +49,7 @@ under the License. <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="PubSubSubjectTest.cs" /> <Compile Include="RemoteManagerTest.cs" /> + <Compile Include="WritableRemoteManagerTest.cs" /> <Compile Include="WritableTransportTest.cs" /> <Compile Include="TransportTest.cs" /> <Compile Include="WritableString.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs new file mode 100644 index 0000000..6f7baf9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Reactive; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Wake.Impl; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Wake.Tests +{ + [TestClass] + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + public class WritableRemoteManagerTest + { + private readonly WritableRemoteManagerFactory _remoteManagerFactory = + TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>(); + + /// <summary> + /// Tests one way communication between Remote Managers + /// Remote Manager listens on any available port + /// </summary> + [TestMethod] + public void TestWritableOneWayCommunication() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + var observer = Observer.Create<WritableString>(queue.Add); + IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); + remoteManager2.RegisterObserver(endpoint1, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext(new WritableString("abc")); + remoteObserver.OnNext(new WritableString("def")); + remoteObserver.OnNext(new WritableString("ghi")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + + Assert.AreEqual(3, events.Count); + } + + /// <summary> + 
/// Tests one way communication between Remote Managers + /// Remote manager listens on a particular port + /// </summary> + [TestMethod] + public void TestWritableOneWayCommunicationClientOnly() + { + int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000); + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>()) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, listeningPort)) + { + IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer = Observer.Create<WritableString>(queue.Add); + remoteManager2.RegisterObserver(remoteEndpoint, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext(new WritableString("abc")); + remoteObserver.OnNext(new WritableString("def")); + remoteObserver.OnNext(new WritableString("ghi")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + + Assert.AreEqual(3, events.Count); + } + + /// <summary> + /// Tests two way communications. 
Checks whether both sides are able to receive messages + /// </summary> + [TestMethod] + public void TestWritableTwoWayCommunication() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>(); + BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>(); + List<string> events1 = new List<string>(); + List<string> events2 = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + // Register observers for remote manager 1 and remote manager 2 + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer1 = Observer.Create<WritableString>(queue1.Add); + var observer2 = Observer.Create<WritableString>(queue2.Add); + remoteManager1.RegisterObserver(remoteEndpoint, observer1); + remoteManager2.RegisterObserver(remoteEndpoint, observer2); + + // Remote manager 1 sends 3 events to remote manager 2 + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver1.OnNext(new WritableString("abc")); + remoteObserver1.OnNext(new WritableString("def")); + remoteObserver1.OnNext(new WritableString("ghi")); + + // Remote manager 2 sends 4 events to remote manager 1 + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); + remoteObserver2.OnNext(new WritableString("jkl")); + remoteObserver2.OnNext(new WritableString("mno")); + remoteObserver2.OnNext(new WritableString("pqr")); + remoteObserver2.OnNext(new WritableString("stu")); + + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + } + + Assert.AreEqual(4, 
events1.Count); + Assert.AreEqual(3, events2.Count); + } + + /// <summary> + /// Tests one way communication between 3 nodes. + /// nodes 1 and 2 send messages to node 3 + /// </summary> + [TestMethod] + public void TestWritableCommunicationThreeNodesOneWay() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var observer = Observer.Create<WritableString>(queue.Add); + remoteManager3.RegisterObserver(remoteEndpoint, observer); + + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); + + remoteObserver2.OnNext(new WritableString("abc")); + remoteObserver1.OnNext(new WritableString("def")); + remoteObserver2.OnNext(new WritableString("ghi")); + remoteObserver1.OnNext(new WritableString("jkl")); + remoteObserver2.OnNext(new WritableString("mno")); + + for (int i = 0; i < 5; i++) + { + events.Add(queue.Take().Data); + } + } + + Assert.AreEqual(5, events.Count); + } + + /// <summary> + /// Tests two way communication between 3 nodes. 
+ /// nodes 1 and 2 send messages to node 3 and node 3 sends message back + /// </summary> + [TestMethod] + public void TestWritableCommunicationThreeNodesBothWays() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>(); + BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>(); + BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>(); + List<string> events1 = new List<string>(); + List<string> events2 = new List<string>(); + List<string> events3 = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + + var observer = Observer.Create<WritableString>(queue1.Add); + remoteManager1.RegisterObserver(remoteEndpoint, observer); + var observer2 = Observer.Create<WritableString>(queue2.Add); + remoteManager2.RegisterObserver(remoteEndpoint, observer2); + var observer3 = Observer.Create<WritableString>(queue3.Add); + remoteManager3.RegisterObserver(remoteEndpoint, observer3); + + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint); + + // Observer 1 and 2 send messages to observer 3 + remoteObserver1.OnNext(new WritableString("abc")); + remoteObserver1.OnNext(new WritableString("abc")); + remoteObserver1.OnNext(new WritableString("abc")); + remoteObserver2.OnNext(new WritableString("def")); + remoteObserver2.OnNext(new WritableString("def")); + + // Observer 3 sends messages back to observers 1 and 2 + var remoteObserver3A = 
remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint); + var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint); + + remoteObserver3A.OnNext(new WritableString("ghi")); + remoteObserver3A.OnNext(new WritableString("ghi")); + remoteObserver3B.OnNext(new WritableString("jkl")); + remoteObserver3B.OnNext(new WritableString("jkl")); + remoteObserver3B.OnNext(new WritableString("jkl")); + + events1.Add(queue1.Take().Data); + events1.Add(queue1.Take().Data); + + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + events2.Add(queue2.Take().Data); + + events3.Add(queue3.Take().Data); + events3.Add(queue3.Take().Data); + events3.Add(queue3.Take().Data); + events3.Add(queue3.Take().Data); + events3.Add(queue3.Take().Data); + } + + Assert.AreEqual(2, events1.Count); + Assert.AreEqual(3, events2.Count); + Assert.AreEqual(5, events3.Count); + } + + /// <summary> + /// Tests whether remote manager is able to send acknowledgement back + /// </summary> + [TestMethod] + public void TestWritableRemoteSenderCallback() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + // Register handler for when remote manager 2 receives events; respond + // with an ack + var remoteEndpoint = new IPEndPoint(listeningAddress, 0); + var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); + + var receiverObserver = Observer.Create<WritableString>( + message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data))); + remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver); + + // Register handler for remote manager 1 to record 
the ack + var senderObserver = Observer.Create<WritableString>(queue.Add); + remoteManager1.RegisterObserver(remoteEndpoint, senderObserver); + + // Begin to send messages + var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver1.OnNext(new WritableString("hello")); + remoteObserver1.OnNext(new WritableString("there")); + remoteObserver1.OnNext(new WritableString("buddy")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + + Assert.AreEqual(3, events.Count); + Assert.AreEqual("received message: hello", events[0]); + Assert.AreEqual("received message: there", events[1]); + Assert.AreEqual("received message: buddy", events[2]); + } + + /// <summary> + /// Test whether observer can be created with IRemoteMessage interface + /// </summary> + [TestMethod] + public void TestWritableRegisterObserverByType() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + // RemoteManager2 listens and records events of type IRemoteMessage<WritableString> + var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message)); + remoteManager2.RegisterObserver(observer); + + // Remote manager 1 sends 3 events to remote manager 2 + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext(new WritableString("abc")); + remoteObserver.OnNext(new WritableString("def")); + remoteObserver.OnNext(new WritableString("ghi")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + + Assert.AreEqual(3, 
events.Count); + } + + /// <summary> + /// Tests whether we get the cached observer back for sending message without reinstantiating it + /// </summary> + [TestMethod] + public void TestWritableCachedConnection() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + + BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); + List<string> events = new List<string>(); + + using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0)) + { + var observer = Observer.Create<WritableString>(queue.Add); + IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); + remoteManager2.RegisterObserver(endpoint1, observer); + + var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + remoteObserver.OnNext(new WritableString("abc")); + remoteObserver.OnNext(new WritableString("def")); + + var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); + cachedObserver.OnNext(new WritableString("ghi")); + cachedObserver.OnNext(new WritableString("jkl")); + + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + events.Add(queue.Take().Data); + } + + Assert.AreEqual(4, events.Count); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs deleted file mode 100644 index 38a020f..0000000 --- a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Net; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; - -namespace Org.Apache.REEF.Wake.Impl -{ - /// <summary> - /// An implementation of IRemoteManagerFactory for DefaultRemoteManager. - /// </summary> - internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory - { - [Inject] - private DefaultRemoteManagerFactory() - { - } - - public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec) - { -#pragma warning disable 618 - // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - return new DefaultRemoteManager<T>(localAddress, port, codec); -#pragma warning restore 618 - } - - public IRemoteManager<T> GetInstance<T>(ICodec<T> codec) - { -#pragma warning disable 618 - // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. 
- return new DefaultRemoteManager<T>(codec); -#pragma warning restore 618 - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 26f2fbf..a62d524 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -48,7 +48,8 @@ under the License. <Compile Include="IEventHandler.cs" /> <Compile Include="IIdentifier.cs" /> <Compile Include="IIdentifierFactory.cs" /> - <Compile Include="Impl\DefaultRemoteManagerFactory.cs" /> + <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" /> + <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" /> <Compile Include="Impl\LoggingEventHandler.cs" /> <Compile Include="Impl\MissingStartHandlerHandler.cs" /> <Compile Include="Impl\MultiEventHandler.cs" /> @@ -65,11 +66,15 @@ under the License. 
<Compile Include="Remote\IDataWriter.cs" /> <Compile Include="Remote\Impl\StreamDataReader.cs" /> <Compile Include="Remote\Impl\StreamDataWriter.cs" /> + <Compile Include="Remote\Impl\WritableRemoteManager.cs" /> <Compile Include="Remote\Impl\WritableLink.cs" /> + <Compile Include="Remote\Impl\WritableObserverContainer.cs" /> + <Compile Include="Remote\Impl\WritableRemoteEvent.cs" /> <Compile Include="Remote\Impl\WritableTransportClient.cs" /> <Compile Include="Remote\Impl\WritableTransportServer.cs" /> <Compile Include="Remote\IRemoteManagerFactory.cs" /> <Compile Include="Remote\IWritable.cs" /> + <Compile Include="Remote\IWritableRemoteEvent.cs" /> <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" /> <Compile Include="Remote\ICodec.cs" /> <Compile Include="Remote\ICodecFactory.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs new file mode 100644 index 0000000..40222aa --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Linq.Expressions; +using System.Net; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Interface for remote event + /// </summary> + /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam> + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable + { + /// <summary> + /// Local Endpoint + /// </summary> + IPEndPoint LocalEndPoint { get; set; } + + /// <summary> + /// Remote Endpoint + /// </summary> + IPEndPoint RemoteEndPoint { get; set; } + + T Value { get; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs new file mode 100644 index 0000000..38a020f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Net; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Wake.Impl +{ + /// <summary> + /// An implementation of IRemoteManagerFactory for DefaultRemoteManager. + /// </summary> + internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory + { + [Inject] + private DefaultRemoteManagerFactory() + { + } + + public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec) + { +#pragma warning disable 618 + // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. + return new DefaultRemoteManager<T>(localAddress, port, codec); +#pragma warning restore 618 + } + + public IRemoteManager<T> GetInstance<T>(ICodec<T> codec) + { +#pragma warning disable 618 + // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. 
+ return new DefaultRemoteManager<T>(codec); +#pragma warning restore 618 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs new file mode 100644 index 0000000..9790e3e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Net; +using Org.Apache.REEF.Wake.Util; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// Stores registered IObservers for WritableRemoteManager. + /// Can register and look up IObservers by remote IPEndPoint or by message type. + /// </summary> + /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam> + [Obsolete("Need to remove Iwritable and use IstreamingCodec. 
Please see Jira REEF-295 ", false)] + internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable + { + private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>)); + private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap; + private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap; + private IObserver<T> _universalObserver; + + /// <summary> + /// Constructs a new ObserverContainer used to manage remote IObservers. + /// </summary> + public WritableObserverContainer() + { + _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer()); + _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>(); + } + + /// <summary> + /// Registers an IObserver used to handle incoming messages from the remote host + /// at the specified IPEndPoint. + /// </summary> + /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param> + /// <param name="observer">The IObserver to handle incoming messages</param> + /// <returns>An IDisposable used to unregister the observer with</returns> + public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) + { + if (remoteEndpoint.Address.Equals(IPAddress.Any)) + { + _universalObserver = observer; + return Disposable.Create(() => { _universalObserver = null; }); + } + + _endpointMap[remoteEndpoint] = observer; + return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer)); + } + + /// <summary> + /// Registers an IObserver to handle incoming messages from a remote host + /// </summary> + /// <param name="observer">The IObserver to handle incoming messages</param> + /// <returns>An IDisposable used to unregister the observer with</returns> + public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer) + { + _typeMap[typeof(T)] = observer; + return Disposable.Create(() => 
_typeMap.TryRemove(typeof(T), out observer)); + } + + /// <summary> + /// Look up the IObserver for the registered IPEndPoint or event type + /// and execute the IObserver. + /// </summary> + /// <param name="transportEvent">The incoming remote event</param> + public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent) + { + IWritableRemoteEvent<T> remoteEvent = transportEvent.Data; + remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint; + remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint; + T value = remoteEvent.Value; + bool handled = false; + + IObserver<T> observer1; + IObserver<IRemoteMessage<T>> observer2; + if (_universalObserver != null) + { + _universalObserver.OnNext(value); + handled = true; + } + if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1)) + { + // IObserver was registered by IPEndpoint + observer1.OnNext(value); + handled = true; + } + else if (_typeMap.TryGetValue(value.GetType(), out observer2)) + { + // IObserver was registered by event type + IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint); + IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value); + observer2.OnNext(remoteMessage); + handled = true; + } + + if (!handled) + { + throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message"); + } + } + + public void OnError(Exception error) + { + throw error; + } + + public void OnCompleted() + { + Logger.Log(Level.Info, "Exiting the Writable Observer Container"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs new file mode 100644 index 0000000..b9702d6 --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs @@ -0,0 +1,109 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Remote event whose payload serializes itself through IWritable.
    /// Only the payload is read and written; the endpoints are plain properties.
    /// </summary>
    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable
    {
        /// <summary>
        /// Creates an empty remote event; the endpoints and the payload are left unset.
        /// </summary>
        public WritableRemoteEvent()
        {
        }

        /// <summary>
        /// Creates a remote event carrying the given payload between the given endpoints.
        /// </summary>
        /// <param name="localEndpoint">Local Address</param>
        /// <param name="remoteEndpoint">Remote Address</param>
        /// <param name="value">Actual message</param>
        public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
        {
            this.Value = value;
            this.RemoteEndPoint = remoteEndpoint;
            this.LocalEndPoint = localEndpoint;
        }

        /// <summary>
        /// Address of the local side of the connection
        /// </summary>
        public IPEndPoint LocalEndPoint { get; set; }

        /// <summary>
        /// Address of the remote side of the connection
        /// </summary>
        public IPEndPoint RemoteEndPoint { get; set; }

        /// <summary>
        /// The message payload
        /// </summary>
        public T Value { get; set; }

        /// <summary>
        /// Replaces the payload with a newly created instance of T and lets it
        /// populate itself from the reader.
        /// </summary>
        /// <param name="reader">The reader from which to read</param>
        public void Read(IDataReader reader)
        {
            T payload = Activator.CreateInstance<T>();

            // Publish the new instance before reading into it, as the payload
            // property is assigned first and read into second.
            this.Value = payload;
            payload.Read(reader);
        }

        /// <summary>
        /// Lets the payload write itself to the writer.
        /// </summary>
        /// <param name="writer">The writer to which to write</param>
        public void Write(IDataWriter writer)
        {
            T payload = this.Value;
            payload.Write(writer);
        }

        /// <summary>
        /// Asynchronous counterpart of Read: creates a new payload instance and
        /// awaits its asynchronous read.
        /// </summary>
        /// <param name="reader">The reader from which to read</param>
        /// <param name="token">The cancellation token</param>
        public async Task ReadAsync(IDataReader reader, CancellationToken token)
        {
            T payload = Activator.CreateInstance<T>();
            this.Value = payload;
            await payload.ReadAsync(reader, token);
        }

        /// <summary>
        /// Asynchronous counterpart of Write: awaits the payload's asynchronous write.
        /// </summary>
        /// <param name="writer">The writer to which to write</param>
        /// <param name="token">The cancellation token</param>
        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
        {
            T payload = this.Value;
            await payload.WriteAsync(writer, token);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs new file mode 100644 index 0000000..285db71 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs @@ -0,0 +1,279 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Net;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;

namespace Org.Apache.REEF.Wake.Remote.Impl
{
    /// <summary>
    /// Manages incoming and outgoing messages between remote hosts.
    /// </summary>
    /// <typeparam name="T">Message type T. 
It is assumed to be IWritable</typeparam>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>));

        // Dispatches incoming messages to the registered observers.
        private readonly WritableObserverContainer<T> _observerContainer;

        // Listening server; stays null when the parameterless constructor is used.
        private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server;

        // One sending observer per remote endpoint, created on first request.
        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;

        // Guards _cachedClients: Dictionary does not support concurrent modification,
        // and GetRemoteObserver both reads and inserts.
        private readonly object _cachedClientsLock = new object();

        /// <summary>
        /// Constructs a WritableRemoteManager listening on the specified address and
        /// a specific port.
        /// </summary>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="port">The port to listen on</param>
        /// <exception cref="ArgumentNullException">localAddress is null</exception>
        /// <exception cref="ArgumentException">port is negative</exception>
        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
        public WritableRemoteManager(IPAddress localAddress, int port)
        {
            if (localAddress == null)
            {
                throw new ArgumentNullException("localAddress");
            }
            if (port < 0)
            {
                throw new ArgumentException("Listening port must be greater than or equal to zero");
            }

            _observerContainer = new WritableObserverContainer<T>();
            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

            IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);

            // Begin to listen for incoming messages
            _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer);
            _server.Run();

            // Take the endpoint from the running server rather than from the arguments.
            LocalEndpoint = _server.LocalEndpoint;
            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
        }

        /// <summary>
        /// Constructs a WritableRemoteManager. Does not listen for incoming messages.
        /// </summary>
        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
        public WritableRemoteManager()
        {
            using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
            {
                _observerContainer = new WritableObserverContainer<T>();
                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();

                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
            }
        }

        /// <summary>
        /// Gets the RemoteIdentifier for the WritableRemoteManager
        /// </summary>
        public IRemoteIdentifier Identifier { get; private set; }

        /// <summary>
        /// Gets the local IPEndPoint for the WritableRemoteManager
        /// </summary>
        public IPEndPoint LocalEndpoint { get; private set; }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host identified
        /// by the specified RemoteEventEndPoint.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
        /// <exception cref="ArgumentException">The endpoint's Id is not a SocketRemoteIdentifier</exception>
        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return GetRemoteObserver(id.Addr);
        }

        /// <summary>
        /// Returns an IObserver used to send messages to the remote host at
        /// the specified IPEndpoint. The observer is created on the first request
        /// for an endpoint and returned from the cache afterwards.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
        /// <returns>An IObserver used to send messages to the remote host</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint is null</exception>
        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            // Lookup and insertion happen under one lock so that concurrent callers
            // neither corrupt the dictionary nor create two clients for one endpoint.
            lock (_cachedClientsLock)
            {
                ProxyObserver remoteObserver;
                if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
                {
                    WritableTransportClient<IWritableRemoteEvent<T>> client =
                        new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer);

                    remoteObserver = new ProxyObserver(client);
                    _cachedClients[remoteEndpoint] = remoteObserver;
                }

                return remoteObserver;
            }
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// identified by the specified RemoteEventEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id must be a SocketRemoteIdentifier</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
        /// <exception cref="ArgumentException">The endpoint's Id is not a SocketRemoteIdentifier</exception>
        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }

            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
            if (id == null)
            {
                throw new ArgumentException("ID not supported");
            }

            return RegisterObserver(id.Addr, observer);
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages from the remote host
        /// at the specified IPEndPoint.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">remoteEndpoint or observer is null</exception>
        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
        {
            if (remoteEndpoint == null)
            {
                throw new ArgumentNullException("remoteEndpoint");
            }
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
        }

        /// <summary>
        /// Registers an IObserver used to handle incoming messages of type T,
        /// independent of the remote host that sent them.
        /// The IDisposable that is returned can be used to unregister the IObserver.
        /// </summary>
        /// <param name="observer">The IObserver to handle incoming messages</param>
        /// <returns>An IDisposable used to unregister the observer with</returns>
        /// <exception cref="ArgumentNullException">observer is null</exception>
        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _observerContainer.RegisterObserver(observer);
        }

        /// <summary>
        /// Release all resources for the WritableRemoteManager: every cached
        /// client and, when one was started, the listening server.
        /// </summary>
        public void Dispose()
        {
            // Enumerate under the lock so a concurrent GetRemoteObserver cannot
            // modify the dictionary during the iteration.
            lock (_cachedClientsLock)
            {
                foreach (ProxyObserver cachedClient in _cachedClients.Values)
                {
                    cachedClient.Dispose();
                }
            }

            if (_server != null)
            {
                _server.Dispose();
            }
        }

        /// <summary>
        /// Observer to send messages to connected remote host
        /// </summary>
        private class ProxyObserver : IObserver<T>, IDisposable
        {
            private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client;

            /// <summary>
            /// Create new ProxyObserver
            /// </summary>
            /// <param name="client">The connected WritableTransport client used to send
            /// messages to remote host</param>
            public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
            {
                _client = client;
            }

            /// <summary>
            /// Send the message to the remote host, wrapped in a remote event that
            /// carries the endpoints of the client's link.
            /// </summary>
            /// <param name="message">The message to send</param>
            public void OnNext(T message)
            {
                IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
                    _client.Link.RemoteEndpoint,
                    message);

                _client.Send(remoteEvent);
            }

            /// <summary>
            /// Close underlying WritableTransport client
            /// </summary>
            public void Dispose()
            {
                _client.Dispose();
            }

            /// <summary>
            /// Rethrows the reported error to the caller.
            /// </summary>
            /// <param name="error">The reported exception</param>
            public void OnError(Exception error)
            {
                // A plain 'throw error;' would overwrite the stack trace recorded when the
                // exception was first thrown; ExceptionDispatchInfo rethrows it intact.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(error).Throw();
            }

            /// <summary>
            /// Completion is not supported by this observer.
            /// </summary>
            /// <exception cref="NotImplementedException">Always thrown</exception>
            public void OnCompleted()
            {
                throw new NotImplementedException();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs new file mode 100644 index 0000000..6d3c4ad --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs @@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Net;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;

namespace Org.Apache.REEF.Wake.Impl
{
    /// <summary>
    /// WritableRemoteManagerFactory for WritableRemoteManager.
    /// </summary>
    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
    public sealed class WritableRemoteManagerFactory
    {
        /// <summary>
        /// Private constructor marked with [Inject]; the class cannot be
        /// instantiated directly with 'new' from outside.
        /// </summary>
        [Inject]
        private WritableRemoteManagerFactory()
        {
        }

        /// <summary>
        /// Creates a WritableRemoteManager that listens on the given address and port.
        /// </summary>
        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
        /// <param name="localAddress">The address to listen on</param>
        /// <param name="port">The port to listen on</param>
        /// <returns>The newly created remote manager</returns>
        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
        {
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
            return new WritableRemoteManager<T>(localAddress, port);

// Re-enable the obsolete warning so the suppression covers only the call above.
#pragma warning restore 618
        }

        /// <summary>
        /// Creates a WritableRemoteManager using its parameterless constructor.
        /// </summary>
        /// <typeparam name="T">Message type; must implement IWritable</typeparam>
        /// <returns>The newly created remote manager</returns>
        public IRemoteManager<T> GetInstance<T>() where T : IWritable
        {
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
            return new WritableRemoteManager<T>();

// Re-enable the obsolete warning so the suppression covers only the call above.
#pragma warning restore 618
        }
    }
}
\ No newline at end of file
