[REEF-447] Convert Network Service Layer from Writable to Streaming

This addressed the issue by
  * Converting WritableNetworkService to StreamingNetworkService
  * Introducing NsMessageStreamingCodec, which caches and uses the StreamingCodecs of
various GroupCommunicationMessage types.
  * Introducing GroupCommunicationMessage StreamingCodec

JIRA:
  [REEF-447](https://issues.apache.org/jira/browse/REEF-447)

 This Closes #289

Author:    Dhruv <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8505dee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8505dee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8505dee9

Branch: refs/heads/master
Commit: 8505dee942f924cbfd72d631bdaabcb20c125f02
Parents: 5cdd655
Author: Dhruv <[email protected]>
Authored: Thu Jul 9 14:52:20 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Wed Jul 15 14:44:34 2015 -0700

----------------------------------------------------------------------
 .../GroupCommunicationTests.cs                  |  22 +-
 .../GroupCommunicationTreeTopologyTests.cs      |   5 -
 .../StreamingNetworkServiceTests.cs             | 361 +++++++++++++++++++
 .../WritableNetworkServiceTests.cs              | 262 --------------
 .../NetworkService/WritableString.cs            |  94 -----
 .../Org.Apache.REEF.Network.Tests.csproj        |   3 +-
 .../CodecToStreamingCodecConfiguration.cs       |   8 +-
 .../Group/Config/StreamingCodecConfiguration.cs |   8 +-
 .../Driver/Impl/CommunicationGroupDriver.cs     |   2 -
 .../Impl/GeneralGroupCommunicationMessage.cs    |  51 +--
 .../Group/Driver/Impl/GroupCommDriver.cs        |   5 +-
 .../Driver/Impl/GroupCommunicationMessage.cs    | 145 +-------
 .../GroupCommunicationMessageStreamingCodec.cs  | 223 ++++++++++++
 .../Group/Operators/Impl/Sender.cs              |   7 +-
 .../Task/ICommunicationGroupNetworkObserver.cs  |   3 +-
 .../Group/Task/IGroupCommNetworkObserver.cs     |   3 +-
 .../Group/Task/Impl/CommunicationGroupClient.cs |   1 -
 .../Impl/CommunicationGroupNetworkObserver.cs   |   5 +-
 .../Group/Task/Impl/GroupCommClient.cs          |   4 +-
 .../Group/Task/Impl/GroupCommNetworkObserver.cs |   7 +-
 .../Group/Task/Impl/NodeStruct.cs               |  11 +-
 .../Group/Task/Impl/OperatorTopology.cs         |  31 +-
 .../Codec/NsMessageStreamingCodec.cs            | 202 +++++++++++
 .../Codec/StreamingCodecFunctionCache.cs        | 203 +++++++++++
 .../NetworkService/StreamingNetworkService.cs   | 160 ++++++++
 .../NetworkService/WritableNetworkService.cs    | 159 --------
 .../NetworkService/WritableNsConnection.cs      | 138 -------
 .../NetworkService/WritableNsMessage.cs         | 185 ----------
 .../Org.Apache.REEF.Network.csproj              |   7 +-
 .../Org.Apache.REEF.Wake.Tests.csproj           |   1 -
 .../StreamingRemoteManagerTest.cs               | 232 ++++++------
 .../WritableString.cs                           |  95 -----
 .../Org.Apache.REEF.Wake.csproj                 |   2 -
 .../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs |  61 ----
 .../Impl/StreamingRemoteManagerFactory.cs       |   5 +-
 .../Impl/TemporaryWritableToStreamingCodec.cs   |  70 ----
 36 files changed, 1350 insertions(+), 1431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 61813af..62a6a3f 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -72,17 +72,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     new BlockingCollection<GeneralGroupCommunicationMessage>();
 
                 var handler1 =
-                    
Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages1.Add(msg.Data.First()));
+                    
Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages1.Add(msg.Data.First()));
                 var handler2 =
-                    
Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages2.Add(msg.Data.First()));
+                    
Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => 
messages2.Add(msg.Data.First()));
 
                 var networkServiceInjector1 = 
BuildNetworkServiceInjector(endpoint, handler1);
                 var networkServiceInjector2 = 
BuildNetworkServiceInjector(endpoint, handler2);
 
                 var networkService1 = networkServiceInjector1.GetInstance<
-                  WritableNetworkService<GeneralGroupCommunicationMessage>>();
+                  StreamingNetworkService<GeneralGroupCommunicationMessage>>();
                 var networkService2 = networkServiceInjector2.GetInstance<
-                    
WritableNetworkService<GeneralGroupCommunicationMessage>>();
+                    
StreamingNetworkService<GeneralGroupCommunicationMessage>>();
                 networkService1.Register(new StringIdentifier("id1"));
                 networkService2.Register(new StringIdentifier("id2"));
 
