Repository: reef
Updated Branches:
  refs/heads/master 4c989421b -> 316d51e4f


[REEF-1292] Link and StreamingLink classes should have retry logic for setting 
up remote connection

This addressed the issue by

* introducing RemoteConnectionRetryHandler that handles the retry logic for 
connecting to remote endpoint
* allowing user to specify the value of connection retries and sleep times 
bewteen retries for NameClient, DefaultRemoteManagerFactory and 
StreamingRemoteManagerFactory.

JIRA:
  [REEF-1292](https://issues.apache.org/jira/browse/REEF-1292)

This closes #929


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/316d51e4
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/316d51e4
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/316d51e4

Branch: refs/heads/master
Commit: 316d51e4fd58eb581ac0a0cffbe8b0c63536837d
Parents: 4c98942
Author: Dhruv <[email protected]>
Authored: Thu Mar 31 16:37:11 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Wed Apr 6 15:48:14 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Network.Tests.csproj        |  3 +-
 .../TcpClientConfigurationModuleTests.cs        | 43 +++++++++++
 .../Naming/NameClient.cs                        | 11 ++-
 .../Org.Apache.REEF.Network.csproj              |  2 +
 .../TcpClientConfigurationModule.cs             | 48 ++++++++++++
 .../TcpClientConfigurationProvider.cs           | 50 +++++++++++++
 .../Org.Apache.REEF.Wake.Tests.csproj           |  1 +
 .../StreamingTransportTest.cs                   | 50 ++++++++++---
 .../TcpConnectionRetryTest.cs                   | 79 ++++++++++++++++++++
 .../Org.Apache.REEF.Wake.Tests/TransportTest.cs | 22 +++++-
 .../Org.Apache.REEF.Wake.csproj                 |  9 +++
 .../Remote/IConnectionRetryHandler.cs           | 35 +++++++++
 .../Remote/ITcpClientConnectionFactory.cs       | 38 ++++++++++
 .../Remote/Impl/DefaultRemoteManager.cs         | 26 ++++---
 .../Remote/Impl/DefaultRemoteManagerFactory.cs  | 32 +++++++-
 .../cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs |  6 +-
 .../Remote/Impl/RemoteConnectionRetryHandler.cs | 76 +++++++++++++++++++
 .../Remote/Impl/StreamingLink.cs                |  7 +-
 .../Remote/Impl/StreamingRemoteManager.cs       | 24 ++++--
 .../Impl/StreamingRemoteManagerFactory.cs       | 17 ++++-
 .../Remote/Impl/StreamingTransportClient.cs     | 11 ++-
 .../Remote/Impl/TcpClientConnectionFactory.cs   | 68 +++++++++++++++++
 .../Remote/Impl/TransportClient.cs              | 19 +++--
 .../Remote/Parameters/ConnectionRetryCount.cs   | 26 +++++++
 .../Remote/Parameters/SleepTimeInMs.cs          | 26 +++++++
 lang/cs/Org.Apache.REEF.Wake/packages.config    |  3 +-
 26 files changed, 675 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index e6c07ec..8ce3408 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -49,6 +49,7 @@ under the License.
     </Reference>
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="TcpClientConfigurationModuleTests.cs" />
     <Compile Include="BlockingCollectionExtensionTests.cs" />
     <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" />
     <Compile Include="GroupCommunication\GroupCommunicationTests.cs" />
@@ -92,4 +93,4 @@ under the License.
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" 
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
   <Import 
Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets"
 
Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')"
 />
-</Project>
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs 
b/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs
new file mode 100644
index 0000000..6cd05b2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Xunit;
+
+namespace Org.Apache.REEF.Network.Tests
+{
+    public class TcpClientConfigurationModuleTests
+    {
+        [Fact]
+        public void TestConfiguration()
+        {
+            int maxConnectionRetries = 5;
+            int sleepTimeinMs = 300;
+
+            var tcpClientFactory =
+                TangFactory.GetTang()
+                    .NewInjector(
+                        TcpClientConfigurationModule.ConfigurationModule
+                            
.Set(TcpClientConfigurationModule.MaxConnectionRetry, 
maxConnectionRetries.ToString())
+                            .Set(TcpClientConfigurationModule.SleepTime, 
sleepTimeinMs.ToString())
+                            .Build())
+                    .GetInstance<ITcpClientConnectionFactory>();
+            Assert.NotNull(tcpClientFactory);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs
index 77b069d..c0bcbd7 100644
--- a/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs
@@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Network.Naming
         private NameRegisterClient _registerClient;
         private bool _disposed;
         private readonly NameCache _cache;
+        private readonly ITcpClientConnectionFactory _tcpClientFactory;
 
         /// <summary>
         /// Constructs a NameClient to register, lookup, and unregister 
IPEndpoints
@@ -62,12 +63,15 @@ namespace Org.Apache.REEF.Network.Naming
         /// </summary>
         /// <param name="remoteAddress">The ip address of the 
NameServer</param>
         /// <param name="remotePort">The port of the NameServer</param>
+        /// <param name="tcpClientFactory">provides TcpClient for given 
endpoint</param>
         [Inject]
         private NameClient(
             [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] 
string remoteAddress,
-            [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int 
remotePort)
+            [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int 
remotePort,
+            ITcpClientConnectionFactory tcpClientFactory)
         {
             IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse(remoteAddress), remotePort);
+            _tcpClientFactory = tcpClientFactory;
             Initialize(remoteEndpoint);
             _disposed = false;
             _cache = 
TangFactory.GetTang().NewInjector().GetInstance<NameCache>();
@@ -79,14 +83,17 @@ namespace Org.Apache.REEF.Network.Naming
         /// </summary>
         /// <param name="remoteAddress">The ip address of the 
NameServer</param>
         /// <param name="remotePort">The port of the NameServer</param>
+        /// <param name="tcpClientFactory">provides TcpClient for given 
endpoint</param>
         /// <param name="cache">The NameCache for caching IpAddresses</param>
         [Inject]
         private NameClient(
             [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] 
string remoteAddress,
             [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int 
remotePort,
+            ITcpClientConnectionFactory tcpClientFactory,
             NameCache cache)
         {
             IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse(remoteAddress), remotePort);
+            _tcpClientFactory = tcpClientFactory;
             Initialize(remoteEndpoint);
             _disposed = false;
             _cache = cache;
@@ -241,7 +248,7 @@ namespace Org.Apache.REEF.Network.Naming
 
             IObserver<TransportEvent<NamingEvent>> clientHandler = 
CreateClientHandler();
             ICodec<NamingEvent> codec = CreateClientCodec();
-            _client = new TransportClient<NamingEvent>(serverEndpoint, codec, 
clientHandler);
+            _client = new TransportClient<NamingEvent>(serverEndpoint, codec, 
clientHandler, _tcpClientFactory);
 
             _lookupClient = new NameLookupClient(_client, 
_lookupResponseQueue, _getAllResponseQueue);
             _registerClient = new NameRegisterClient(_client, 
_registerResponseQueue, _unregisterResponseQueue);

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 8e16ae0..474f203 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -155,6 +155,8 @@ under the License.
     <Compile Include="NetworkService\NsMessage.cs" />
     <Compile Include="NetworkService\StreamingNetworkService.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="TcpClientConfigurationModule.cs" />
+    <Compile Include="TcpClientConfigurationProvider.cs" />
     <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
     <Compile Include="Utilities\Utils.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs 
b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs
new file mode 100644
index 0000000..d77c00b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Client.Parameters;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Network
+{
+    /// <summary>
+    /// Configuration Module for the TCP Client provider
+    /// </summary>
+    public class TcpClientConfigurationModule : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Number of times to retry the connection
+        /// </summary>
+        public static readonly OptionalParameter<int> MaxConnectionRetry = new 
OptionalParameter<int>();
+
+        /// <summary>
+        /// SLeep time between tries in milliseconds.
+        /// </summary>
+        public static readonly OptionalParameter<int> SleepTime = new 
OptionalParameter<int>();
+
+        public static readonly ConfigurationModule ConfigurationModule = new 
TcpClientConfigurationModule()
+            .BindSetEntry<DriverConfigurationProviders, 
TcpClientConfigurationProvider, IConfigurationProvider>(
+                GenericType<DriverConfigurationProviders>.Class, 
GenericType<TcpClientConfigurationProvider>.Class)
+            .BindNamedParameter(GenericType<ConnectionRetryCount>.Class, 
MaxConnectionRetry)
+            .BindNamedParameter(GenericType<SleepTimeInMs>.Class, SleepTime)
+            .Build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs 
b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs
new file mode 100644
index 0000000..15cd0fb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Evaluator.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Network
+{
+    /// <summary>
+    /// Configuration provider for TcpClient connection.
+    /// </summary>
+    public sealed class TcpClientConfigurationProvider : IConfigurationProvider
+    {
+        private readonly IConfiguration _configuration;
+        
+        [Inject]
+        private TcpClientConfigurationProvider(
+            [Parameter(typeof(ConnectionRetryCount))] int connectionRetryCount,
+            [Parameter(typeof(SleepTimeInMs))] int sleepTimeInMs)
+        {
+            _configuration = TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString())
+                .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString())
+                .BindSetEntry<EvaluatorConfigurationProviders, 
TcpClientConfigurationProvider, IConfigurationProvider>()
+                .Build();
+        }
+
+        IConfiguration IConfigurationProvider.GetConfiguration()
+        {
+            return _configuration;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 578b078..283536f 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -52,6 +52,7 @@ under the License.
     <Compile Include="StreamingRemoteManagerTest.cs" />
     <Compile Include="StreamingTransportTest.cs" />
     <Compile Include="TimeTest.cs" />
+    <Compile Include="TcpConnectionRetryTest.cs" />
     <Compile Include="TransportTest.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
index 38eeea0..290dfc2 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
@@ -39,6 +39,7 @@ namespace Org.Apache.REEF.Wake.Tests
     {
         private readonly ITcpPortProvider _tcpPortProvider = 
GetTcpProvider(9900, 9940);
         private readonly IInjector _injector = 
TangFactory.GetTang().NewInjector();
+        private readonly ITcpClientConnectionFactory _tcpClientFactory = 
GetTcpClientFactory(5, 500);
 
         /// <summary>
         /// Tests whether StreamingTransportServer receives 
@@ -54,12 +55,16 @@ namespace Org.Apache.REEF.Wake.Tests
             IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
             var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> queue.Add(tEvent.Data));
 
-            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            using (
+                var server = new 
StreamingTransportServer<string>(endpoint.Address,
+                    remoteHandler,
+                    _tcpPortProvider,
+                    stringCodec))
             {
                 server.Run();
 
                 IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, stringCodec, 
_tcpClientFactory))
                 {
                     client.Send("Hello");
                     client.Send(", ");
@@ -68,7 +73,7 @@ namespace Org.Apache.REEF.Wake.Tests
                     events.Add(queue.Take());
                     events.Add(queue.Take());
                     events.Add(queue.Take());
-                } 
+                }
             }
 
             Assert.Equal(3, events.Count);
@@ -92,13 +97,21 @@ namespace Org.Apache.REEF.Wake.Tests
             // Server echoes the message back to the client
             var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> tEvent.Link.Write(tEvent.Data));
 
-            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            using (
+                var server = new 
StreamingTransportServer<string>(endpoint.Address,
+                    remoteHandler,
+                    _tcpPortProvider,
+                    stringCodec))
             {
                 server.Run();
 
                 var clientHandler = 
Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
                 IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec))
+                using (
+                    var client = new 
StreamingTransportClient<string>(remoteEndpoint,
+                        clientHandler,
+                        stringCodec,
+                        _tcpClientFactory))
                 {
                     client.Send("Hello");
                     client.Send(", ");
@@ -107,7 +120,7 @@ namespace Org.Apache.REEF.Wake.Tests
                     events.Add(queue.Take());
                     events.Add(queue.Take());
                     events.Add(queue.Take());
-                } 
+                }
             }
 
             Assert.Equal(3, events.Count);
@@ -132,7 +145,11 @@ namespace Org.Apache.REEF.Wake.Tests
             IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
             var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent 
=> queue.Add(tEvent.Data));
 
