[REEF-447] Convert Network Service Layer from Writable to Streaming This addressed the issue by * Converting WritableNetworkService to StreamingNetworkService * Introducing NsMessageStreamingCodec which caches and uses StreamingCodecs of various GroupCommunicationMessage types. * Introducing GroupCommunicationMessageStreamingCodec
JIRA: [REEF-447](https://issues.apache.org/jira/browse/REEF-447) This Closes #289 Author: Dhruv <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8505dee9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8505dee9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8505dee9 Branch: refs/heads/master Commit: 8505dee942f924cbfd72d631bdaabcb20c125f02 Parents: 5cdd655 Author: Dhruv <[email protected]> Authored: Thu Jul 9 14:52:20 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Wed Jul 15 14:44:34 2015 -0700 ---------------------------------------------------------------------- .../GroupCommunicationTests.cs | 22 +- .../GroupCommunicationTreeTopologyTests.cs | 5 - .../StreamingNetworkServiceTests.cs | 361 +++++++++++++++++++ .../WritableNetworkServiceTests.cs | 262 -------------- .../NetworkService/WritableString.cs | 94 ----- .../Org.Apache.REEF.Network.Tests.csproj | 3 +- .../CodecToStreamingCodecConfiguration.cs | 8 +- .../Group/Config/StreamingCodecConfiguration.cs | 8 +- .../Driver/Impl/CommunicationGroupDriver.cs | 2 - .../Impl/GeneralGroupCommunicationMessage.cs | 51 +-- .../Group/Driver/Impl/GroupCommDriver.cs | 5 +- .../Driver/Impl/GroupCommunicationMessage.cs | 145 +------- .../GroupCommunicationMessageStreamingCodec.cs | 223 ++++++++++++ .../Group/Operators/Impl/Sender.cs | 7 +- .../Task/ICommunicationGroupNetworkObserver.cs | 3 +- .../Group/Task/IGroupCommNetworkObserver.cs | 3 +- .../Group/Task/Impl/CommunicationGroupClient.cs | 1 - .../Impl/CommunicationGroupNetworkObserver.cs | 5 +- .../Group/Task/Impl/GroupCommClient.cs | 4 +- .../Group/Task/Impl/GroupCommNetworkObserver.cs | 7 +- .../Group/Task/Impl/NodeStruct.cs | 11 +- .../Group/Task/Impl/OperatorTopology.cs | 31 +- .../Codec/NsMessageStreamingCodec.cs | 202 +++++++++++ .../Codec/StreamingCodecFunctionCache.cs | 203 +++++++++++ 
.../NetworkService/StreamingNetworkService.cs | 160 ++++++++ .../NetworkService/WritableNetworkService.cs | 159 -------- .../NetworkService/WritableNsConnection.cs | 138 ------- .../NetworkService/WritableNsMessage.cs | 185 ---------- .../Org.Apache.REEF.Network.csproj | 7 +- .../Org.Apache.REEF.Wake.Tests.csproj | 1 - .../StreamingRemoteManagerTest.cs | 232 ++++++------ .../WritableString.cs | 95 ----- .../Org.Apache.REEF.Wake.csproj | 2 - .../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs | 61 ---- .../Impl/StreamingRemoteManagerFactory.cs | 5 +- .../Impl/TemporaryWritableToStreamingCodec.cs | 70 ---- 36 files changed, 1350 insertions(+), 1431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs index 61813af..62a6a3f 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs @@ -72,17 +72,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication new BlockingCollection<GeneralGroupCommunicationMessage>(); var handler1 = - Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First())); + Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First())); var handler2 = - Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First())); + Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First())); var networkServiceInjector1 = 
BuildNetworkServiceInjector(endpoint, handler1); var networkServiceInjector2 = BuildNetworkServiceInjector(endpoint, handler2); var networkService1 = networkServiceInjector1.GetInstance< - WritableNetworkService<GeneralGroupCommunicationMessage>>(); + StreamingNetworkService<GeneralGroupCommunicationMessage>>(); var networkService2 = networkServiceInjector2.GetInstance< - WritableNetworkService<GeneralGroupCommunicationMessage>>(); + StreamingNetworkService<GeneralGroupCommunicationMessage>>(); networkService1.Register(new StringIdentifier("id1")); networkService2.Register(new StringIdentifier("id2")); @@ -822,7 +822,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication } public static IInjector BuildNetworkServiceInjector( - IPEndPoint nameServerEndpoint, IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler) + IPEndPoint nameServerEndpoint, IObserver<NsMessage<GeneralGroupCommunicationMessage>> handler) { var config = TangFactory.GetTang().NewConfigurationBuilder() .BindNamedParameter(typeof (NamingConfigurationOptions.NameServerAddress), @@ -832,20 +832,24 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .BindNamedParameter(typeof (NetworkServiceOptions.NetworkServicePort), (0).ToString(CultureInfo.InvariantCulture)) .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class) - .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class) .Build(); + var codecConfig = StreamingCodecConfiguration<string>.Conf + .Set(StreamingCodecConfiguration<string>.Codec, GenericType<StringStreamingCodec>.Class) + .Build(); + + config = Configurations.Merge(config, codecConfig); + var injector = TangFactory.GetTang().NewInjector(config); injector.BindVolatileInstance( - GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, handler); + GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, handler); return injector; } private 
GroupCommunicationMessage<string> CreateGcmStringType(string message, string from, string to) { - var stringCodec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); - return new GroupCommunicationMessage<string>("g1", "op1", from, to, message, MessageType.Data, stringCodec); + return new GroupCommunicationMessage<string>("g1", "op1", from, to, message); } private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs index 6b9b8c7..c194bcb 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs @@ -18,21 +18,16 @@ */ using System.Collections.Generic; -using System.Globalization; using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.Group.Pipelining.Impl; using Org.Apache.REEF.Network.Group.Topology; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; namespace Org.Apache.REEF.Network.Tests.GroupCommunication 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs new file mode 100644 index 0000000..1e0378e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Concurrent; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Network.Naming; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Network.NetworkService.Codec; +using Org.Apache.REEF.Network.Tests.NamingService; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.StreamingCodec; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; +using Org.Apache.REEF.Wake.Util; + +namespace Org.Apache.REEF.Network.Tests.NetworkService +{ + /// <summary> + /// Tests for Streaming Network Service + /// </summary> + [TestClass] + public class StreamingNetworkServiceTests + { + /// <summary> + /// Tests one way communication between two network services + /// </summary> + [TestMethod] + public void TestStreamingNetworkServiceOneWayCommunication() + { + int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000); + int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000); + + BlockingCollection<string> queue; + + using (var nameServer = NameServerTests.BuildNameServer()) + { + IPEndPoint endpoint = nameServer.LocalEndpoint; + int nameServerPort = endpoint.Port; + string nameServerAddr = endpoint.Address.ToString(); + + var handlerConf1 = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class, + GenericType<NetworkMessageHandler>.Class) + .Build(); + + var handlerConf2 = + TangFactory.GetTang() + .NewConfigurationBuilder() + 
.BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class, + GenericType<MessageHandler>.Class) + .Build(); + + var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, + handlerConf1); + + var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, + handlerConf2); + + using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>()) + using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>()) + { + queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue; + IIdentifier id1 = new StringIdentifier("service1"); + IIdentifier id2 = new StringIdentifier("service2"); + networkService1.Register(id1); + networkService2.Register(id2); + + using (IConnection<string> connection = networkService1.NewConnection(id2)) + { + connection.Open(); + connection.Write("abc"); + connection.Write("def"); + connection.Write("ghi"); + + Assert.AreEqual("abc", queue.Take()); + Assert.AreEqual("def", queue.Take()); + Assert.AreEqual("ghi", queue.Take()); + } + } + } + } + + /// <summary> + /// Tests two way communication between two network services + /// </summary> + [TestMethod] + public void TestStreamingNetworkServiceTwoWayCommunication() + { + int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000); + int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000); + + BlockingCollection<string> queue1; + BlockingCollection<string> queue2; + + using (var nameServer = NameServerTests.BuildNameServer()) + { + IPEndPoint endpoint = nameServer.LocalEndpoint; + int nameServerPort = endpoint.Port; + string nameServerAddr = endpoint.Address.ToString(); + + var handlerConf = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class, + GenericType<MessageHandler>.Class) + .Build(); 
+ + var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, + handlerConf); + + var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, + handlerConf); + + using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>()) + using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>()) + { + queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue; + queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue; + + IIdentifier id1 = new StringIdentifier("service1"); + IIdentifier id2 = new StringIdentifier("service2"); + networkService1.Register(id1); + networkService2.Register(id2); + + using (IConnection<string> connection1 = networkService1.NewConnection(id2)) + using (IConnection<string> connection2 = networkService2.NewConnection(id1)) + { + connection1.Open(); + connection1.Write("abc"); + connection1.Write("def"); + connection1.Write("ghi"); + + connection2.Open(); + connection2.Write("jkl"); + connection2.Write("nop"); + + Assert.AreEqual("abc", queue2.Take()); + Assert.AreEqual("def", queue2.Take()); + Assert.AreEqual("ghi", queue2.Take()); + + Assert.AreEqual("jkl", queue1.Take()); + Assert.AreEqual("nop", queue1.Take()); + } + } + } + } + + /// <summary> + /// Tests StreamingCodecFunctionCache + /// </summary> + [TestMethod] + public void TestStreamingCodecFunctionCache() + { + IConfiguration conf = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation(GenericType<IStreamingCodec<B>>.Class, GenericType<BStreamingCodec>.Class) + .Build(); + IInjector injector = TangFactory.GetTang().NewInjector(conf); + + StreamingCodecFunctionCache<A> cache = new StreamingCodecFunctionCache<A>(injector); + + var readFunc = cache.ReadFunction(typeof(B)); + var writeFunc = cache.WriteFunction(typeof (B)); + var readAsyncFunc = 
cache.ReadAsyncFunction(typeof(B)); + var writeAsyncFunc = cache.WriteAsyncFunction(typeof(B)); + + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + IDataReader reader = new StreamDataReader(stream); + + B val = new B(); + val.Value1 = "hello"; + val.Value2 = "reef"; + + writeFunc(val, writer); + + val.Value1 = "helloasync"; + val.Value2 = "reefasync"; + CancellationToken token = new CancellationToken(); + + var asyncResult = writeAsyncFunc.BeginInvoke(val, writer, token, null, null); + writeAsyncFunc.EndInvoke(asyncResult); + + stream.Position = 0; + A res = readFunc(reader); + B resB1 = res as B; + + asyncResult = readAsyncFunc.BeginInvoke(reader, token, null, null); + res = readAsyncFunc.EndInvoke(asyncResult); + B resB2 = res as B; + + Assert.AreEqual("hello", resB1.Value1); + Assert.AreEqual("reef", resB1.Value2); + Assert.AreEqual("helloasync", resB2.Value1); + Assert.AreEqual("reefasync", resB2.Value2); + } + + /// <summary> + /// Creates an instance of network service. 
+ /// </summary> + /// <param name="networkServicePort">The port that the NetworkService will listen on</param> + /// <param name="nameServicePort">The port of the NameServer</param> + /// <param name="nameServiceAddr">The ip address of the NameServer</param> + /// <param name="handlerConf">The configuration of observer to handle incoming messages</param> + /// <returns></returns> + private IInjector BuildNetworkService( + int networkServicePort, + int nameServicePort, + string nameServiceAddr, + IConfiguration handlerConf) + { + var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf) + .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>( + GenericType<NetworkServiceOptions.NetworkServicePort>.Class, + networkServicePort.ToString(CultureInfo.CurrentCulture)) + .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>( + GenericType<NamingConfigurationOptions.NameServerPort>.Class, + nameServicePort.ToString(CultureInfo.CurrentCulture)) + .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>( + GenericType<NamingConfigurationOptions.NameServerAddress>.Class, + nameServiceAddr) + .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class) + .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class) + .Build(); + + return TangFactory.GetTang().NewInjector(networkServiceConf); + } + + public class A + { + public string Value1; + } + + public class B : A + { + public string Value2; + } + + public class BStreamingCodec : IStreamingCodec<B> + { + [Inject] + public BStreamingCodec() + { + } + + public B Read(IDataReader reader) + { + B val = new B(); + val.Value1 = reader.ReadString(); + val.Value2 = reader.ReadString(); + return val; + } + + public void Write(B obj, IDataWriter writer) + { + writer.WriteString(obj.Value1); + writer.WriteString(obj.Value2); + } + + public async Task<B> ReadAsync(IDataReader reader, CancellationToken 
token) + { + B val = new B(); + val.Value1 = await reader.ReadStringAsync(token); + val.Value2 = await reader.ReadStringAsync(token); + return val; + } + + public async Task WriteAsync(B obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteStringAsync(obj.Value1, token); + await writer.WriteStringAsync(obj.Value2, token); + } + } + /// <summary> + /// The observer to handle incoming messages for string + /// </summary> + private class MessageHandler : IObserver<NsMessage<string>> + { + private readonly BlockingCollection<string> _queue; + + public BlockingCollection<string> Queue + { + get { return _queue; } + } + + [Inject] + private MessageHandler() + { + _queue = new BlockingCollection<string>(); + } + + public void OnNext(NsMessage<string> value) + { + _queue.Add(value.Data.First()); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// The network handler to handle incoming Streaming NsMessages + /// </summary> + private class NetworkMessageHandler : IObserver<NsMessage<string>> + { + [Inject] + public NetworkMessageHandler() + { + } + + public void OnNext(NsMessage<string> value) + { + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs deleted file mode 100644 index 07464ff..0000000 --- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Collections.Concurrent; -using System.Globalization; -using System.Linq; -using System.Net; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Io; -using Org.Apache.REEF.Network.Naming; -using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Network.Tests.NamingService; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.Util; - -namespace Org.Apache.REEF.Network.Tests.NetworkService -{ - /// <summary> - /// Tests for Writable Network Service - /// </summary> - [TestClass] - [Obsolete("Need to remove Iwritable and use IstreamingCodec. 
Please see Jira REEF-295 ", false)] - public class WritableNetworkServiceTests - { - /// <summary> - /// Tests one way communication between two network services - /// </summary> - [TestMethod] - public void TestWritableNetworkServiceOneWayCommunication() - { - int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000); - int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000); - - BlockingCollection<WritableString> queue; - - using (var nameServer = NameServerTests.BuildNameServer()) - { - IPEndPoint endpoint = nameServer.LocalEndpoint; - int nameServerPort = endpoint.Port; - string nameServerAddr = endpoint.Address.ToString(); - - var handlerConf1 = - TangFactory.GetTang() - .NewConfigurationBuilder() - .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class, - GenericType<NetworkMessageHandler>.Class) - .Build(); - - var handlerConf2 = - TangFactory.GetTang() - .NewConfigurationBuilder() - .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class, - GenericType<MessageHandler>.Class) - .Build(); - - var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, - handlerConf1); - - var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, - handlerConf2); - - using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>()) - using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>()) - { - queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue; - IIdentifier id1 = new StringIdentifier("service1"); - IIdentifier id2 = new StringIdentifier("service2"); - networkService1.Register(id1); - networkService2.Register(id2); - - using (IConnection<WritableString> connection = networkService1.NewConnection(id2)) - { - connection.Open(); - 
connection.Write(new WritableString("abc")); - connection.Write(new WritableString("def")); - connection.Write(new WritableString("ghi")); - - Assert.AreEqual("abc", queue.Take().Data); - Assert.AreEqual("def", queue.Take().Data); - Assert.AreEqual("ghi", queue.Take().Data); - } - } - } - } - - /// <summary> - /// Tests two way communication between two network services - /// </summary> - [TestMethod] - public void TestWritableNetworkServiceTwoWayCommunication() - { - int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000); - int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000); - - BlockingCollection<WritableString> queue1; - BlockingCollection<WritableString> queue2; - - using (var nameServer = NameServerTests.BuildNameServer()) - { - IPEndPoint endpoint = nameServer.LocalEndpoint; - int nameServerPort = endpoint.Port; - string nameServerAddr = endpoint.Address.ToString(); - - var handlerConf = - TangFactory.GetTang() - .NewConfigurationBuilder() - .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class, - GenericType<MessageHandler>.Class) - .Build(); - - var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr, - handlerConf); - - var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr, - handlerConf); - - using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>()) - using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>()) - { - queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue; - queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue; - - IIdentifier id1 = new StringIdentifier("service1"); - IIdentifier id2 = new StringIdentifier("service2"); - networkService1.Register(id1); - networkService2.Register(id2); - - using 
(IConnection<WritableString> connection1 = networkService1.NewConnection(id2)) - using (IConnection<WritableString> connection2 = networkService2.NewConnection(id1)) - { - connection1.Open(); - connection1.Write(new WritableString("abc")); - connection1.Write(new WritableString("def")); - connection1.Write(new WritableString("ghi")); - - connection2.Open(); - connection2.Write(new WritableString("jkl")); - connection2.Write(new WritableString("nop")); - - Assert.AreEqual("abc", queue2.Take().Data); - Assert.AreEqual("def", queue2.Take().Data); - Assert.AreEqual("ghi", queue2.Take().Data); - - Assert.AreEqual("jkl", queue1.Take().Data); - Assert.AreEqual("nop", queue1.Take().Data); - } - } - } - } - - /// <summary> - /// Creates an instance of network service. - /// </summary> - /// <param name="networkServicePort">The port that the NetworkService will listen on</param> - /// <param name="nameServicePort">The port of the NameServer</param> - /// <param name="nameServiceAddr">The ip address of the NameServer</param> - /// <param name="factory">Identifier factory for WritableString</param> - /// <param name="handler">The observer to handle incoming messages</param> - /// <returns></returns> - private IInjector BuildNetworkService( - int networkServicePort, - int nameServicePort, - string nameServiceAddr, - IConfiguration handlerConf) - { - var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf) - .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>( - GenericType<NetworkServiceOptions.NetworkServicePort>.Class, - networkServicePort.ToString(CultureInfo.CurrentCulture)) - .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>( - GenericType<NamingConfigurationOptions.NameServerPort>.Class, - nameServicePort.ToString(CultureInfo.CurrentCulture)) - .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>( - GenericType<NamingConfigurationOptions.NameServerAddress>.Class, - nameServiceAddr) - 
.BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class) - .Build(); - - return TangFactory.GetTang().NewInjector(networkServiceConf); - } - - /// <summary> - /// The observer to handle incoming messages for WritableString - /// </summary> - private class MessageHandler : IObserver<WritableNsMessage<WritableString>> - { - private readonly BlockingCollection<WritableString> _queue; - - public BlockingCollection<WritableString> Queue - { - get { return _queue; } - } - - [Inject] - private MessageHandler() - { - _queue = new BlockingCollection<WritableString>(); - } - - public void OnNext(WritableNsMessage<WritableString> value) - { - _queue.Add(value.Data.First()); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } - - /// <summary> - /// The network handler to handle incoming Writable NsMessages - /// </summary> - private class NetworkMessageHandler : IObserver<WritableNsMessage<WritableString>> - { - [Inject] - public NetworkMessageHandler() - { - } - - public void OnNext(WritableNsMessage<WritableString> value) - { - } - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs deleted file mode 100644 index 400aa52..0000000 --- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Threading; -using System.Threading.Tasks; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Network.Tests.NetworkService -{ - /// <summary> - /// Writable wrapper around the string class - /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. 
Please see Jira REEF-295 ", false)] - public class WritableString : IWritable - { - /// <summary> - /// Returns the actual string data - /// </summary> - public string Data { get; set; } - - /// <summary> - /// Empty constructor for instantiation with reflection - /// </summary> - [Inject] - public WritableString() - { - } - - /// <summary> - /// Constructor - /// </summary> - /// <param name="data">The string data</param> - public WritableString(string data) - { - Data = data; - } - - /// <summary> - /// Reads the string - /// </summary> - /// <param name="reader">reader to read from</param> - public void Read(IDataReader reader) - { - Data = reader.ReadString(); - } - - /// <summary> - /// Writes the string - /// </summary> - /// <param name="writer">Writer to write</param> - public void Write(IDataWriter writer) - { - writer.WriteString(Data); - } - - /// <summary> - /// Reads the string - /// </summary> - /// <param name="reader">reader to read from</param> - /// <param name="token">the cancellation token</param> - public async Task ReadAsync(IDataReader reader, CancellationToken token) - { - Data = await reader.ReadStringAsync(token); - } - - /// <summary> - /// Writes the string - /// </summary> - /// <param name="writer">Writer to write</param> - /// <param name="token">the cancellation token</param> - public async Task WriteAsync(IDataWriter writer, CancellationToken token) - { - await writer.WriteStringAsync(Data, token); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj index e0568aa..3355ac7 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj +++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj @@ -55,9 +55,8 @@ under the License. <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" /> <Compile Include="GroupCommunication\StreamingCodecTests.cs" /> <Compile Include="NamingService\NameServerTests.cs" /> - <Compile Include="NetworkService\WritableNetworkServiceTests.cs" /> <Compile Include="NetworkService\NetworkServiceTests.cs" /> - <Compile Include="NetworkService\WritableString.cs" /> + <Compile Include="NetworkService\StreamingNetworkServiceTests.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs index f8e1483..da156ca 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs @@ -17,6 +17,7 @@ * under the License. 
*/ +using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; @@ -38,7 +39,12 @@ namespace Org.Apache.REEF.Network.Group.Config public static ConfigurationModule Conf = new CodecToStreamingCodecConfiguration<T>() .BindImplementation(GenericType<ICodec<T>>.Class, Codec) .BindImplementation(GenericType<IStreamingCodec<T>>.Class, GenericType<CodecToStreamingCodec<T>>.Class) - .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, + GenericType<StreamingPipelineMessageCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class, + GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class, + GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs index 2a30047..20c6ade 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs @@ -17,6 +17,7 @@ * under the License. 
*/ +using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; @@ -41,7 +42,12 @@ namespace Org.Apache.REEF.Network.Group.Config /// </summary> public static ConfigurationModule Conf = new StreamingCodecConfiguration<T>() .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec) - .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, + GenericType<StreamingPipelineMessageCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class, + GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class, + GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs index 5ebb357..001c110 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs @@ -17,7 +17,6 @@ * under the License. */ -using System; using System.Collections.Generic; using System.Reflection; using Org.Apache.REEF.Network.Group.Config; @@ -39,7 +38,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// All operators in the same Communication Group run on the the /// same set of tasks. 
/// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.] public sealed class CommunicationGroupDriver : ICommunicationGroupDriver { private static readonly Logger LOGGER = Logger.GetLogger(typeof (CommunicationGroupDriver)); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs index e807a4e..9aa49a4 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs @@ -25,11 +25,10 @@ using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Network.Group.Driver.Impl { /// <summary> - /// Messages sent by MPI Operators. This is the abstract class inherited by - /// WritableGroupCommunicationMessage but seen by Network Service + /// Messages sent by MPI Operators. This is the class inherited by + /// GroupCommunicationMessage but seen by Network Service /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
- public abstract class GeneralGroupCommunicationMessage : IWritable + public class GeneralGroupCommunicationMessage { /// <summary> /// Empty constructor to allow instantiation by reflection @@ -45,70 +44,36 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="operatorName">The name of the MPI operator</param> /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> - /// <param name="messageType">The type of message to send</param> protected GeneralGroupCommunicationMessage( string groupName, string operatorName, string source, - string destination, - MessageType messageType) + string destination) { GroupName = groupName; OperatorName = operatorName; Source = source; Destination = destination; - MsgType = messageType; } /// <summary> /// Returns the Communication Group name. /// </summary> - public string GroupName { get; internal set; } + internal string GroupName { get; set; } /// <summary> /// Returns the MPI Operator name. /// </summary> - public string OperatorName { get; internal set; } + internal string OperatorName { get; set; } /// <summary> /// Returns the source of the message. /// </summary> - public string Source { get; internal set; } + internal string Source { get; set; } /// <summary> /// Returns the destination of the message. /// </summary> - public string Destination { get; internal set; } - - /// <summary> - /// Returns the type of message being sent. - /// </summary> - public MessageType MsgType { get; internal set; } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - public abstract void Read(IDataReader reader); - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - public abstract void Write(IDataWriter writer); - - /// <summary> - /// Read the class fields. 
- /// </summary> - /// <param name="reader">The reader from which to read </param> - /// <param name="token">The cancellation token</param> - public abstract System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token); - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">The cancellation token</param> - public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token); + internal string Destination { get; set; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs index ade5834..5bf0848 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs @@ -42,7 +42,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// Used to create Communication Groups for Group Communication Operators on the Reef driver. /// Also manages configuration for Group Communication tasks/services. /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
public sealed class GroupCommDriver : IGroupCommDriver { private const string MasterTaskContextName = "MasterTaskContext"; @@ -158,12 +157,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public IConfiguration GetServiceConfiguration() { IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule - .Set(ServiceConfiguration.Services, GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class) + .Set(ServiceConfiguration.Services, GenericType<StreamingNetworkService<GeneralGroupCommunicationMessage>>.Class) .Build(); return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig) .BindImplementation( - GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, + GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, GenericType<GroupCommNetworkObserver>.Class) .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>( GenericType<NamingConfigurationOptions.NameServerAddress>.Class, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs index ed7855b..c53cdb0 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs @@ -17,30 +17,21 @@ * under the License. */ -using System; -using System.Threading; -using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.StreamingCodec; namespace Org.Apache.REEF.Network.Group.Driver.Impl { /// <summary> - /// Messages sent by MPI Operators. 
This is the Writable version of GroupCommunicationMessage - /// class and will eventually replace it once everybody agrees with the design + /// Messages sent by MPI Operators. /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. - public sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage + internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage { - private readonly IStreamingCodec<T> _codec; - /// <summary> /// Empty constructor to allow instantiation by reflection /// </summary> [Inject] - private GroupCommunicationMessage(IStreamingCodec<T> codec) + private GroupCommunicationMessage() { - _codec = codec; } /// <summary> @@ -51,20 +42,15 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> /// <param name="message">The actual Writable message</param> - /// <param name="messageType">The type of message to send</param> - /// <param name="codec">Streaming Codec</param> - public GroupCommunicationMessage( + internal GroupCommunicationMessage( string groupName, string operatorName, string source, string destination, - T message, - MessageType messageType, - IStreamingCodec<T> codec) - : base(groupName, operatorName, source, destination, messageType) + T message) + : base(groupName, operatorName, source, destination) { - _codec = codec; - Data = new T[] { message }; + Data = new[] { message }; } /// <summary> @@ -75,133 +61,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> /// <param name="message">The actual Writable message array</param> - /// <param name="messageType">The type of message to send</param> - /// <param name="codec">Streaming Codec</param> - public GroupCommunicationMessage( + internal GroupCommunicationMessage( 
string groupName, string operatorName, string source, string destination, - T[] message, - MessageType messageType, - IStreamingCodec<T> codec) - : base(groupName, operatorName, source, destination, messageType) + T[] message) + : base(groupName, operatorName, source, destination) { - _codec = codec; Data = message; } /// <summary> /// Returns the array of messages. /// </summary> - public T[] Data + internal T[] Data { get; set; } - - /// <summary> - /// Read the class fields. - /// </summary> - /// <param name="reader">The reader from which to read </param> - public override void Read(IDataReader reader) - { - GroupName = reader.ReadString(); - OperatorName = reader.ReadString(); - Source = reader.ReadString(); - Destination = reader.ReadString(); - - int dataCount = reader.ReadInt32(); - - if (dataCount == 0) - { - throw new Exception("Data Count in Group COmmunication Message cannot be zero"); - } - - MsgType = (MessageType)Enum.Parse(typeof(MessageType), reader.ReadString()); - Data = new T[dataCount]; - - for (int index = 0; index < dataCount; index++) - { - Data[index] = _codec.Read(reader); - - if (Data[index] == null) - { - throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message"); - } - } - } - - /// <summary> - /// Writes the class fields. - /// </summary> - /// <param name="writer">The writer to which to write</param> - public override void Write(IDataWriter writer) - { - writer.WriteString(GroupName); - writer.WriteString(OperatorName); - writer.WriteString(Source); - writer.WriteString(Destination); - writer.WriteInt32(Data.Length); - writer.WriteString(MsgType.ToString()); - - foreach (var data in Data) - { - _codec.Write(data, writer); - } - } - - /// <summary> - /// Read the class fields. 
- /// </summary> - /// <param name="reader">The reader from which to read </param> - /// <param name="token">The cancellation token</param> - public override async System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token) - { - GroupName = await reader.ReadStringAsync(token); - OperatorName = await reader.ReadStringAsync(token); - Source = await reader.ReadStringAsync(token); - Destination = await reader.ReadStringAsync(token); - - int dataCount = await reader.ReadInt32Async(token); - - if (dataCount == 0) - { - throw new Exception("Data Count in Group COmmunication Message cannot be zero"); - } - - MsgType = (MessageType)Enum.Parse(typeof(MessageType), await reader.ReadStringAsync(token)); - Data = new T[dataCount]; - - for (int index = 0; index < dataCount; index++) - { - Data[index] = await _codec.ReadAsync(reader, token); - - if (Data[index] == null) - { - throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message"); - } - } - } - - /// <summary> - /// Writes the class fields. 
- /// </summary> - /// <param name="writer">The writer to which to write</param> - /// <param name="token">The cancellation token</param> - public override async System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token) - { - await writer.WriteStringAsync(GroupName, token); - await writer.WriteStringAsync(OperatorName, token); - await writer.WriteStringAsync(Source, token); - await writer.WriteStringAsync(Destination, token); - await writer.WriteInt32Async(Data.Length, token); - await writer.WriteStringAsync(MsgType.ToString(), token); - - foreach (var data in Data) - { - await _codec.WriteAsync(data, writer, token); - } - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs new file mode 100644 index 0000000..d619e64 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.Network.Group.Driver.Impl +{ + /// <summary> + /// Streaming Codec for the Group Communication Message + /// </summary> + internal sealed class GroupCommunicationMessageStreamingCodec<T> : IStreamingCodec<GroupCommunicationMessage<T>> + { + private readonly IStreamingCodec<T> _codec; + + /// <summary> + /// Empty constructor to allow instantiation by reflection + /// </summary> + [Inject] + private GroupCommunicationMessageStreamingCodec(IStreamingCodec<T> codec) + { + _codec = codec; + } + + /// <summary> + /// Read the class fields. 
+ /// </summary> + /// <param name="reader">The reader from which to read </param> + /// <returns>The Group Communication Message</returns> + public GroupCommunicationMessage<T> Read(IDataReader reader) + { + int metadataSize = reader.ReadInt32(); + byte[] metadata = new byte[metadataSize]; + reader.Read(ref metadata, 0, metadataSize); + var res = GenerateMetaDataDecoding(metadata); + + string groupName = res.Item1; + string operatorName = res.Item2; + string source = res.Item3; + string destination = res.Item4; + int dataCount = res.Item5; + + if (dataCount == 0) + { + throw new Exception("Data Count in Group Communication Message cannot be zero"); + } + + var data = new T[dataCount]; + + for (int index = 0; index < dataCount; index++) + { + data[index] = _codec.Read(reader); + + if (data[index] == null) + { + throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message"); + } + } + + return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data); + } + + /// <summary> + /// Writes the class fields. + /// </summary> + /// <param name="obj">The message to write</param> + /// <param name="writer">The writer to which to write</param> + public void Write(GroupCommunicationMessage<T> obj, IDataWriter writer) + { + byte[] encodedMetadata = GenerateMetaDataEncoding(obj); + byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length); + byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray(); + writer.Write(totalEncoding, 0, totalEncoding.Length); + + foreach (var data in obj.Data) + { + _codec.Write(data, writer); + } + } + + /// <summary> + /// Read the class fields. 
+ /// </summary> + /// <param name="reader">The reader from which to read </param> + /// <param name="token">The cancellation token</param> + /// <returns>The Group Communication Message</returns> + public async Task<GroupCommunicationMessage<T>> ReadAsync(IDataReader reader, + CancellationToken token) + { + int metadataSize = await reader.ReadInt32Async(token); + byte[] metadata = new byte[metadataSize]; + await reader.ReadAsync(metadata, 0, metadataSize, token); + var res = GenerateMetaDataDecoding(metadata); + + string groupName = res.Item1; + string operatorName = res.Item2; + string source = res.Item3; + string destination = res.Item4; + int dataCount = res.Item5; + + if (dataCount == 0) + { + throw new Exception("Data Count in Group Communication Message cannot be zero"); + } + + var data = new T[dataCount]; + + for (int index = 0; index < dataCount; index++) + { + data[index] = await _codec.ReadAsync(reader, token); + + if (data[index] == null) + { + throw new Exception( + "message instance cannot be created from the IDataReader in Group Communication Message"); + } + } + + return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data); + } + + /// <summary> + /// Writes the class fields. 
+ /// </summary> + /// <param name="obj">The message to write</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">The cancellation token</param> + public async System.Threading.Tasks.Task WriteAsync(GroupCommunicationMessage<T> obj, IDataWriter writer, CancellationToken token) + { + byte[] encodedMetadata = GenerateMetaDataEncoding(obj); + byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length); + byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray(); + await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token); + + foreach (var data in obj.Data) + { + await _codec.WriteAsync(data, writer, token); + } + } + + private static byte[] GenerateMetaDataEncoding(GroupCommunicationMessage<T> obj) + { + List<byte[]> metadataBytes = new List<byte[]>(); + + byte[] groupBytes = StringToBytes(obj.GroupName); + byte[] operatorBytes = StringToBytes(obj.OperatorName); + byte[] sourceBytes = StringToBytes(obj.Source); + byte[] dstBytes = StringToBytes(obj.Destination); + byte[] messageCount = BitConverter.GetBytes(obj.Data.Length); + + metadataBytes.Add(BitConverter.GetBytes(groupBytes.Length)); + metadataBytes.Add(BitConverter.GetBytes(operatorBytes.Length)); + metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length)); + metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length)); + metadataBytes.Add(groupBytes); + metadataBytes.Add(operatorBytes); + metadataBytes.Add(sourceBytes); + metadataBytes.Add(dstBytes); + metadataBytes.Add(messageCount); + + return metadataBytes.SelectMany(i => i).ToArray(); + } + + private static Tuple<string, string, string, string, int> GenerateMetaDataDecoding(byte[] obj) + { + int groupCount = BitConverter.ToInt32(obj, 0); + int operatorCount = BitConverter.ToInt32(obj, sizeof (int)); + int srcCount = BitConverter.ToInt32(obj, 2*sizeof (int)); + int dstCount = BitConverter.ToInt32(obj, 3*sizeof (int)); + + int offset = 4 * sizeof(int); + + string groupString = 
BytesToString(obj.Skip(offset).Take(groupCount).ToArray()); + offset += groupCount; + string operatorString = BytesToString(obj.Skip(offset).Take(operatorCount).ToArray()); + offset += operatorCount; + string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray()); + offset += srcCount; + string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray()); + offset += dstCount; + int messageCount = BitConverter.ToInt32(obj, offset); + + return new Tuple<string, string, string, string, int>(groupString, operatorString, srcString, dstString, + messageCount); + } + + private static byte[] StringToBytes(string str) + { + byte[] bytes = new byte[str.Length * sizeof(char)]; + Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length); + return bytes; + } + + private static string BytesToString(byte[] bytes) + { + char[] chars = new char[bytes.Length / sizeof(char)]; + Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length); + return new string(chars); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs index a78b13c..41b34a9 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs @@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// Group Communication operator used to do point-to-point communication between named Tasks. /// It uses Writable classes /// </summary> - [Obsolete("Need to remove Iwritable and use IstreamingCodec. 
Please see Jira REEF-295 ", false)] - public sealed class Sender + internal sealed class Sender { private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService; private readonly IIdentifierFactory _idFactory; @@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param> [Inject] private Sender( - WritableNetworkService<GeneralGroupCommunicationMessage> networkService, + StreamingNetworkService<GeneralGroupCommunicationMessage> networkService, IIdentifierFactory idFactory) { _networkService = networkService; @@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// included in the message. /// </summary> /// <param name="message">The message to send.</param> - public void Send(GeneralGroupCommunicationMessage message) + internal void Send(GeneralGroupCommunicationMessage message) { if (message == null) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs index 96c36e1..14dbd96 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs @@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Task /// Writable Version /// </summary> [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))] - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
- public interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage> + internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage> { /// <summary> /// Registers the handler with the WritableCommunicationGroupNetworkObserver. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs index de19754..c700d88 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs @@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task /// Writable Version /// </summary> [DefaultImplementation(typeof(GroupCommNetworkObserver))] - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. - public interface IGroupCommNetworkObserver : IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> + internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>> { /// <summary> /// Registers the network handler for the given CommunicationGroup. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs index 305d245..cf3e559 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs @@ -19,7 +19,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Operators; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs index 5be1457..c3989a9 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs @@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Handles incoming messages sent to this Communication Group. /// Writable version /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
- public sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver + internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver { private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver)); private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers; @@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// will be invoked</param> /// <param name="observer">The writable handler to invoke when messages are sent /// to the operator specified by operatorName</param> - public void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer) + void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer) { if (string.IsNullOrEmpty(operatorName)) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs index e79df55..8a54ede 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs @@ -34,7 +34,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Used by Tasks to fetch CommunicationGroupClients. /// Writable version /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
public sealed class GroupCommClient : IGroupCommClient { private readonly Dictionary<string, ICommunicationGroupClientInternal> _commGroups; @@ -50,11 +49,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="configSerializer">Used to deserialize Group Communication configuration</param> /// <param name="injector">injector forked from the injector that creates this instance</param> [Inject] - [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] public GroupCommClient( [Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, - WritableNetworkService<GeneralGroupCommunicationMessage> networkService, + StreamingNetworkService<GeneralGroupCommunicationMessage> networkService, AvroConfigurationSerializer configSerializer, IInjector injector) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs index 9d35ff1..d0f35fe 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs @@ -31,8 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Handles all incoming messages for this Task. /// Writable version /// </summary> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. 
- public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver + internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver { private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver)); @@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Creates a new GroupCommNetworkObserver. /// </summary> [Inject] - public GroupCommNetworkObserver() + private GroupCommNetworkObserver() { _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>(); } @@ -53,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// WritableCommunicationGroupNetworkObserver. /// </summary> /// <param name="nsMessage"></param> - public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> nsMessage) + public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage) { if (nsMessage == null) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs index aa5de1e..00fa9a5 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs @@ -29,7 +29,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Writable version /// </summary> /// <typeparam name="T"> Generic type of message</typeparam> - // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. internal sealed class NodeStruct<T> { private readonly BlockingCollection<GroupCommunicationMessage<T>> _messageQueue; @@ -38,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Creates a new NodeStruct. 
/// </summary> /// <param name="id">The Task identifier</param> - public NodeStruct(string id) + internal NodeStruct(string id) { Identifier = id; _messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>(); @@ -48,13 +47,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Returns the identifier for the Task that sent all /// messages in the message queue. /// </summary> - public string Identifier { get; private set; } + internal string Identifier { get; private set; } /// <summary> /// Gets the first message in the message queue. /// </summary> /// <returns>The first available message.</returns> - public T[] GetData() + internal T[] GetData() { return _messageQueue.Take().Data; } @@ -63,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Adds an incoming message to the message queue. /// </summary> /// <param name="gcm">The incoming message</param> - public void AddData(GroupCommunicationMessage<T> gcm) + internal void AddData(GroupCommunicationMessage<T> gcm) { _messageQueue.Add(gcm); } @@ -72,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Tells whether there is a message in queue or not. /// </summary> /// <returns>True if queue is non empty, false otherwise.</returns> - public bool HasMessage() + internal bool HasMessage() { if (_messageQueue.Count != 0) {
