[REEF-422] Convert Wake Layer from Writable to Streaming
This addresses the issue by:
* Using `StreamingCodec` in `WritableLink` instead of `Writable` by
reflection.
* Removing the `Writable` constraint on the generic type parameter in `WritableLink`,
`WritableTransportServer`, `WritableTransportClient`,
`WritableRemoteManager`.
* Introducing `TemporaryWritableToStreamingCodec` so that the Network
Service layer can still use the `Writable` interface.
`StreamingCodec` is assumed to have only static parameters.
That is, it cannot have parameters that change from one message to
another.
JIRA:
[REEF-422](https://issues.apache.org/jira/browse/REEF-422)
Pull Request:
This closes #261
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ba2653d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ba2653d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ba2653d6
Branch: refs/heads/master
Commit: ba2653d6bfac3aabaf640474efbdfcb18c87baf8
Parents: ec9b497
Author: Dhruv <[email protected]>
Authored: Tue Jun 30 11:01:06 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jul 7 13:56:04 2015 -0700
----------------------------------------------------------------------
.../NetworkService/WritableNetworkService.cs | 2 +-
.../Org.Apache.REEF.Wake.Tests.csproj | 5 +-
.../PrefixedStringWritable.cs | 108 -----
.../StreamingRemoteManagerTest.cs | 340 +++++++++++++++
.../StreamingTransportTest.cs | 177 ++++++++
.../WritableRemoteManagerTest.cs | 437 -------------------
.../WritableTransportTest.cs | 223 ----------
.../Org.Apache.REEF.Wake.csproj | 15 +-
.../Remote/IWritableRemoteEvent.cs | 45 --
.../Remote/Impl/RemoteEvent.cs | 4 +
.../Remote/Impl/RemoteEventStreamingCodec.cs | 82 ++++
.../Remote/Impl/StreamingLink.cs | 258 +++++++++++
.../Remote/Impl/StreamingRemoteManager.cs | 256 +++++++++++
.../Impl/StreamingRemoteManagerFactory.cs | 51 +++
.../Remote/Impl/StreamingTransportClient.cs | 125 ++++++
.../Remote/Impl/StreamingTransportServer.cs | 212 +++++++++
.../Impl/TemporaryWritableToStreamingCodec.cs | 70 +++
.../Remote/Impl/WritableLink.cs | 295 -------------
.../Remote/Impl/WritableObserverContainer.cs | 132 ------
.../Remote/Impl/WritableRemoteEvent.cs | 115 -----
.../Remote/Impl/WritableRemoteManager.cs | 286 ------------
.../Remote/Impl/WritableRemoteManagerFactory.cs | 59 ---
.../Remote/Impl/WritableTransportClient.cs | 132 ------
.../Remote/Impl/WritableTransportServer.cs | 244 -----------
24 files changed, 1585 insertions(+), 2088 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
index f383697..7d9d015 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Network.NetworkService
IObserver<WritableNsMessage<T>> messageHandler,
IIdentifierFactory idFactory,
INameClient nameClient,
- WritableRemoteManagerFactory remoteManagerFactory)
+ StreamingRemoteManagerFactory remoteManagerFactory)
{
IPAddress localAddress = NetworkUtils.LocalIPAddress;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 581508b..babc26d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -46,12 +46,11 @@ under the License.
<ItemGroup>
<Compile Include="ClockTest.cs" />
<Compile Include="MultiCodecTest.cs" />
- <Compile Include="PrefixedStringWritable.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSubSubjectTest.cs" />
<Compile Include="RemoteManagerTest.cs" />
- <Compile Include="WritableRemoteManagerTest.cs" />
- <Compile Include="WritableTransportTest.cs" />
+ <Compile Include="StreamingRemoteManagerTest.cs" />
+ <Compile Include="StreamingTransportTest.cs" />
<Compile Include="TransportTest.cs" />
<Compile Include="WritableString.cs" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
deleted file mode 100644
index dbb8af3..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- [NamedParameter("identifier in PrefixedWritable")]
- public class StringId : Name<int>
- {
- }
-
- /// <summary>
- /// Writable wrapper around the string class which takes integer prefix
- /// This class is used to test non empty injector in TransportServer and
Client
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see
Jira REEF-295 ", false)]
- public class PrefixedStringWritable : IWritable
- {
- private readonly int _id;
- private string _data;
-
- /// <summary>
- /// Returns the actual string data
- /// </summary>
- public string Data
- {
- get { return _data + "_" + _id; }
- set { _data = value; }
- }
-
- /// <summary>
- /// Empty constructor for instantiation with reflection
- /// </summary>
- [Inject]
- public PrefixedStringWritable([Parameter(typeof(StringId))] int id)
- {
- _id = id;
- }
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="data">The string data</param>
- public PrefixedStringWritable(string data)
- {
- _data = data;
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- public void Read(IDataReader reader)
- {
- _data = reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(_data);
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- /// <param name="token">the cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken
token)
- {
- _data = await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- /// <param name="token">the cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken
token)
- {
- await writer.WriteStringAsync(_data, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
new file mode 100644
index 0000000..20f75be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class StreamingRemoteManagerTest
+ {
+ private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
+
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+ private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
+
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+ /// <summary>
+ /// Tests one way communication between Remote Managers
+ /// Remote Manager listens on any available port
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingOneWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests two way communications. Checks whether both sides are able
to receive messages
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTwoWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new
BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new
BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<WritableString>(queue1.Add);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver1.OnNext(new WritableString("ghi"));
+
+ // Remote manager 2 sends 4 events to remote manager 1
+ var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+ remoteObserver2.OnNext(new WritableString("pqr"));
+ remoteObserver2.OnNext(new WritableString("stu"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ }
+
+ Assert.AreEqual(4, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between 3 nodes.
+ /// nodes 1 and 2 send messages to node 3
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCommunicationThreeNodesOneWay()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<WritableString>(queue.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ remoteObserver2.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("ghi"));
+ remoteObserver1.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+
+ for (int i = 0; i < 5; i++)
+ {
+ events.Add(queue.Take().Data);
+ }
+ }
+
+ Assert.AreEqual(5, events.Count);
+ }
+
+ /// <summary>
+ /// Tests two way communication between 3 nodes.
+ /// Nodes 1 and 2 send messages to node 3 and node 3 sends messages back
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCommunicationThreeNodesBothWays()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new
BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new
BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue3 = new
BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+ List<string> events3 = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+ var observer = Observer.Create<WritableString>(queue1.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+ var observer3 = Observer.Create<WritableString>(queue3.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+ var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ // Observer 1 and 2 send messages to observer 3
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver2.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("def"));
+
+ // Observer 3 sends messages back to observers 1 and 2
+ var remoteObserver3A =
remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ var remoteObserver3B =
remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ }
+
+ Assert.AreEqual(2, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ Assert.AreEqual(5, events3.Count);
+ }
+
+ /// <summary>
+ /// Tests whether remote manager is able to send acknowledgement back
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRemoteSenderCallback()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register handler for when remote manager 2 receives events;
respond
+ // with an ack
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+ var receiverObserver = Observer.Create<WritableString>(
+ message => remoteObserver2.OnNext(new
WritableString("received message: " + message.Data)));
+ remoteManager2.RegisterObserver(remoteEndpoint,
receiverObserver);
+
+ // Register handler for remote manager 1 to record the ack
+ var senderObserver =
Observer.Create<WritableString>(queue.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint,
senderObserver);
+
+ // Begin to send messages
+ var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("hello"));
+ remoteObserver1.OnNext(new WritableString("there"));
+ remoteObserver1.OnNext(new WritableString("buddy"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual("received message: hello", events[0]);
+ Assert.AreEqual("received message: there", events[1]);
+ Assert.AreEqual("received message: buddy", events[2]);
+ }
+
+ /// <summary>
+ /// Test whether observer can be created with IRemoteMessage interface
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRegisterObserverByType()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // RemoteManager2 listens and records events of type
IRemoteMessage<WritableString>
+ var observer =
Observer.Create<IRemoteMessage<WritableString>>(message =>
queue.Add(message.Message));
+ remoteManager2.RegisterObserver(observer);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests whether we get the cached observer back for sending message
without reinstantiating it
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCachedConnection()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+
+ var cachedObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ cachedObserver.OnNext(new WritableString("ghi"));
+ cachedObserver.OnNext(new WritableString("jkl"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(4, events.Count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
new file mode 100644
index 0000000..268da70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ /// <summary>
+ /// Tests the StreamingTransportServer, StreamingTransportClient and
StreamingLink.
+ /// Basically the Wake transport layer.
+ /// </summary>
+ [TestClass]
+ public class StreamingTransportTest
+ {
+ private readonly ITcpPortProvider _tcpPortProvider =
GetTcpProvider(8900, 8940);
+ private readonly IInjector _injector =
TangFactory.GetTang().NewInjector();
+
+ /// <summary>
+ /// Tests whether StreamingTransportServer receives
+ /// string messages from StreamingTransportClient
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTransportServer()
+ {
+ BlockingCollection<string> queue = new
BlockingCollection<string>();
+ List<string> events = new List<string>();
+ IStreamingCodec<string> stringCodec =
_injector.GetInstance<StringStreamingCodec>();
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent
=> queue.Add(tEvent.Data));
+
+ using (var server = new
StreamingTransportServer<string>(endpoint.Address, remoteHandler,
_tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual(events[0], "Hello");
+ Assert.AreEqual(events[1], ", ");
+ Assert.AreEqual(events[2], "World!");
+ }
+
+ /// <summary>
+ /// Checks whether StreamingTransportClient is able to receive
messages from remote host
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTransportSenderStage()
+ {
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+
+ List<string> events = new List<string>();
+ BlockingCollection<string> queue = new
BlockingCollection<string>();
+ IStreamingCodec<string> stringCodec =
_injector.GetInstance<StringStreamingCodec>();
+
+ // Server echoes the message back to the client
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent
=> tEvent.Link.Write(tEvent.Data));
+
+ using (var server = new
StreamingTransportServer<string>(endpoint.Address, remoteHandler,
_tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ var clientHandler =
Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+ IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new
StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send(" World");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual(events[0], "Hello");
+ Assert.AreEqual(events[1], ", ");
+ Assert.AreEqual(events[2], " World");
+ }
+
+ /// <summary>
+ /// Checks whether StreamingTransportClient and
StreamingTransportServer work
+ /// in asynchronous condition while sending messages asynchronously
from different
+ /// threads
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRaceCondition()
+ {
+ BlockingCollection<string> queue = new
BlockingCollection<string>();
+ List<string> events = new List<string>();
+ IStreamingCodec<string> stringCodec =
_injector.GetInstance<StringStreamingCodec>();
+ int numEventsExpected = 150;
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent
=> queue.Add(tEvent.Data));
+
+ using (var server = new
StreamingTransportServer<string>(endpoint.Address, remoteHandler,
_tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ for (int i = 0; i < numEventsExpected / 3; i++)
+ {
+ Task.Run(() =>
+ {
+ IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+ }
+ });
+ }
+
+ for (int i = 0; i < numEventsExpected; i++)
+ {
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(numEventsExpected, events.Count);
+
+ }
+
+ private static ITcpPortProvider GetTcpProvider(int portRangeStart, int
portRangeEnd)
+ {
+ var configuration = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+
.BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
+ .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd -
portRangeStart + 1).ToString())
+ .Build();
+ return
TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
deleted file mode 100644
index 49c0f5b..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Impl;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- [TestClass]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see
Jira REEF-295 ", false)]
- public class WritableRemoteManagerTest
- {
- private const int Id = 5;
-
- private static IConfiguration _config =
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId,
int>(
- GenericType<StringId>.Class,
Id.ToString(CultureInfo.InvariantCulture)).Build();
-
- private readonly WritableRemoteManagerFactory _remoteManagerFactory1 =
-
TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
-
- private readonly WritableRemoteManagerFactory _remoteManagerFactory2 =
-
TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>();
-
- /// <summary>
- /// Tests one way communication between Remote Managers
- /// Remote Manager listens on any available port
- /// </summary>
- [TestMethod]
- public void TestWritableOneWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var observer = Observer.Create<WritableString>(queue.Add);
- IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
- remoteManager2.RegisterObserver(endpoint1, observer);
-
- var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests one way communication between Remote Managers
- /// Remote manager listens on a particular port
- /// </summary>
- [TestMethod]
- public void TestWritableOneWayCommunicationClientOnly()
- {
- int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940);
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>())
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress,
listeningPort))
- {
- IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress,
0);
- var observer = Observer.Create<WritableString>(queue.Add);
- remoteManager2.RegisterObserver(remoteEndpoint, observer);
-
- var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests two way communications. Checks whether both sides are able
to receive messages
- /// </summary>
- [TestMethod]
- public void TestWritableTwoWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue1 = new
BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new
BlockingCollection<WritableString>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // Register observers for remote manager 1 and remote manager 2
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<WritableString>(queue1.Add);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer1);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver1.OnNext(new WritableString("ghi"));
-
- // Remote manager 2 sends 4 events to remote manager 1
- var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
- remoteObserver2.OnNext(new WritableString("pqr"));
- remoteObserver2.OnNext(new WritableString("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- }
-
- Assert.AreEqual(4, events1.Count);
- Assert.AreEqual(3, events2.Count);
- }
-
- /// <summary>
- /// Tests two way communications where message needs an injectable
argument
- /// to be passed. Checks whether both sides are able to receive
messages
- /// </summary>
- [TestMethod]
- public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<PrefixedStringWritable> queue1 = new
BlockingCollection<PrefixedStringWritable>();
- BlockingCollection<PrefixedStringWritable> queue2 = new
BlockingCollection<PrefixedStringWritable>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
- {
- // Register observers for remote manager 1 and remote manager 2
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 =
Observer.Create<PrefixedStringWritable>(queue1.Add);
- var observer2 =
Observer.Create<PrefixedStringWritable>(queue2.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer1);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new PrefixedStringWritable("abc"));
- remoteObserver1.OnNext(new PrefixedStringWritable("def"));
- remoteObserver1.OnNext(new PrefixedStringWritable("ghi"));
-
- // Remote manager 2 sends 4 events to remote manager 1
- var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new PrefixedStringWritable("jkl"));
- remoteObserver2.OnNext(new PrefixedStringWritable("mno"));
- remoteObserver2.OnNext(new PrefixedStringWritable("pqr"));
- remoteObserver2.OnNext(new PrefixedStringWritable("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- }
-
- Assert.AreEqual(4, events1.Count);
- Assert.AreEqual(3, events2.Count);
- }
-
- /// <summary>
- /// Tests one way communication between 3 nodes.
- /// nodes 1 and 2 send messages to node 3
- /// </summary>
- [TestMethod]
- public void TestWritableCommunicationThreeNodesOneWay()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue.Add);
- remoteManager3.RegisterObserver(remoteEndpoint, observer);
-
- var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
- var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
- remoteObserver2.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("ghi"));
- remoteObserver1.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
-
- for (int i = 0; i < 5; i++)
- {
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(5, events.Count);
- }
-
- /// <summary>
- /// Tests one way communication between 3 nodes.
- /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
- /// </summary>
- [TestMethod]
- public void TestWritableCommunicationThreeNodesBothWays()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue1 = new
BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new
BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue3 = new
BlockingCollection<WritableString>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
- List<string> events3 = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-
- var observer = Observer.Create<WritableString>(queue1.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
- var observer3 = Observer.Create<WritableString>(queue3.Add);
- remoteManager3.RegisterObserver(remoteEndpoint, observer3);
-
- var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
- var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
- // Observer 1 and 2 send messages to observer 3
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver2.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("def"));
-
- // Observer 3 sends messages back to observers 1 and 2
- var remoteObserver3A =
remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
- var remoteObserver3B =
remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
-
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
-
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- }
-
- Assert.AreEqual(2, events1.Count);
- Assert.AreEqual(3, events2.Count);
- Assert.AreEqual(5, events3.Count);
- }
-
- /// <summary>
- /// Tests whether remote manager is able to send acknowledgement back
- /// </summary>
- [TestMethod]
- public void TestWritableRemoteSenderCallback()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // Register handler for when remote manager 2 receives events;
respond
- // with an ack
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-
- var receiverObserver = Observer.Create<WritableString>(
- message => remoteObserver2.OnNext(new
WritableString("received message: " + message.Data)));
- remoteManager2.RegisterObserver(remoteEndpoint,
receiverObserver);
-
- // Register handler for remote manager 1 to record the ack
- var senderObserver =
Observer.Create<WritableString>(queue.Add);
- remoteManager1.RegisterObserver(remoteEndpoint,
senderObserver);
-
- // Begin to send messages
- var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("hello"));
- remoteObserver1.OnNext(new WritableString("there"));
- remoteObserver1.OnNext(new WritableString("buddy"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual("received message: hello", events[0]);
- Assert.AreEqual("received message: there", events[1]);
- Assert.AreEqual("received message: buddy", events[2]);
- }
-
- /// <summary>
- /// Test whether observer can be created with IRemoteMessage interface
- /// </summary>
- [TestMethod]
- public void TestWritableRegisterObserverByType()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // RemoteManager2 listens and records events of type
IRemoteEvent<WritableString>
- var observer =
Observer.Create<IRemoteMessage<WritableString>>(message =>
queue.Add(message.Message));
- remoteManager2.RegisterObserver(observer);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests whether we get the cached observer back for sending message
without reinstantiating it
- /// </summary>
- [TestMethod]
- public void TestWritableCachedConnection()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 =
_remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var observer = Observer.Create<WritableString>(queue.Add);
- IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
- remoteManager2.RegisterObserver(endpoint1, observer);
-
- var remoteObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
-
- var cachedObserver =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- cachedObserver.OnNext(new WritableString("ghi"));
- cachedObserver.OnNext(new WritableString("jkl"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(4, events.Count);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
deleted file mode 100644
index 2255cfa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Remote.Parameters;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- /// <summary>
- /// Tests the WritableTransportServer, WritableTransportClient and
WritableLink.
- /// Basically the Wake transport layer.
- /// </summary>
- [TestClass]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see
Jira REEF-295 ", false)]
- public class WritableTransportTest
- {
- private readonly ITcpPortProvider _tcpPortProvider =
GetTcpProvider(8900, 8940);
- private readonly IInjector _injector =
TangFactory.GetTang().NewInjector();
-
- /// <summary>
- /// Tests whether WritableTransportServer receives
- /// string messages from WritableTransportClient
- /// </summary>
- [TestMethod]
- public void TestWritableTransportServer()
- {
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler =
Observer.Create<TransportEvent<WritableString>>(tEvent =>
queue.Add(tEvent.Data));
-
- using (var server = new
WritableTransportServer<WritableString>(endpoint, remoteHandler,
_tcpPortProvider, _injector))
- {
- server.Run();
-
- IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString("World!"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello");
- Assert.AreEqual(events[1], ", ");
- Assert.AreEqual(events[2], "World!");
- }
-
-
- /// <summary>
- /// Tests whether WritableTransportServer receives
- /// string messages from WritableTransportClient with non empty
injector
- /// </summary>
- [TestMethod]
- public void TestNonEmptyInjectionTransportServer()
- {
- int id = 5;
- IConfiguration config =
TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId,
int>(
- GenericType<StringId>.Class,
id.ToString(CultureInfo.InvariantCulture)).Build();
-
- IInjector injector = TangFactory.GetTang().NewInjector(config);
-
- BlockingCollection<PrefixedStringWritable> queue = new
BlockingCollection<PrefixedStringWritable>();
- List<string> events = new List<string>();
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler =
Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent =>
queue.Add(tEvent.Data));
-
- using (var server = new
WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler,
_tcpPortProvider, injector.ForkInjector()))
- {
- server.Run();
-
- IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new
WritableTransportClient<PrefixedStringWritable>(remoteEndpoint,
injector.ForkInjector()))
- {
- client.Send(new PrefixedStringWritable("Hello"));
- client.Send(new PrefixedStringWritable(", "));
- client.Send(new PrefixedStringWritable("World!"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello_" + id);
- Assert.AreEqual(events[1], ", _" + id);
- Assert.AreEqual(events[2], "World!_" + id);
- }
-
-
- /// <summary>
- /// Checks whether WritableTransportClient is able to receive messages
from remote host
- /// </summary>
- [TestMethod]
- public void TestWritableTransportSenderStage()
- {
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-
- List<string> events = new List<string>();
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
-
- // Server echoes the message back to the client
- var remoteHandler =
Observer.Create<TransportEvent<WritableString>>(tEvent =>
tEvent.Link.Write(tEvent.Data));
-
- using (var server = new
WritableTransportServer<WritableString>(endpoint, remoteHandler,
_tcpPortProvider, _injector))
- {
- server.Run();
-
- var clientHandler =
Observer.Create<TransportEvent<WritableString>>(tEvent =>
queue.Add(tEvent.Data));
- IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new
WritableTransportClient<WritableString>(remoteEndpoint, clientHandler,
_injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString(" World"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello");
- Assert.AreEqual(events[1], ", ");
- Assert.AreEqual(events[2], " World");
- }
-
- /// <summary>
- /// Checks whether WritableTransportClient and WritableTransportServer
works
- /// in asynchronous condition while sending messages asynchronously
from different
- /// threads
- /// </summary>
- [TestMethod]
- public void TestWritableRaceCondition()
- {
- BlockingCollection<WritableString> queue = new
BlockingCollection<WritableString>();
- List<string> events = new List<string>();
- int numEventsExpected = 150;
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler =
Observer.Create<TransportEvent<WritableString>>(tEvent =>
queue.Add(tEvent.Data));
-
- using (var server = new
WritableTransportServer<WritableString>(endpoint, remoteHandler,
_tcpPortProvider, _injector))
- {
- server.Run();
-
- for (int i = 0; i < numEventsExpected / 3; i++)
- {
- Task.Run(() =>
- {
- IPEndPoint remoteEndpoint = new
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new
WritableTransportClient<WritableString>(remoteEndpoint, _injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString("World!"));
- }
- });
- }
-
- for (int i = 0; i < numEventsExpected; i++)
- {
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(numEventsExpected, events.Count);
-
- }
-
- private static ITcpPortProvider GetTcpProvider(int portRangeStart, int
portRangeEnd)
- {
- var configuration = TangFactory.GetTang().NewConfigurationBuilder()
- .BindImplementation<ITcpPortProvider, TcpPortProvider>()
-
.BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
- .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd -
portRangeStart + 1).ToString())
- .Build();
- return
TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index d2b2970..4069d15 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,9 @@ under the License.
<Compile Include="IEventHandler.cs" />
<Compile Include="IIdentifier.cs" />
<Compile Include="IIdentifierFactory.cs" />
- <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+ <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
+ <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
+ <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
<Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
<Compile Include="Impl\LoggingEventHandler.cs" />
<Compile Include="Impl\MissingStartHandlerHandler.cs" />
@@ -71,15 +73,12 @@ under the License.
<Compile Include="Remote\IDataWriter.cs" />
<Compile Include="Remote\Impl\StreamDataReader.cs" />
<Compile Include="Remote\Impl\StreamDataWriter.cs" />
- <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
- <Compile Include="Remote\Impl\WritableLink.cs" />
- <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
- <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
- <Compile Include="Remote\Impl\WritableTransportClient.cs" />
- <Compile Include="Remote\Impl\WritableTransportServer.cs" />
+ <Compile Include="Remote\Impl\StreamingRemoteManager.cs" />
+ <Compile Include="Remote\Impl\StreamingLink.cs" />
+ <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
+ <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
<Compile Include="Remote\IRemoteManagerFactory.cs" />
<Compile Include="Remote\IWritable.cs" />
- <Compile Include="Remote\IWritableRemoteEvent.cs" />
<Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
<Compile Include="Remote\ICodec.cs" />
<Compile Include="Remote\ICodecFactory.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
deleted file mode 100644
index 40222aa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq.Expressions;
-using System.Net;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
- /// <summary>
- /// Interface for remote event
- /// </summary>
- /// <typeparam name="T">Type of remote event message. It is assumed that T
implements IWritable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see
Jira REEF-295 ", false)]
- internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
- {
- /// <summary>
- /// Local Endpoint
- /// </summary>
- IPEndPoint LocalEndPoint { get; set; }
-
- /// <summary>
- /// Remote Endpoint
- /// </summary>
- IPEndPoint RemoteEndPoint { get; set; }
-
- T Value { get; }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
index b39b20f..bfed7f9 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using System;
using System.Net;
using Org.Apache.REEF.Tang.Annotations;
@@ -50,12 +51,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
public IPEndPoint RemoteEndPoint { get; set; }
+ [Obsolete("This field is never used and will be removed as part of
0.13. See [REEF-445]", false)]
public string Source { get; set; }
+ [Obsolete("This field is never used and will be removed as part of
0.13. See [REEF-445]", false)]
public string Sink { get; set; }
public T Value { get; set; }
+ [Obsolete("This field is never used and will be removed as part of
0.13. See [REEF-445]", false)]
public long Sequence { get; set; }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
new file mode 100644
index 0000000..01acd73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Streaming codec for remote events. Only the payload (Value) of an event is
+    /// written to or read from the stream, by delegating to the wrapped payload
+    /// codec; events produced by Read/ReadAsync are constructed with null for the
+    /// two leading RemoteEvent constructor arguments.
+    /// </summary>
+    /// <typeparam name="T">Type of remote event message handled by the wrapped codec</typeparam>
+    internal sealed class RemoteEventStreamingCodec<T> : IStreamingCodec<IRemoteEvent<T>>
+    {
+        private readonly IStreamingCodec<T> _codec;
+
+        /// <summary>
+        /// Creates a remote event codec on top of the given payload codec.
+        /// </summary>
+        /// <param name="codec">Codec used to read and write the event payload</param>
+        /// <exception cref="System.ArgumentNullException">If codec is null</exception>
+        internal RemoteEventStreamingCodec(IStreamingCodec<T> codec)
+        {
+            // Fail at construction time rather than on the first message.
+            if (codec == null)
+            {
+                throw new System.ArgumentNullException("codec");
+            }
+
+            _codec = codec;
+        }
+
+        /// <summary>
+        /// Reads the payload with the wrapped codec and wraps it in a remote event.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The remote event carrying the decoded payload</returns>
+        public IRemoteEvent<T> Read(IDataReader reader)
+        {
+            return new RemoteEvent<T>(null, null, _codec.Read(reader));
+        }
+
+        /// <summary>
+        /// Writes the payload of the remote event with the wrapped codec.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="System.ArgumentNullException">If value is null</exception>
+        public void Write(IRemoteEvent<T> value, IDataWriter writer)
+        {
+            if (value == null)
+            {
+                throw new System.ArgumentNullException("value");
+            }
+
+            _codec.Write(value.Value, writer);
+        }
+
+        /// <summary>
+        /// Asynchronously reads the payload with the wrapped codec and wraps it in a remote event.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The remote event carrying the decoded payload</returns>
+        public async Task<IRemoteEvent<T>> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            T message = await _codec.ReadAsync(reader, token);
+            return new RemoteEvent<T>(null, null, message);
+        }
+
+        /// <summary>
+        /// Asynchronously writes the payload of the remote event with the wrapped codec.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        /// <exception cref="System.ArgumentNullException">If value is null (surfaces when the returned task is awaited)</exception>
+        public async Task WriteAsync(IRemoteEvent<T> value, IDataWriter writer, CancellationToken token)
+        {
+            if (value == null)
+            {
+                throw new System.ArgumentNullException("value");
+            }
+
+            await _codec.WriteAsync(value.Value, writer, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
new file mode 100644
index 0000000..4396b56
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Represents an open connection between remote hosts. This class is not
thread safe
+ /// </summary>
+ /// <typeparam name="T">Generic Type of message.</typeparam>
+ internal sealed class StreamingLink<T> : ILink<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof
(StreamingLink<T>));
+
+ private readonly IPEndPoint _localEndpoint;
+ private bool _disposed;
+ private readonly IStreamingCodec<T> _streamingCodec;
+ private readonly TcpClient _client;
+
+ /// <summary>
+ /// Stream reader to be passed to codec
+ /// </summary>
+ private readonly StreamDataReader _reader;
+
+ /// <summary>
+ /// Stream writer to be passed to codec
+ /// </summary>
+ private readonly StreamDataWriter _writer;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+        /// <param name="streamingCodec">Streaming codec used to read and write messages on this link</param>
+        /// <exception cref="ArgumentNullException">If remoteEndpoint or streamingCodec is null</exception>
+        internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (streamingCodec == null)
+            {
+                throw new ArgumentNullException("streamingCodec");
+            }
+
+            _client = new TcpClient();
+            try
+            {
+                _client.Connect(remoteEndpoint);
+
+                var stream = _client.GetStream();
+                _localEndpoint = GetLocalEndpoint();
+                _reader = new StreamDataReader(stream);
+                _writer = new StreamDataWriter(stream);
+            }
+            catch (Exception)
+            {
+                // The client was created here, so nobody else can release it if
+                // connecting or wiring up the stream fails.
+                _client.Close();
+                throw;
+            }
+
+            _disposed = false;
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        /// <param name="streamingCodec">Streaming codec used to read and write messages on this link</param>
+        /// <exception cref="ArgumentNullException">If client or streamingCodec is null</exception>
+        internal StreamingLink(TcpClient client, IStreamingCodec<T> streamingCodec)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+            if (streamingCodec == null)
+            {
+                // Reject a missing codec here instead of failing on the first Read/Write.
+                throw new ArgumentNullException("streamingCodec");
+            }
+
+            _client = client;
+            var stream = _client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _reader = new StreamDataReader(stream);
+            _writer = new StreamDataWriter(stream);
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Gets the local endpoint that was computed when this link was constructed.
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get
+            {
+                return _localEndpoint;
+            }
+        }
+
+        /// <summary>
+        /// Gets the remote endpoint of the underlying client socket.
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get
+            {
+                EndPoint peer = _client.Client.RemoteEndPoint;
+                return (IPEndPoint) peer;
+            }
+        }
+
+        /// <summary>
+        /// Synchronously sends a message over this link using the streaming codec.
+        /// </summary>
+        /// <param name="value">The message to send; must not be null</param>
+        public void Write(T value)
+        {
+            // The argument is validated before the link state, so a null value on a
+            // closed link reports the null argument.
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+
+            if (_disposed)
+            {
+                var linkClosed = new IllegalStateException("Link has been closed.");
+                Exceptions.Throw(linkClosed, Logger);
+            }
+
+            _streamingCodec.Write(value, _writer);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write; must not be null</param>
+        /// <param name="token">The cancellation token</param>
+        /// <exception cref="ArgumentNullException">If value is null (surfaces when the returned task is awaited)</exception>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            // Same argument validation as the synchronous Write.
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            await _streamingCodec.WriteAsync(value, _writer, token);
+        }
+
+        /// <summary>
+        /// Synchronously decodes the next message from the link with the streaming codec.
+        /// </summary>
+        /// <returns>The decoded message</returns>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                var linkDisposed = new IllegalStateException("Link has been disposed.");
+                Exceptions.Throw(linkDisposed, Logger);
+            }
+
+            try
+            {
+                return _streamingCodec.Read(_reader);
+            }
+            catch (Exception readFailure)
+            {
+                Logger.Log(Level.Warning, "In Read function unable to read the message.");
+                Exceptions.CaughtAndThrow(readFailure, Level.Error, Logger);
+
+                // Rethrow in case the helper above returns instead of throwing.
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Asynchronously decodes the next message from the link with the streaming codec.
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The decoded message</returns>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                var linkDisposed = new IllegalStateException("Link has been disposed.");
+                Exceptions.Throw(linkDisposed, Logger);
+            }
+
+            try
+            {
+                return await _streamingCodec.ReadAsync(_reader, token);
+            }
+            catch (Exception readFailure)
+            {
+                Logger.Log(Level.Warning, "In ReadAsync function unable to read the message.");
+                Exceptions.CaughtAndThrow(readFailure, Level.Error, Logger);
+
+                // Rethrow in case the helper above returns instead of throwing.
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Closes the network stream and the client connection. Calling it again
+        /// after a completed dispose does nothing.
+        /// </summary>
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                try
+                {
+                    var stream = _client.GetStream();
+                    stream.Close();
+                }
+                catch (InvalidOperationException)
+                {
+                    Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+                }
+
+                _client.Close();
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two StreamingLink objects are equal if they are connected
+        /// to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is a StreamingLink connected to the same remote endpoint, otherwise false</returns>
+        public override bool Equals(object obj)
+        {
+            // Compare against this class: StreamingLink implements ILink directly, so
+            // casting to Link<T> would make every comparison between two streaming
+            // links (including a link with itself) return false.
+            StreamingLink<T> other = obj as StreamingLink<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Computes the hash code from the remote endpoint of this link.
+        /// </summary>
+        /// <returns>The hash code of the remote endpoint</returns>
+        public override int GetHashCode()
+        {
+            IPEndPoint remote = RemoteEndpoint;
+            return remote.GetHashCode();
+        }
+
+        /// <summary>
+        /// Builds the local endpoint from NetworkUtils.LocalIPAddress and the port
+        /// the client socket is bound to.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            var boundEndpoint = (IPEndPoint) _client.Client.LocalEndPoint;
+            return new IPEndPoint(localAddress, boundEndpoint.Port);
+        }
+ }
+}
\ No newline at end of file