@@ -822,7 +822,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         }
 
         public static IInjector BuildNetworkServiceInjector(
-            IPEndPoint nameServerEndpoint, 
IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler)
+            IPEndPoint nameServerEndpoint, 
IObserver<NsMessage<GeneralGroupCommunicationMessage>> handler)
         {
             var config = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindNamedParameter(typeof 
(NamingConfigurationOptions.NameServerAddress),
@@ -832,20 +832,24 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .BindNamedParameter(typeof 
(NetworkServiceOptions.NetworkServicePort),
                     (0).ToString(CultureInfo.InvariantCulture))
                 .BindImplementation(GenericType<INameClient>.Class, 
GenericType<NameClient>.Class)
-                
.BindImplementation(GenericType<IStreamingCodec<string>>.Class, 
GenericType<StringStreamingCodec>.Class)
                 .Build();
 
+            var codecConfig = StreamingCodecConfiguration<string>.Conf
+                .Set(StreamingCodecConfiguration<string>.Codec, 
GenericType<StringStreamingCodec>.Class)
+                .Build();
+
+            config = Configurations.Merge(config, codecConfig);
+
             var injector = TangFactory.GetTang().NewInjector(config);
             injector.BindVolatileInstance(
-                
GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
 handler);
+                
GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, 
handler);
 
             return injector;
         }
 
         private GroupCommunicationMessage<string> CreateGcmStringType(string 
message, string from, string to)
         {
-            var stringCodec = 
TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
-            return new GroupCommunicationMessage<string>("g1", "op1", from, 
to, message, MessageType.Data, stringCodec);
+            return new GroupCommunicationMessage<string>("g1", "op1", from, 
to, message);
         }
 
         private static void ScatterReceiveReduce(IScatterReceiver<int> 
receiver, IReduceSender<int> sumSender)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 6b9b8c7..c194bcb 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -18,21 +18,16 @@
  */
 
 using System.Collections.Generic;