-            using (var server = new 
StreamingTransportServer<string>(endpoint.Address, remoteHandler, 
_tcpPortProvider, stringCodec))
+            using (
+                var server = new 
StreamingTransportServer<string>(endpoint.Address,
+                    remoteHandler,
+                    _tcpPortProvider,
+                    stringCodec))
             {
                 server.Run();
 
@@ -140,8 +157,12 @@ namespace Org.Apache.REEF.Wake.Tests
                 {
                     Task.Run(() =>
                     {
-                        IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                        using (var client = new 
StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                        IPEndPoint remoteEndpoint = new 
IPEndPoint(IPAddress.Parse("127.0.0.1"),
+                            server.LocalEndpoint.Port);
+                        using (
+                            var client = new 
StreamingTransportClient<string>(remoteEndpoint,
+                                stringCodec,
+                                _tcpClientFactory))
                         {
                             client.Send("Hello");
                             client.Send(", ");
@@ -168,5 +189,16 @@ namespace Org.Apache.REEF.Wake.Tests
                 .Build();
             return 
TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
         }
+
+        private static ITcpClientConnectionFactory GetTcpClientFactory(int 
connectionRetryCount, int sleepTimeInMs)
+        {
+            var config =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    
.BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString())
+                    .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString())
+                    .Build();
+            return 
TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs
new file mode 100644
index 0000000..e314894
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Net;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Xunit;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    public class TcpConnectionRetryTest
+    {
+        /// <summary>
+        /// Tests whether retry logic in RemoteConnectionRetryHandler is 
called.
+        /// We run just client but not server and then check redirected output 
for 
+        /// retry messages
+        /// </summary>
+        [Fact]
+        public void TestConnectionRetries()
+        {
+            IPAddress localIpAddress = IPAddress.Parse("127.0.0.1");
+            const int retryCount = 5;
+            const int sleepTimeInMs = 500;
+            const string message = "Retry - Count:";
+            IPEndPoint remoteEndpoint = new IPEndPoint(localIpAddress, 8900);
+
+            var memStream = new MemoryStream();
+            var writer = new StreamWriter(memStream);
+            Console.SetOut(writer);
+            var config =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    
.BindIntNamedParam<ConnectionRetryCount>(retryCount.ToString())
+                    .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString())
+                    .Build();
+            var tmp = 
TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>();
+
+            try
+            {
+                tmp.Connect(remoteEndpoint);
+                Assert.False(true);
+            }
+            catch
+            {
+                memStream.Position = 0;
+                using (var reader = new StreamReader(memStream))
+                {
+                    string line;
+                    int counter = 0;
+                    while ((line = reader.ReadLine()) != null)
+                    {
+                        if (line.Contains(message))
+                        {
+                            counter++;
+                        }
+                    }
+                    Assert.Equal(counter, retryCount);
+                }
+            }
+        }
+    }
+}      
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
index 3490961..61ce0c6 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
@@ -21,6 +21,7 @@ using System.Net;
 using System.Reactive;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Wake.Remote.Parameters;
@@ -33,6 +34,8 @@ namespace Org.Apache.REEF.Wake.Tests
     {
         private readonly IPAddress _localIpAddress = 
IPAddress.Parse("127.0.0.1");
         private readonly ITcpPortProvider _tcpPortProvider = 
GetTcpProvider(8900, 8940);
+        private readonly ITcpClientConnectionFactory _tcpClientFactory = 
GetTcpClientFactory(5, 500);
+
         [Fact]
         public void TestTransportServer()
         {
@@ -49,7 +52,7 @@ namespace Org.Apache.REEF.Wake.Tests
                 server.Run();
 
                 IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, 
server.LocalEndpoint.Port);
-                using (var client = new 
TransportClient<string>(remoteEndpoint, codec))
+                using (var client = new 
TransportClient<string>(remoteEndpoint, codec, _tcpClientFactory))
                 {
                     client.Send("Hello");
                     client.Send(", ");
@@ -83,7 +86,7 @@ namespace Org.Apache.REEF.Wake.Tests
                 server.Run();
 
                 IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, 
server.LocalEndpoint.Port);
-                using (var client = new 
TransportClient<TestEvent>(remoteEndpoint, codec))
+                using (var client = new 
TransportClient<TestEvent>(remoteEndpoint, codec, _tcpClientFactory))
                 {
                     client.Send(new TestEvent("Hello"));
                     client.Send(new TestEvent(", "));
@@ -119,7 +122,7 @@ namespace Org.Apache.REEF.Wake.Tests
 
                 var clientHandler = 
Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
                 IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, 
server.LocalEndpoint.Port);
-                using (var client = new 
TransportClient<string>(remoteEndpoint, codec, clientHandler))
+                using (var client = new 
TransportClient<string>(remoteEndpoint, codec, clientHandler, 
_tcpClientFactory))
                 {
                     client.Send("Hello");
                     client.Send(", ");
@@ -158,7 +161,7 @@ namespace Org.Apache.REEF.Wake.Tests
                     Task.Run(() =>
                     {
                         IPEndPoint remoteEndpoint = new 
IPEndPoint(_localIpAddress, server.LocalEndpoint.Port);
-                        using (var client = new 
TransportClient<string>(remoteEndpoint, codec))
+                        using (var client = new 
TransportClient<string>(remoteEndpoint, codec, _tcpClientFactory))
                         {
                             client.Send("Hello");
                             client.Send(", ");
@@ -213,5 +216,16 @@ namespace Org.Apache.REEF.Wake.Tests
                 .Build();
             return 
TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
         }
+
+        private static ITcpClientConnectionFactory GetTcpClientFactory(int 
connectionRetryCount, int sleepTimeInMs)
+        {
+            var config =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    
.BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString())
+                    .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString())
+                    .Build();
+            return 
TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 005aa82..1d6551e 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -30,6 +30,9 @@ under the License.
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
+    <Reference Include="Microsoft.Practices.TransientFaultHandling.Core, 
Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, 
processorArchitecture=MSIL">
+      
<HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath>
+    </Reference>
     <Reference Include="protobuf-net">
       
<HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
     </Reference>
@@ -48,6 +51,8 @@ under the License.
     <Compile Include="IEventHandler.cs" />
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
+    <Compile Include="Remote\IConnectionRetryHandler.cs" />
+    <Compile Include="Remote\Impl\RemoteConnectionRetryHandler.cs" />
     <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
     <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
     <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
@@ -63,7 +68,11 @@ under the License.
     <Compile Include="IStage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Remote\IRemoteObserver.cs" />
+    <Compile Include="Remote\Impl\TcpClientConnectionFactory.cs" />
+    <Compile Include="Remote\ITcpClientConnectionFactory.cs" />
     <Compile Include="Remote\ITcpPortProvider.cs" />
+    <Compile Include="Remote\Parameters\ConnectionRetryCount.cs" />
+    <Compile Include="Remote\Parameters\SleepTimeInMs.cs" />
     <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" />
     <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" />
     <Compile Include="Remote\Parameters\TcpPortRangeStart.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs
new file mode 100644
index 0000000..4b89b83
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Microsoft.Practices.TransientFaultHandling;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Interface for the retry logic to connect to remote endpoint
+    /// </summary>
+    [DefaultImplementation(typeof(RemoteConnectionRetryHandler))]
+    public interface IConnectionRetryHandler
+    {
+        /// <summary>
+        /// Retry policy for the tcp connection
+        /// </summary>
+        RetryPolicy Policy { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs
new file mode 100644
index 0000000..ccc137f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Net.Sockets;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Provides TcpClient for the remote endpoint
+    /// </summary>
+    [DefaultImplementation(typeof(TcpClientConnectionFactory))]
+    public interface ITcpClientConnectionFactory
+    {
+        /// <summary>
+        /// Provides TcpClient for the specific endpoint
+        /// </summary>
+        /// <param name="endPoint">IP address and port number of server</param>
+        /// <returns>Tcp client</returns>
+        TcpClient Connect(IPEndPoint endPoint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
index 4a14539..6a96d48 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         private readonly TransportServer<IRemoteEvent<T>> _server; 
         private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
         private readonly ICodec<IRemoteEvent<T>> _codec;
+        private readonly ITcpClientConnectionFactory _tcpClientFactory;
 
         /// <summary>
         /// Constructs a DefaultRemoteManager listening on the specified 
address and any
@@ -43,7 +44,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="port">The port to listen on</param>
         /// <param name="codec">The codec used for serializing messages</param>
         /// <param name="tcpPortProvider">provides port numbers to 
listen</param>
-        internal DefaultRemoteManager(IPAddress localAddress, int port, 
ICodec<T> codec, ITcpPortProvider tcpPortProvider)
+        /// <param name="tcpClientFactory">provides TcpClient for given 
endpoint</param>
+        internal DefaultRemoteManager(IPAddress localAddress,
+            int port,
+            ICodec<T> codec,
+            ITcpPortProvider tcpPortProvider,
+            ITcpClientConnectionFactory tcpClientFactory)
         {
             if (localAddress == null)
             {
@@ -58,6 +64,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 throw new ArgumentNullException("codec");
             }
 
+            _tcpClientFactory = tcpClientFactory;
             _observerContainer = new ObserverContainer<T>();
             _codec = new RemoteEventCodec<T>(codec);
             _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
@@ -65,7 +72,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
 
             // Begin to listen for incoming messages
-            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, 
_observerContainer, _codec,
+            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint,
+                _observerContainer,
+                _codec,
                 tcpPortProvider);
             _server.Run();
 
@@ -77,7 +86,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// Constructs a DefaultRemoteManager. Does not listen for incoming 
messages.
         /// </summary>
         /// <param name="codec">The codec used for serializing messages</param>
-        internal DefaultRemoteManager(ICodec<T> codec)
+        /// <param name="tcpClientFactory">provides TcpClient for given 
endpoint</param>
+        internal DefaultRemoteManager(ICodec<T> codec, 
ITcpClientConnectionFactory tcpClientFactory)
         {
             using 
(LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
             {
@@ -86,6 +96,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                     throw new ArgumentNullException("codec");
                 }
 
+                _tcpClientFactory = tcpClientFactory;
                 _observerContainer = new ObserverContainer<T>();
                 _codec = new RemoteEventCodec<T>(codec);
                 _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
@@ -179,12 +190,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
         private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint)
         {
-            TransportClient<IRemoteEvent<T>> client = new 
TransportClient<IRemoteEvent<T>>(
-                remoteEndpoint,
-                this._codec,
-                this._observerContainer);
-            var msg = string.Format(
-                "NewClientConnection: Local {0} connected to Remote {1}",
+            TransportClient<IRemoteEvent<T>> client =
+                new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, 
_observerContainer, _tcpClientFactory);
+            var msg = string.Format("NewClientConnection: Local {0} connected 
to Remote {1}",
                 client.Link.LocalEndpoint.ToString(),
                 client.Link.RemoteEndpoint.ToString());
             LOGGER.Log(Level.Info, msg);

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
index df1c095..0cb9d70 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
@@ -28,25 +28,49 @@ namespace Org.Apache.REEF.Wake.Impl
     internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
     {
         private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly ITcpClientConnectionFactory _tcpClientFactory;
+
         [Inject]
-        private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider)
+        private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider, 
ITcpClientConnectionFactory tcpClientFactory)
         {
             _tcpPortProvider = tcpPortProvider;
+            _tcpClientFactory = tcpClientFactory;
         }
 
+        /// <summary>
+        /// Gives DefaultRemoteManager instance
+        /// </summary>
+        /// <typeparam name="T">Type of Message</typeparam>
+        /// <param name="localAddress">Local IP address</param>
+        /// <param name="port">Port number</param>
+        /// <param name="codec">Codec for message type T</param>
+        /// <returns>IRemoteManager instance</returns>
         public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int 
port, ICodec<T> codec)
         {
-            return new DefaultRemoteManager<T>(localAddress, port, codec, 
_tcpPortProvider);
+            return new DefaultRemoteManager<T>(localAddress, port, codec, 
_tcpPortProvider, _tcpClientFactory);
         }
 
+        /// <summary>
+        /// Gives DefaultRemoteManager instance
+        /// </summary>
+        /// <typeparam name="T">Message type</typeparam>
+        /// <param name="localAddress">Local IP address</param>
+        /// <param name="codec">Codec for message type</param>
+        /// <returns>IRemoteManager instance</returns>
         public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, 
ICodec<T> codec)
         {
-            return new DefaultRemoteManager<T>(localAddress, 0, codec, 
_tcpPortProvider);
+            return new DefaultRemoteManager<T>(localAddress, 0, codec, 
_tcpPortProvider, _tcpClientFactory);
         }
 
+        /// <summary>
+        /// Gives DefaultRemoteManager instance
+        /// </summary>
+        /// <typeparam name="T">Message type</typeparam>
+        /// <param name="codec">Codec for message type</param>
+        /// <returns>IRemoteManager instance</returns>
         public IRemoteManager<T> GetInstance<T>(ICodec<T> codec)
         {
-            return new DefaultRemoteManager<T>(codec);
+            return new DefaultRemoteManager<T>(codec, _tcpClientFactory);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
index 964cfc8..8537817 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs
@@ -45,7 +45,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="remoteEndpoint">The remote endpoint to connect 
to</param>
         /// <param name="codec">The codec for serializing messages</param>
-        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
+        /// <param name="tcpClientFactory">TcpClient factory</param>
+        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec, 
ITcpClientConnectionFactory tcpClientFactory)
         {
             if (remoteEndpoint == null)
             {
@@ -56,8 +57,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 throw new ArgumentNullException("codec");
             }
 
-            Client = new TcpClient();
-            Client.Connect(remoteEndpoint);
+            Client = tcpClientFactory.Connect(remoteEndpoint);
 
             _codec = codec;
             _channel = new Channel(Client.GetStream());

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs
new file mode 100644
index 0000000..aef68af
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Microsoft.Practices.TransientFaultHandling;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Handles the retry logic to connect to remote endpoint
+    /// </summary>
+    internal sealed class RemoteConnectionRetryHandler : 
IConnectionRetryHandler
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(RemoteConnectionRetryHandler));
+        private readonly RetryPolicy<AllErrorsTransientStrategy> _retryPolicy;
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="retryCount">Number of times to retry 
connection</param>
+        /// <param name="timeIntervalInMs">Time interval between retries in 
milli seconds</param>
+        [Inject]
+        internal 
RemoteConnectionRetryHandler([Parameter(typeof(ConnectionRetryCount))] int 
retryCount,
+            [Parameter(typeof(SleepTimeInMs))] int timeIntervalInMs)
+        {
+            var timeIntervalInMs1 = 
TimeSpan.FromMilliseconds(timeIntervalInMs);
+            _retryPolicy = new RetryPolicy<AllErrorsTransientStrategy>(
+                    new FixedInterval("ConnectionRetries", retryCount, 
timeIntervalInMs1, true));
+            _retryPolicy.Retrying += (sender, args) =>
+            {
+                // Log details of the retry.
+                var msg = string.Format("Retry - Count:{0}, Delay:{1}, 
Exception:{2}",
+                    args.CurrentRetryCount,
+                    args.Delay,
+                    args.LastException);
+                Logger.Log(Level.Info, msg);
+            };
+        }
+
+        /// <summary>
+        /// Retry policy for the connection
+        /// </summary>
+        public RetryPolicy Policy
+        {
+            get
+            {
+                return _retryPolicy;
+            }
+        }
+    }
+
+    internal class AllErrorsTransientStrategy : 
ITransientErrorDetectionStrategy
+    {
+        public bool IsTransient(Exception ex)
+        {
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
index adb10dd..4a8a048 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -57,16 +57,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="remoteEndpoint">The remote endpoint to connect 
to</param>
         /// <param name="streamingCodec">Streaming codec</param>
-        internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> 
streamingCodec)
+        /// <param name="tcpClientFactory">TcpClient factory</param>
+        internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> 
streamingCodec, ITcpClientConnectionFactory tcpClientFactory)
         {
             if (remoteEndpoint == null)
             {
                 throw new ArgumentNullException("remoteEndpoint");
             }
 
-            _client = new TcpClient();
-            _client.Connect(remoteEndpoint);
-
+            _client = tcpClientFactory.Connect(remoteEndpoint);
             var stream = _client.GetStream();
             _localEndpoint = GetLocalEndpoint();
             _disposed = false;

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
index 306fc70..66a21df 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
@@ -32,6 +32,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         private readonly StreamingTransportServer<IRemoteEvent<T>> _server;
         private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
         private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec;
+        private readonly ITcpClientConnectionFactory _tcpClientFactory;
 
         /// <summary>
         /// Constructs a DefaultRemoteManager listening on the specified 
address and
@@ -40,22 +41,30 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="localAddress">The address to listen on</param>
         /// <param name="tcpPortProvider">Tcp port provider</param>
         /// <param name="streamingCodec">Streaming codec</param>
-        internal StreamingRemoteManager(IPAddress localAddress, 
ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec)
+       /// <param name="tcpClientFactory">provides TcpClient for given 
endpoint</param>
+        internal StreamingRemoteManager(IPAddress localAddress,
+            ITcpPortProvider tcpPortProvider,
+            IStreamingCodec<T> streamingCodec,
+            ITcpClientConnectionFactory tcpClientFactory)
         {
             if (localAddress == null)
             {
                 throw new ArgumentNullException("localAddress");
             }
 
+            _tcpClientFactory = tcpClientFactory;
             _observerContainer = new ObserverContainer<T>();
             _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
             _remoteEventCodec = new 
RemoteEventStreamingCodec<T>(streamingCodec);
 
             // Begin to listen for incoming messages
-            _server = new 
StreamingTransportServer<IRemoteEvent<T>>(localAddress, _observerContainer, 
tcpPortProvider, _remoteEventCodec);
+            _server = new 
StreamingTransportServer<IRemoteEvent<T>>(localAddress,
+                _observerContainer,
+                tcpPortProvider,
+                _remoteEventCodec);
             _server.Run();
 
-            LocalEndpoint = _server.LocalEndpoint;  
+            LocalEndpoint = _server.LocalEndpoint;
             Identifier = new SocketRemoteIdentifier(LocalEndpoint);
         }
 
@@ -155,10 +164,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
         private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint)
         {
-            StreamingTransportClient<IRemoteEvent<T>> client = new 
StreamingTransportClient<IRemoteEvent<T>>(
-                remoteEndpoint,
-                this._observerContainer,
-                this._remoteEventCodec);
+            StreamingTransportClient<IRemoteEvent<T>> client =
+                new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint,
+                    _observerContainer,
+                    _remoteEventCodec,
+                    _tcpClientFactory);
 
             ProxyObserver remoteObserver = new ProxyObserver(client);
             return remoteObserver;

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
index f1c4711..0c337fc 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -28,18 +28,29 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
     public sealed class StreamingRemoteManagerFactory
     {
         private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly ITcpClientConnectionFactory _tcpClientFactory;
         private readonly IInjector _injector;
 
-        [Inject]  
-        private StreamingRemoteManagerFactory(ITcpPortProvider 
tcpPortProvider, IInjector injector)
+        [Inject]
+        private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider,
+            ITcpClientConnectionFactory tcpClientFactory,
+            IInjector injector)
         {
             _tcpPortProvider = tcpPortProvider;
+            _tcpClientFactory = tcpClientFactory;
             _injector = injector;
         }
 
+        /// <summary>
+        /// Gives StreamingRemoteManager instance
+        /// </summary>
+        /// <typeparam name="T">Type of message</typeparam>
+        /// <param name="localAddress">local IP address</param>
+        /// <param name="codec">codec for message type T</param>
+        /// <returns>IRemoteManager instance</returns>
         public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, 
IStreamingCodec<T> codec)
         {
-            return new StreamingRemoteManager<T>(localAddress, 
_tcpPortProvider, codec);
+            return new StreamingRemoteManager<T>(localAddress, 
_tcpPortProvider, codec, _tcpClientFactory);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
index 13d6db0..aecade6 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -43,11 +43,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
         /// <param name="streamingCodec">Streaming codec</param>
-        internal StreamingTransportClient(IPEndPoint remoteEndpoint, 
IStreamingCodec<T> streamingCodec)
+        /// <param name="clientFactory">TcpClient factory</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint, 
IStreamingCodec<T> streamingCodec, ITcpClientConnectionFactory clientFactory)
         {
             Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", 
Logger);
 
-            _link = new StreamingLink<T>(remoteEndpoint, streamingCodec);
+            _link = new StreamingLink<T>(remoteEndpoint, streamingCodec, 
clientFactory);
             _cancellationSource = new CancellationTokenSource();
             _disposed = false;
         }
@@ -59,10 +60,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
         /// <param name="observer">Callback used when receiving responses from 
remote host</param>
         /// <param name="streamingCodec">Streaming codec</param>
+        /// <param name="clientFactory">TcpClient factory</param>
         internal StreamingTransportClient(IPEndPoint remoteEndpoint,
             IObserver<TransportEvent<T>> observer,
-            IStreamingCodec<T> streamingCodec)
-            : this(remoteEndpoint, streamingCodec)
+            IStreamingCodec<T> streamingCodec, 
+            ITcpClientConnectionFactory clientFactory)
+            : this(remoteEndpoint, streamingCodec, clientFactory)
         {
             _observer = observer;
             Task.Run(() => ResponseLoop());

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs
new file mode 100644
index 0000000..82a4445
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Default implementation of ITcpClientFactory.
+    /// Provides TcpClient for the remote endpoint with 
+    /// the specified retry logic for connection.
+    /// </summary>
+    public sealed class TcpClientConnectionFactory : 
ITcpClientConnectionFactory
+    {
+        private readonly IConnectionRetryHandler _retryHandler;
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TcpClientConnectionFactory));
+
+        [Inject]
+        private TcpClientConnectionFactory(IConnectionRetryHandler 
retryHandler)
+        {
+            _retryHandler = retryHandler;
+        }
+
+        /// <summary>
+        /// Provides TcpClient for the specific endpoint with the 
+        /// retry logic provided by the user.
+        /// </summary>
+        /// <param name="endPoint">IP address and port number of server</param>
+        /// <returns>Tcp client</returns>
+        public TcpClient Connect(IPEndPoint endPoint)
+        {
+            TcpClient client = new TcpClient();
+
+            try
+            {
+                _retryHandler.Policy.ExecuteAction(() => 
client.Connect(endPoint));
+                var msg = string.Format("Connection to endpoint {0} 
established", endPoint);
+                Logger.Log(Level.Info, msg);
+                return client;
+            }
+            catch (Exception e)
+            {
+                var msg = string.Format("Connection to endpoint {0} failed", 
endPoint);
+                Exceptions.CaughtAndThrow(e, Level.Error, msg, Logger);
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
index b2f5707..25b9fdc 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
@@ -38,7 +38,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
         /// <param name="codec">Codec to decode/encode</param>
-        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec) 
+        /// <param name="clientFactory">TcpClient factory</param>
+        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec, 
ITcpClientConnectionFactory clientFactory) 
         {
             if (remoteEndpoint == null)
             {
@@ -48,8 +49,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             {
                 throw new ArgumentNullException("codec");
             }
+            if (clientFactory == null)
+            {
+                throw new ArgumentNullException("clientFactory");
+            }
 
-            _link = new Link<T>(remoteEndpoint, codec);
+            _link = new Link<T>(remoteEndpoint, codec, clientFactory);
             _cancellationSource = new CancellationTokenSource();
             _disposed = false;
         }
@@ -61,10 +66,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="remoteEndpoint">The endpoint of the remote server to 
connect to</param>
         /// <param name="codec">Codec to decode/encodec</param>
         /// <param name="observer">Callback used when receiving responses from 
remote host</param>
-        public TransportClient(IPEndPoint remoteEndpoint, 
-                               ICodec<T> codec, 
-                               IObserver<TransportEvent<T>> observer) 
-                                   : this(remoteEndpoint, codec)
+        /// <param name="clientFactory">TcpClient factory</param>
+        public TransportClient(IPEndPoint remoteEndpoint,
+            ICodec<T> codec,
+            IObserver<TransportEvent<T>> observer,
+            ITcpClientConnectionFactory clientFactory)
+            : this(remoteEndpoint, codec, clientFactory)
         {
             _observer = observer;
             Task.Run(() => ResponseLoop());

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
new file mode 100644
index 0000000..934983f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+    [NamedParameter("Number of retries for connecting to endpoint", 
defaultValue: "20")]
+    public sealed class ConnectionRetryCount : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs
new file mode 100644
index 0000000..f3f0b77
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+    [NamedParameter("Sleep Time in milliseconds between retries", 
defaultValue: "1000")]
+    public sealed class SleepTimeInMs : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config 
b/lang/cs/Org.Apache.REEF.Wake/packages.config
index 18cb5a4..899f2b2 100644
--- a/lang/cs/Org.Apache.REEF.Wake/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake/packages.config
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -22,4 +22,5 @@ under the License.
   <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
   <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" 
developmentDependency="true" />
+  <package id="TransientFaultHandling.Core" version="5.1.1209.1" 
targetFramework="net45" />
 </packages>

Reply via email to