-using System.Globalization;
 using System.Linq;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace Org.Apache.REEF.Network.Tests.GroupCommunication

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
new file mode 100644
index 0000000..1e0378e
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Network.Tests.NamingService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// Tests for Streaming Network Service
+    /// </summary>
+    [TestClass]
+    public class StreamingNetworkServiceTests
+    {
+        /// <summary>
+        /// Tests one way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingNetworkServiceOneWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
+
+            BlockingCollection<string> queue;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf1 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<NetworkMessageHandler>.Class)
+                        .Build();
+
+                var handlerConf2 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf1);
+
+                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf2);
+
+                using (INetworkService<string> networkService1 = 
networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+                using (INetworkService<string> networkService2 = 
networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+                {
+                    queue = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<string> connection = 
networkService1.NewConnection(id2))
+                    {
+                        connection.Open();
+                        connection.Write("abc");
+                        connection.Write("def");
+                        connection.Write("ghi");
+
+                        Assert.AreEqual("abc", queue.Take());
+                        Assert.AreEqual("def", queue.Take());
+                        Assert.AreEqual("ghi", queue.Take());
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests two way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingNetworkServiceTwoWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
+
+            BlockingCollection<string> queue1;
+            BlockingCollection<string> queue2;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf);
+
+                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf);
+
+                using (INetworkService<string> networkService1 = 
networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+                using (INetworkService<string> networkService2 = 
networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+                {
+                    queue1 = 
networkServiceInjection1.GetInstance<MessageHandler>().Queue;
+                    queue2 = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<string> connection1 = 
networkService1.NewConnection(id2))
+                    using (IConnection<string> connection2 = 
networkService2.NewConnection(id1))
+                    {
+                        connection1.Open();
+                        connection1.Write("abc");
+                        connection1.Write("def");
+                        connection1.Write("ghi");
+
+                        connection2.Open();
+                        connection2.Write("jkl");
+                        connection2.Write("nop");
+
+                        Assert.AreEqual("abc", queue2.Take());
+                        Assert.AreEqual("def", queue2.Take());
+                        Assert.AreEqual("ghi", queue2.Take());
+
+                        Assert.AreEqual("jkl", queue1.Take());
+                        Assert.AreEqual("nop", queue1.Take());
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests StreamingCodecFunctionCache
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCodecFunctionCache()
+        {
+            IConfiguration conf = 
TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IStreamingCodec<B>>.Class, 
GenericType<BStreamingCodec>.Class)
+                .Build();
+            IInjector injector = TangFactory.GetTang().NewInjector(conf);
+            
+            StreamingCodecFunctionCache<A> cache = new 
StreamingCodecFunctionCache<A>(injector);
+
+            var readFunc = cache.ReadFunction(typeof(B));
+            var writeFunc = cache.WriteFunction(typeof (B));
+            var readAsyncFunc = cache.ReadAsyncFunction(typeof(B));
+            var writeAsyncFunc = cache.WriteAsyncFunction(typeof(B));
+
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            IDataReader reader = new StreamDataReader(stream);
+
+            B val = new B();
+            val.Value1 = "hello";
+            val.Value2 = "reef";
+
+            writeFunc(val, writer);
+
+            val.Value1 = "helloasync";
+            val.Value2 = "reefasync";
+            CancellationToken token = new CancellationToken();
+            
+            var asyncResult = writeAsyncFunc.BeginInvoke(val, writer, token, 
null, null);
+            writeAsyncFunc.EndInvoke(asyncResult);
+            
+            stream.Position = 0;
+            A res = readFunc(reader);
+            B resB1 = res as B;
+
+            asyncResult = readAsyncFunc.BeginInvoke(reader, token, null, null);
+            res = readAsyncFunc.EndInvoke(asyncResult);
+            B resB2 = res as B;
+            
+            Assert.AreEqual("hello", resB1.Value1);
+            Assert.AreEqual("reef", resB1.Value2);
+            Assert.AreEqual("helloasync", resB2.Value1);
+            Assert.AreEqual("reefasync", resB2.Value2);
+        }
+
+        /// <summary>
+        /// Creates an instance of network service.
+        /// </summary>
+        /// <param name="networkServicePort">The port that the NetworkService 
will listen on</param>
+        /// <param name="nameServicePort">The port of the NameServer</param>
+        /// <param name="nameServiceAddr">The ip address of the 
NameServer</param>
+        /// <param name="handlerConf">The configuration of observer to handle 
incoming messages</param>
+        /// <returns></returns>
+        private IInjector BuildNetworkService(
+            int networkServicePort,
+            int nameServicePort,
+            string nameServiceAddr,
+            IConfiguration handlerConf)
+        {
+            var networkServiceConf = 
TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
+                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, 
int>(
+                    
GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                    networkServicePort.ToString(CultureInfo.CurrentCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, 
int>(
+                    
GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    nameServicePort.ToString(CultureInfo.CurrentCulture))
+                
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    nameServiceAddr)
+                .BindImplementation(GenericType<INameClient>.Class, 
GenericType<NameClient>.Class)
+                
.BindImplementation(GenericType<IStreamingCodec<string>>.Class, 
GenericType<StringStreamingCodec>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewInjector(networkServiceConf);
+        }
+
+        public class A
+        {
+            public string Value1;
+        }
+
+        public class B : A
+        {
+            public string Value2;
+        }
+
+        public class BStreamingCodec : IStreamingCodec<B>
+        {
+            [Inject]
+            public BStreamingCodec()
+            {
+            }
+
+            public B Read(IDataReader reader)
+            {
+                B val = new B();
+                val.Value1 = reader.ReadString();
+                val.Value2 = reader.ReadString();
+                return val;
+            }
+
+            public void Write(B obj, IDataWriter writer)
+            {
+                writer.WriteString(obj.Value1);
+                writer.WriteString(obj.Value2);
+            }
+
+            public async Task<B> ReadAsync(IDataReader reader, 
CancellationToken token)
+            {
+                B val = new B();
+                val.Value1 = await reader.ReadStringAsync(token);
+                val.Value2 = await reader.ReadStringAsync(token);
+                return val;
+            }
+
+            public async Task WriteAsync(B obj, IDataWriter writer, 
CancellationToken token)
+            {
+                await writer.WriteStringAsync(obj.Value1, token);
+                await writer.WriteStringAsync(obj.Value2, token);
+            }
+        } 
+        /// <summary>
+        /// The observer to handle incoming messages for string
+        /// </summary>
+        private class MessageHandler : IObserver<NsMessage<string>>
+        {
+            private readonly BlockingCollection<string> _queue;
+
+            public BlockingCollection<string> Queue
+            {
+                get { return _queue; }
+            } 
+
+            [Inject]
+            private MessageHandler()
+            {
+                _queue = new BlockingCollection<string>();
+            }
+
+            public void OnNext(NsMessage<string> value)
+            {
+                _queue.Add(value.Data.First());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// The network handler to handle incoming Streaming NsMessages
+        /// </summary>
+        private class NetworkMessageHandler : IObserver<NsMessage<string>>
+        {
+            [Inject]
+            public NetworkMessageHandler()
+            {
+            }
+
+            public void OnNext(NsMessage<string> value)
+            {
+            }
+
+            public void OnError(Exception error)
+            {
+            }
+
+            public void OnCompleted()
+            {
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
deleted file mode 100644
index 07464ff..0000000
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.Tests.NamingService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
-    /// <summary>
-    /// Tests for Writable Network Service
-    /// </summary>
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class WritableNetworkServiceTests
-    {
-        /// <summary>
-        /// Tests one way communication between two network services
-        /// </summary>
-        [TestMethod]
-        public void TestWritableNetworkServiceOneWayCommunication()
-        {
-            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
-            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
-
-            BlockingCollection<WritableString> queue;
-
-            using (var nameServer = NameServerTests.BuildNameServer())
-            {
-                IPEndPoint endpoint = nameServer.LocalEndpoint;
-                int nameServerPort = endpoint.Port;
-                string nameServerAddr = endpoint.Address.ToString();
-
-                var handlerConf1 =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<NetworkMessageHandler>.Class)
-                        .Build();
-
-                var handlerConf2 =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<MessageHandler>.Class)
-                        .Build();
-
-                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
-                    handlerConf1);
-
-                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
-                   handlerConf2);
-
-                using (INetworkService<WritableString> networkService1 = 
networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
-                using (INetworkService<WritableString> networkService2 = 
networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
-                {
-                    queue = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
-                    IIdentifier id1 = new StringIdentifier("service1");
-                    IIdentifier id2 = new StringIdentifier("service2");
-                    networkService1.Register(id1);
-                    networkService2.Register(id2);
-
-                    using (IConnection<WritableString> connection = 
networkService1.NewConnection(id2))
-                    {
-                        connection.Open();
-                        connection.Write(new WritableString("abc"));
-                        connection.Write(new WritableString("def"));
-                        connection.Write(new WritableString("ghi"));
-
-                        Assert.AreEqual("abc", queue.Take().Data);
-                        Assert.AreEqual("def", queue.Take().Data);
-                        Assert.AreEqual("ghi", queue.Take().Data);
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Tests two way communication between two network services
-        /// </summary>
-        [TestMethod]
-        public void TestWritableNetworkServiceTwoWayCommunication()
-        {
-            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 
7000);
-            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 
8000);
-
-            BlockingCollection<WritableString> queue1;
-            BlockingCollection<WritableString> queue2;
-
-            using (var nameServer = NameServerTests.BuildNameServer())
-            {
-                IPEndPoint endpoint = nameServer.LocalEndpoint;
-                int nameServerPort = endpoint.Port;
-                string nameServerAddr = endpoint.Address.ToString();
-
-                var handlerConf =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        
.BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<MessageHandler>.Class)
-                        .Build();
-
-                var networkServiceInjection1 = 
BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
-                    handlerConf);
-
-                var networkServiceInjection2 = 
BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
-                   handlerConf);
-
-                using (INetworkService<WritableString> networkService1 = 
networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
-                using (INetworkService<WritableString> networkService2 = 
networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
-                {
-                    queue1 = 
networkServiceInjection1.GetInstance<MessageHandler>().Queue;
-                    queue2 = 
networkServiceInjection2.GetInstance<MessageHandler>().Queue;
-
-                    IIdentifier id1 = new StringIdentifier("service1");
-                    IIdentifier id2 = new StringIdentifier("service2");
-                    networkService1.Register(id1);
-                    networkService2.Register(id2);
-
-                    using (IConnection<WritableString> connection1 = 
networkService1.NewConnection(id2))
-                    using (IConnection<WritableString> connection2 = 
networkService2.NewConnection(id1))
-                    {
-                        connection1.Open();
-                        connection1.Write(new WritableString("abc"));
-                        connection1.Write(new WritableString("def"));
-                        connection1.Write(new WritableString("ghi"));
-
-                        connection2.Open();
-                        connection2.Write(new WritableString("jkl"));
-                        connection2.Write(new WritableString("nop"));
-
-                        Assert.AreEqual("abc", queue2.Take().Data);
-                        Assert.AreEqual("def", queue2.Take().Data);
-                        Assert.AreEqual("ghi", queue2.Take().Data);
-
-                        Assert.AreEqual("jkl", queue1.Take().Data);
-                        Assert.AreEqual("nop", queue1.Take().Data);
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Creates an instance of network service.
-        /// </summary>
-        /// <param name="networkServicePort">The port that the NetworkService 
will listen on</param>
-        /// <param name="nameServicePort">The port of the NameServer</param>
-        /// <param name="nameServiceAddr">The ip address of the 
NameServer</param>
-        /// <param name="factory">Identifier factory for WritableString</param>
-        /// <param name="handler">The observer to handle incoming 
messages</param>
-        /// <returns></returns>
-        private IInjector BuildNetworkService(
-            int networkServicePort,
-            int nameServicePort,
-            string nameServiceAddr,
-            IConfiguration handlerConf)
-        {
-            var networkServiceConf = 
TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
-                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, 
int>(
-                    
GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
-                    networkServicePort.ToString(CultureInfo.CurrentCulture))
-                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, 
int>(
-                    
GenericType<NamingConfigurationOptions.NameServerPort>.Class,
-                    nameServicePort.ToString(CultureInfo.CurrentCulture))
-                
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
-                    
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
-                    nameServiceAddr)
-                .BindImplementation(GenericType<INameClient>.Class, 
GenericType<NameClient>.Class)
-                .Build();
-
-            return TangFactory.GetTang().NewInjector(networkServiceConf);
-        }
-
-        /// <summary>
-        /// The observer to handle incoming messages for WritableString
-        /// </summary>
-        private class MessageHandler : 
IObserver<WritableNsMessage<WritableString>>
-        {
-            private readonly BlockingCollection<WritableString> _queue;
-
-            public BlockingCollection<WritableString> Queue
-            {
-                get { return _queue; }
-            } 
-
-            [Inject]
-            private MessageHandler()
-            {
-                _queue = new BlockingCollection<WritableString>();
-            }
-
-            public void OnNext(WritableNsMessage<WritableString> value)
-            {
-                _queue.Add(value.Data.First());
-            }
-
-            public void OnError(Exception error)
-            {
-                throw new NotImplementedException();
-            }
-
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        /// <summary>
-        /// The network handler to handle incoming Writable NsMessages
-        /// </summary>
-        private class NetworkMessageHandler : 
IObserver<WritableNsMessage<WritableString>>
-        {
-            [Inject]
-            public NetworkMessageHandler()
-            {
-            }
-
-            public void OnNext(WritableNsMessage<WritableString> value)
-            {
-            }
-
-            public void OnError(Exception error)
-            {
-            }
-
-            public void OnCompleted()
-            {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs 
b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
deleted file mode 100644
index 400aa52..0000000
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
-    /// <summary>
-    /// Writable wrapper around the string class
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public class WritableString : IWritable
-    {
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data { get; set; }
-        
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public WritableString()
-        {
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public WritableString(string data)
-        {
-            Data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            Data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(Data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken 
token)
-        {
-            Data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken 
token)
-        {
-            await writer.WriteStringAsync(Data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index e0568aa..3355ac7 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -55,9 +55,8 @@ under the License.
     <Compile 
Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
     <Compile Include="GroupCommunication\StreamingCodecTests.cs" />
     <Compile Include="NamingService\NameServerTests.cs" />
-    <Compile Include="NetworkService\WritableNetworkServiceTests.cs" />
     <Compile Include="NetworkService\NetworkServiceTests.cs" />
-    <Compile Include="NetworkService\WritableString.cs" />
+    <Compile Include="NetworkService\StreamingNetworkServiceTests.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
index f8e1483..da156ca 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Util;
@@ -38,7 +39,12 @@ namespace Org.Apache.REEF.Network.Group.Config
         public static ConfigurationModule Conf = new 
CodecToStreamingCodecConfiguration<T>()
             .BindImplementation(GenericType<ICodec<T>>.Class, Codec)
             .BindImplementation(GenericType<IStreamingCodec<T>>.Class, 
GenericType<CodecToStreamingCodec<T>>.Class)
-            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, 
GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+                GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+                
GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
             .Build();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
index 2a30047..20c6ade 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Util;
@@ -41,7 +42,12 @@ namespace Org.Apache.REEF.Network.Group.Config
         /// </summary>
         public static ConfigurationModule Conf = new 
StreamingCodecConfiguration<T>()
             .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec)
-            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, 
GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+                GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+                
GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
             .Build();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 5ebb357..001c110 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-using System;
 using System.Collections.Generic;
 using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
@@ -39,7 +38,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// All operators in the same Communication Group run on the the 
     /// same set of tasks.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.]
     public sealed class CommunicationGroupDriver : ICommunicationGroupDriver
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof 
(CommunicationGroupDriver));

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
index e807a4e..9aa49a4 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -25,11 +25,10 @@ using Org.Apache.REEF.Tang.Annotations;
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
-    /// Messages sent by MPI Operators. This is the abstract class inherited 
by 
-    /// WritableGroupCommunicationMessage but seen by Network Service
+    /// Messages sent by MPI Operators. This is the class inherited by 
+    /// GroupCommunicationMessage but seen by Network Service
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public abstract class GeneralGroupCommunicationMessage : IWritable
+    public class GeneralGroupCommunicationMessage
     {        
         /// <summary>
         /// Empty constructor to allow instantiation by reflection
@@ -45,70 +44,36 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="operatorName">The name of the MPI operator</param>
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
-        /// <param name="messageType">The type of message to send</param>
         protected GeneralGroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
-            string destination,
-            MessageType messageType)
+            string destination)
         {
             GroupName = groupName;
             OperatorName = operatorName;
             Source = source;
             Destination = destination;
-            MsgType = messageType;
         }
 
         /// <summary>
         /// Returns the Communication Group name.
         /// </summary>
-        public string GroupName { get; internal set; }
+        internal string GroupName { get; set; }
 
         /// <summary>
         /// Returns the MPI Operator name.
         /// </summary>
-        public string OperatorName { get; internal set; }
+        internal string OperatorName { get; set; }
 
         /// <summary>
         /// Returns the source of the message.
         /// </summary>
-        public string Source { get; internal set; }
+        internal string Source { get; set; }
 
         /// <summary>
         /// Returns the destination of the message.
         /// </summary>
-        public string Destination { get; internal set; }
-
-        /// <summary>
-        /// Returns the type of message being sent.
-        /// </summary>
-        public MessageType MsgType { get; internal set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public abstract void Read(IDataReader reader);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public abstract void Write(IDataWriter writer);
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public abstract System.Threading.Tasks.Task ReadAsync(IDataReader 
reader, CancellationToken token);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter 
writer, CancellationToken token);
+        internal string Destination { get; set; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index ade5834..5bf0848 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -42,7 +42,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// Used to create Communication Groups for Group Communication Operators 
on the Reef driver.
     /// Also manages configuration for Group Communication tasks/services.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
     public sealed class GroupCommDriver : IGroupCommDriver
     {
         private const string MasterTaskContextName = "MasterTaskContext";
@@ -158,12 +157,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public IConfiguration GetServiceConfiguration()
         {
             IConfiguration serviceConfig = 
ServiceConfiguration.ConfigurationModule
-                .Set(ServiceConfiguration.Services, 
GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class)
+                .Set(ServiceConfiguration.Services, 
GenericType<StreamingNetworkService<GeneralGroupCommunicationMessage>>.Class)
                 .Build();
 
             return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
                 .BindImplementation(
-                    
GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
+                    
GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class,
                     GenericType<GroupCommNetworkObserver>.Class)
                 
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
                     
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index ed7855b..c53cdb0 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -17,30 +17,21 @@
  * under the License.
  */
 
-using System;
-using System.Threading;
-using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
-    /// Messages sent by MPI Operators. This is the Writable version of 
GroupCommunicationMessage
-    ///  class and will eventually replace it once everybody agrees with the 
design
+    /// Messages sent by MPI Operators.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public sealed class GroupCommunicationMessage<T> : 
GeneralGroupCommunicationMessage
+    internal sealed class GroupCommunicationMessage<T> : 
GeneralGroupCommunicationMessage
     {
-        private readonly IStreamingCodec<T> _codec;
-
         /// <summary>
         /// Empty constructor to allow instantiation by reflection
         /// </summary>
         [Inject]
-        private GroupCommunicationMessage(IStreamingCodec<T> codec)
+        private GroupCommunicationMessage()
         {
-            _codec = codec;
         }
 
         /// <summary>
@@ -51,20 +42,15 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
         /// <param name="message">The actual Writable message</param>
-        /// <param name="messageType">The type of message to send</param>
-        /// <param name="codec">Streaming Codec</param>
-        public GroupCommunicationMessage(
+        internal GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            T message,
-            MessageType messageType,
-            IStreamingCodec<T> codec)
-            : base(groupName, operatorName, source, destination, messageType)
+            T message)
+            : base(groupName, operatorName, source, destination)
         {
-            _codec = codec;
-            Data = new T[] { message };
+            Data = new[] { message };
         }
 
         /// <summary>
@@ -75,133 +61,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
         /// <param name="message">The actual Writable message array</param>
-        /// <param name="messageType">The type of message to send</param>
-        /// <param name="codec">Streaming Codec</param>
-        public GroupCommunicationMessage(
+        internal GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            T[] message,
-            MessageType messageType,
-            IStreamingCodec<T> codec)
-            : base(groupName, operatorName, source, destination, messageType)
+            T[] message)
+            : base(groupName, operatorName, source, destination)
         {
-            _codec = codec;
             Data = message;
         }
 
         /// <summary>
         /// Returns the array of messages.
         /// </summary>
-        public T[] Data
+        internal T[] Data
         {
             get;
             set;
         }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public override void Read(IDataReader reader)
-        {
-            GroupName = reader.ReadString();
-            OperatorName = reader.ReadString();
-            Source = reader.ReadString();
-            Destination = reader.ReadString();
-
-            int dataCount = reader.ReadInt32();
-
-            if (dataCount == 0)
-            {
-                throw new Exception("Data Count in Group COmmunication Message 
cannot be zero");
-            }
-
-            MsgType = (MessageType)Enum.Parse(typeof(MessageType), 
reader.ReadString());
-            Data = new T[dataCount];
-
-            for (int index = 0; index < dataCount; index++)
-            {
-                Data[index] = _codec.Read(reader);
-
-                if (Data[index] == null)
-                {
-                    throw new Exception("message instance cannot be created 
from the IDataReader in Group Communication Message");
-                }
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public override void Write(IDataWriter writer)
-        {
-            writer.WriteString(GroupName);
-            writer.WriteString(OperatorName);
-            writer.WriteString(Source);
-            writer.WriteString(Destination);
-            writer.WriteInt32(Data.Length);
-            writer.WriteString(MsgType.ToString());
-
-            foreach (var data in Data)
-            {
-                _codec.Write(data, writer);
-            }
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public override async System.Threading.Tasks.Task 
ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            GroupName = await reader.ReadStringAsync(token);
-            OperatorName = await reader.ReadStringAsync(token);
-            Source = await reader.ReadStringAsync(token);
-            Destination = await reader.ReadStringAsync(token);
-
-            int dataCount = await reader.ReadInt32Async(token);
-
-            if (dataCount == 0)
-            {
-                throw new Exception("Data Count in Group COmmunication Message 
cannot be zero");
-            }
-
-            MsgType = (MessageType)Enum.Parse(typeof(MessageType), await 
reader.ReadStringAsync(token));
-            Data = new T[dataCount];
-
-            for (int index = 0; index < dataCount; index++)
-            {
-                Data[index] = await _codec.ReadAsync(reader, token);
-
-                if (Data[index] == null)
-                {
-                    throw new Exception("message instance cannot be created 
from the IDataReader in Group Communication Message");
-                }
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public override async System.Threading.Tasks.Task 
WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(GroupName, token);
-            await writer.WriteStringAsync(OperatorName, token);
-            await writer.WriteStringAsync(Source, token);
-            await writer.WriteStringAsync(Destination, token);
-            await writer.WriteInt32Async(Data.Length, token);
-            await writer.WriteStringAsync(MsgType.ToString(), token);
-
-            foreach (var data in Data)
-            {
-                await _codec.WriteAsync(data, writer, token);
-            }
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
new file mode 100644
index 0000000..d619e64
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Streaming Codec for the Group Communication Message
+    /// </summary>
+    internal sealed class GroupCommunicationMessageStreamingCodec<T> : 
IStreamingCodec<GroupCommunicationMessage<T>>
+    {
+        private readonly IStreamingCodec<T> _codec;
+
+        /// <summary>
+        /// Injectable constructor; stores the streaming codec used for the individual data elements
+        /// </summary>
+        [Inject]
+        private GroupCommunicationMessageStreamingCodec(IStreamingCodec<T> 
codec)
+        {
+            _codec = codec;
+        }
+
+        /// <summary>
+        /// Reads a message: a length-prefixed metadata block followed by the data elements.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <returns>The Group Communication Message</returns>
+        public GroupCommunicationMessage<T> Read(IDataReader reader)
+        {
+            int metadataSize = reader.ReadInt32();
+            byte[] metadata = new byte[metadataSize];
+            reader.Read(ref metadata, 0, metadataSize);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            string groupName = res.Item1;
+            string operatorName = res.Item2;
+            string source = res.Item3;
+            string destination = res.Item4;
+            int dataCount = res.Item5;
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group Communication Message 
cannot be zero");
+            }
+
+            var data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                data[index] = _codec.Read(reader);
+
+                if (data[index] == null)
+                {
+                    throw new Exception("message instance cannot be created 
from the IDataReader in Group Communication Message");
+                }
+            }
+
+            return new GroupCommunicationMessage<T>(groupName, operatorName, 
source, destination, data);
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(GroupCommunicationMessage<T> obj, IDataWriter writer)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = 
encodedInt.Concat(encodedMetadata).ToArray();
+            writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+            foreach (var data in obj.Data)
+            {
+                _codec.Write(data, writer);
+            }
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The Group Communication Message</returns>
+        public async Task<GroupCommunicationMessage<T>> ReadAsync(IDataReader 
reader,
+            CancellationToken token)
+        {
+            int metadataSize = await reader.ReadInt32Async(token);
+            byte[] metadata = new byte[metadataSize];
+            await reader.ReadAsync(metadata, 0, metadataSize, token);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            string groupName = res.Item1;
+            string operatorName = res.Item2;
+            string source = res.Item3;
+            string destination = res.Item4;
+            int dataCount = res.Item5;
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group Communication Message 
cannot be zero");
+            }
+
+            var data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                data[index] = await _codec.ReadAsync(reader, token);
+
+                if (data[index] == null)
+                {
+                    throw new Exception(
+                        "message instance cannot be created from the 
IDataReader in Group Communication Message");
+                }
+            }
+
+            return new GroupCommunicationMessage<T>(groupName, operatorName, 
source, destination, data);
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async System.Threading.Tasks.Task 
WriteAsync(GroupCommunicationMessage<T> obj, IDataWriter writer, 
CancellationToken token)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = 
encodedInt.Concat(encodedMetadata).ToArray();
+            await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, 
token);
+
+            foreach (var data in obj.Data)
+            {
+                await _codec.WriteAsync(data, writer, token);
+            }
+        }
+
+        private static byte[] 
GenerateMetaDataEncoding(GroupCommunicationMessage<T> obj)
+        {
+            List<byte[]> metadataBytes = new List<byte[]>();
+
+            byte[] groupBytes = StringToBytes(obj.GroupName);
+            byte[] operatorBytes = StringToBytes(obj.OperatorName);
+            byte[] sourceBytes = StringToBytes(obj.Source);
+            byte[] dstBytes = StringToBytes(obj.Destination);
+            byte[] messageCount = BitConverter.GetBytes(obj.Data.Length);
+
+            metadataBytes.Add(BitConverter.GetBytes(groupBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(operatorBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+            metadataBytes.Add(groupBytes);
+            metadataBytes.Add(operatorBytes);
+            metadataBytes.Add(sourceBytes);
+            metadataBytes.Add(dstBytes);
+            metadataBytes.Add(messageCount);
+
+            return metadataBytes.SelectMany(i => i).ToArray();
+        }
+
+        private static Tuple<string, string, string, string, int> 
GenerateMetaDataDecoding(byte[] obj)
+        {
+            int groupCount = BitConverter.ToInt32(obj, 0);
+            int operatorCount = BitConverter.ToInt32(obj, sizeof (int));
+            int srcCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+            int dstCount = BitConverter.ToInt32(obj, 3*sizeof (int));
+
+            int offset = 4 * sizeof(int);
+
+            string groupString = 
BytesToString(obj.Skip(offset).Take(groupCount).ToArray());
+            offset += groupCount;
+            string operatorString = 
BytesToString(obj.Skip(offset).Take(operatorCount).ToArray());
+            offset += operatorCount;
+            string srcString = 
BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+            offset += srcCount;
+            string dstString = 
BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+            offset += dstCount;
+            int messageCount = BitConverter.ToInt32(obj, offset);
+
+            return new Tuple<string, string, string, string, int>(groupString, 
operatorString, srcString, dstString,
+                messageCount);
+        }
+
+        private static byte[] StringToBytes(string str)
+        {
+            byte[] bytes = new byte[str.Length * sizeof(char)];
+            Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+            return bytes;
+        }
+
+        private static string BytesToString(byte[] bytes)
+        {
+            char[] chars = new char[bytes.Length / sizeof(char)];
+            Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+            return new string(chars);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
index a78b13c..41b34a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication operator used to do point-to-point communication 
between named Tasks.
     /// It uses Writable classes
     /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see 
Jira REEF-295 ", false)]
-    public sealed class Sender
+    internal sealed class Sender
     {
         private readonly INetworkService<GeneralGroupCommunicationMessage> 
_networkService;
         private readonly IIdentifierFactory _idFactory;
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="idFactory">Used to create IIdentifier for 
GroupCommunicationMessages.</param>
         [Inject]
         private Sender(
-            WritableNetworkService<GeneralGroupCommunicationMessage> 
networkService,
+            StreamingNetworkService<GeneralGroupCommunicationMessage> 
networkService,
             IIdentifierFactory idFactory)
         {
             _networkService = networkService;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// included in the message.
         /// </summary>
         /// <param name="message">The message to send.</param>
-        public void Send(GeneralGroupCommunicationMessage message)
+        internal void Send(GeneralGroupCommunicationMessage message)
         {
             if (message == null)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
index 96c36e1..14dbd96 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Task
     /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public interface ICommunicationGroupNetworkObserver : 
IObserver<GeneralGroupCommunicationMessage>
+    internal interface ICommunicationGroupNetworkObserver : 
IObserver<GeneralGroupCommunicationMessage>
     {
         /// <summary>
         /// Registers the handler with the 
WritableCommunicationGroupNetworkObserver.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
index de19754..c700d88 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task
     /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(GroupCommNetworkObserver))]
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public interface IGroupCommNetworkObserver : 
IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>
+    internal interface IGroupCommNetworkObserver : 
IObserver<NsMessage<GeneralGroupCommunicationMessage>>
     {
         /// <summary>
         /// Registers the network handler for the given CommunicationGroup.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 305d245..cf3e559 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -19,7 +19,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index 5be1457..c3989a9 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Handles incoming messages sent to this Communication Group.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public sealed class CommunicationGroupNetworkObserver : 
ICommunicationGroupNetworkObserver
+    internal sealed class CommunicationGroupNetworkObserver : 
ICommunicationGroupNetworkObserver
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
         private readonly Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>> _handlers;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// will be invoked</param>
         /// <param name="observer">The writable handler to invoke when 
messages are sent
         /// to the operator specified by operatorName</param>
-        public void Register(string operatorName, 
IObserver<GeneralGroupCommunicationMessage> observer)
+        void ICommunicationGroupNetworkObserver.Register(string operatorName, 
IObserver<GeneralGroupCommunicationMessage> observer)
         {
             if (string.IsNullOrEmpty(operatorName))
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index e79df55..8a54ede 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -34,7 +34,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Used by Tasks to fetch CommunicationGroupClients.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
     public sealed class GroupCommClient : IGroupCommClient
     {
         private readonly Dictionary<string, ICommunicationGroupClientInternal> 
_commGroups;
@@ -50,11 +49,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="configSerializer">Used to deserialize Group 
Communication configuration</param>
         /// <param name="injector">injector forked from the injector that 
creates this instance</param>
         [Inject]
-        [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please 
see Jira REEF-295 ", false)]
         public GroupCommClient(
             
[Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] 
ISet<string> groupConfigs,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId,
-            WritableNetworkService<GeneralGroupCommunicationMessage> 
networkService,
+            StreamingNetworkService<GeneralGroupCommunicationMessage> 
networkService,
             AvroConfigurationSerializer configSerializer,
             IInjector injector)
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 9d35ff1..d0f35fe 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -31,8 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Handles all incoming messages for this Task.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
-    public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
+    internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(GroupCommNetworkObserver));
 
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Creates a new GroupCommNetworkObserver.
         /// </summary>
         [Inject]
-        public GroupCommNetworkObserver()
+        private GroupCommNetworkObserver()
         {
             _commGroupHandlers = new Dictionary<string, 
IObserver<GeneralGroupCommunicationMessage>>();
         }
@@ -53,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// WritableCommunicationGroupNetworkObserver.
         /// </summary>
         /// <param name="nsMessage"></param>
-        public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> 
nsMessage)
+        public void OnNext(NsMessage<GeneralGroupCommunicationMessage> 
nsMessage)
         {
             if (nsMessage == null)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index aa5de1e..00fa9a5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -29,7 +29,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Writable version
     /// </summary>
     /// <typeparam name="T"> Generic type of message</typeparam>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira 
REEF-295.
     internal sealed class NodeStruct<T>
     {
         private readonly BlockingCollection<GroupCommunicationMessage<T>> 
_messageQueue;
@@ -38,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Creates a new NodeStruct.
         /// </summary>
         /// <param name="id">The Task identifier</param>
-        public NodeStruct(string id)
+        internal NodeStruct(string id)
         {
             Identifier = id;
             _messageQueue = new 
BlockingCollection<GroupCommunicationMessage<T>>();
@@ -48,13 +47,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Returns the identifier for the Task that sent all
         /// messages in the message queue.
         /// </summary>
-        public string Identifier { get; private set; }
+        internal string Identifier { get; private set; }
 
         /// <summary>
         /// Gets the first message in the message queue.
         /// </summary>
         /// <returns>The first available message.</returns>
-        public T[] GetData()
+        internal T[] GetData()
         {
             return _messageQueue.Take().Data;
         }
@@ -63,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Adds an incoming message to the message queue.
         /// </summary>
         /// <param name="gcm">The incoming message</param>
-        public void AddData(GroupCommunicationMessage<T> gcm)
+        internal void AddData(GroupCommunicationMessage<T> gcm)
         {
             _messageQueue.Add(gcm);
         }
@@ -72,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Tells whether there is a message in queue or not.
         /// </summary>
         /// <returns>True if queue is non empty, false otherwise.</returns>
-        public bool HasMessage()
+        internal bool HasMessage()
         {
             if (_messageQueue.Count != 0)
             {

Reply via email to