Repository: reef Updated Branches: refs/heads/master 4c989421b -> 316d51e4f
[REEF-1292] Link and StreamingLink classes should have retry logic for setting up remote connection This addressed the issue by * introducing RemoteConnectionRetryHandler that handles the retry logic for connecting to remote endpoint * allowing user to specify the value of connection retries and sleep times bewteen retries for NameClient, DefaultRemoteManagerFactory and StreamingRemoteManagerFactory. JIRA: [REEF-1292](https://issues.apache.org/jira/browse/REEF-1292) This closes #929 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/316d51e4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/316d51e4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/316d51e4 Branch: refs/heads/master Commit: 316d51e4fd58eb581ac0a0cffbe8b0c63536837d Parents: 4c98942 Author: Dhruv <[email protected]> Authored: Thu Mar 31 16:37:11 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Wed Apr 6 15:48:14 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Network.Tests.csproj | 3 +- .../TcpClientConfigurationModuleTests.cs | 43 +++++++++++ .../Naming/NameClient.cs | 11 ++- .../Org.Apache.REEF.Network.csproj | 2 + .../TcpClientConfigurationModule.cs | 48 ++++++++++++ .../TcpClientConfigurationProvider.cs | 50 +++++++++++++ .../Org.Apache.REEF.Wake.Tests.csproj | 1 + .../StreamingTransportTest.cs | 50 ++++++++++--- .../TcpConnectionRetryTest.cs | 79 ++++++++++++++++++++ .../Org.Apache.REEF.Wake.Tests/TransportTest.cs | 22 +++++- .../Org.Apache.REEF.Wake.csproj | 9 +++ .../Remote/IConnectionRetryHandler.cs | 35 +++++++++ .../Remote/ITcpClientConnectionFactory.cs | 38 ++++++++++ .../Remote/Impl/DefaultRemoteManager.cs | 26 ++++--- .../Remote/Impl/DefaultRemoteManagerFactory.cs | 32 +++++++- .../cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs | 6 +- .../Remote/Impl/RemoteConnectionRetryHandler.cs | 76 +++++++++++++++++++ .../Remote/Impl/StreamingLink.cs | 7 +- .../Remote/Impl/StreamingRemoteManager.cs | 24 ++++-- .../Impl/StreamingRemoteManagerFactory.cs | 17 ++++- .../Remote/Impl/StreamingTransportClient.cs | 11 ++- .../Remote/Impl/TcpClientConnectionFactory.cs | 68 +++++++++++++++++ .../Remote/Impl/TransportClient.cs | 19 +++-- .../Remote/Parameters/ConnectionRetryCount.cs | 26 +++++++ .../Remote/Parameters/SleepTimeInMs.cs | 26 +++++++ lang/cs/Org.Apache.REEF.Wake/packages.config | 3 +- 26 files changed, 675 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj index e6c07ec..8ce3408 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj @@ -49,6 +49,7 @@ under the License. </Reference> </ItemGroup> <ItemGroup> + <Compile Include="TcpClientConfigurationModuleTests.cs" /> <Compile Include="BlockingCollectionExtensionTests.cs" /> <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" /> <Compile Include="GroupCommunication\GroupCommunicationTests.cs" /> @@ -92,4 +93,4 @@ under the License. <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> <Import Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets" Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')" /> -</Project> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs new file mode 100644 index 0000000..6cd05b2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Tests/TcpClientConfigurationModuleTests.cs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Wake.Remote; +using Xunit; + +namespace Org.Apache.REEF.Network.Tests +{ + public class TcpClientConfigurationModuleTests + { + [Fact] + public void TestConfiguration() + { + int maxConnectionRetries = 5; + int sleepTimeinMs = 300; + + var tcpClientFactory = + TangFactory.GetTang() + .NewInjector( + TcpClientConfigurationModule.ConfigurationModule + .Set(TcpClientConfigurationModule.MaxConnectionRetry, maxConnectionRetries.ToString()) + .Set(TcpClientConfigurationModule.SleepTime, sleepTimeinMs.ToString()) + .Build()) + .GetInstance<ITcpClientConnectionFactory>(); + Assert.NotNull(tcpClientFactory); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs index 77b069d..c0bcbd7 100644 --- a/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameClient.cs @@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Network.Naming private NameRegisterClient _registerClient; private bool _disposed; private readonly NameCache _cache; + private readonly ITcpClientConnectionFactory _tcpClientFactory; /// <summary> /// Constructs a NameClient to register, lookup, and unregister IPEndpoints @@ -62,12 +63,15 @@ namespace Org.Apache.REEF.Network.Naming /// </summary> /// <param name="remoteAddress">The ip address of the NameServer</param> /// <param name="remotePort">The port of the NameServer</param> + /// <param name="tcpClientFactory">provides TcpClient for given endpoint</param> [Inject] private NameClient( [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string remoteAddress, - [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort) + [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort, + ITcpClientConnectionFactory tcpClientFactory) { IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse(remoteAddress), remotePort); + _tcpClientFactory = tcpClientFactory; Initialize(remoteEndpoint); _disposed = false; _cache = TangFactory.GetTang().NewInjector().GetInstance<NameCache>(); @@ -79,14 +83,17 @@ namespace Org.Apache.REEF.Network.Naming /// </summary> /// <param name="remoteAddress">The ip address of the NameServer</param> /// <param name="remotePort">The port of the NameServer</param> + /// <param name="tcpClientFactory">provides TcpClient for given endpoint</param> /// <param name="cache">The NameCache for caching IpAddresses</param> [Inject] private NameClient( [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string remoteAddress, [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort, + ITcpClientConnectionFactory tcpClientFactory, NameCache cache) { IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse(remoteAddress), remotePort); + _tcpClientFactory = tcpClientFactory; Initialize(remoteEndpoint); _disposed = false; _cache = cache; @@ -241,7 +248,7 @@ namespace Org.Apache.REEF.Network.Naming IObserver<TransportEvent<NamingEvent>> clientHandler = CreateClientHandler(); ICodec<NamingEvent> codec = CreateClientCodec(); - _client = new TransportClient<NamingEvent>(serverEndpoint, codec, clientHandler); + _client = new TransportClient<NamingEvent>(serverEndpoint, codec, clientHandler, _tcpClientFactory); _lookupClient = new NameLookupClient(_client, _lookupResponseQueue, _getAllResponseQueue); _registerClient = new NameRegisterClient(_client, _registerResponseQueue, _unregisterResponseQueue); http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 8e16ae0..474f203 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -155,6 +155,8 @@ under the License. <Compile Include="NetworkService\NsMessage.cs" /> <Compile Include="NetworkService\StreamingNetworkService.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="TcpClientConfigurationModule.cs" /> + <Compile Include="TcpClientConfigurationProvider.cs" /> <Compile Include="Utilities\BlockingCollectionExtensions.cs" /> <Compile Include="Utilities\Utils.cs" /> </ItemGroup> http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs new file mode 100644 index 0000000..d77c00b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationModule.cs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Common.Client.Parameters; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Network +{ + /// <summary> + /// Configuration Module for the TCP Client provider + /// </summary> + public class TcpClientConfigurationModule : ConfigurationModuleBuilder + { + /// <summary> + /// Number of times to retry the connection + /// </summary> + public static readonly OptionalParameter<int> MaxConnectionRetry = new OptionalParameter<int>(); + + /// <summary> + /// SLeep time between tries in milliseconds. + /// </summary> + public static readonly OptionalParameter<int> SleepTime = new OptionalParameter<int>(); + + public static readonly ConfigurationModule ConfigurationModule = new TcpClientConfigurationModule() + .BindSetEntry<DriverConfigurationProviders, TcpClientConfigurationProvider, IConfigurationProvider>( + GenericType<DriverConfigurationProviders>.Class, GenericType<TcpClientConfigurationProvider>.Class) + .BindNamedParameter(GenericType<ConnectionRetryCount>.Class, MaxConnectionRetry) + .BindNamedParameter(GenericType<SleepTimeInMs>.Class, SleepTime) + .Build(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs new file mode 100644 index 0000000..15cd0fb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/TcpClientConfigurationProvider.cs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Common.Evaluator.Parameters; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Network +{ + /// <summary> + /// Configuration provider for TcpClient connection. + /// </summary> + public sealed class TcpClientConfigurationProvider : IConfigurationProvider + { + private readonly IConfiguration _configuration; + + [Inject] + private TcpClientConfigurationProvider( + [Parameter(typeof(ConnectionRetryCount))] int connectionRetryCount, + [Parameter(typeof(SleepTimeInMs))] int sleepTimeInMs) + { + _configuration = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString()) + .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString()) + .BindSetEntry<EvaluatorConfigurationProviders, TcpClientConfigurationProvider, IConfigurationProvider>() + .Build(); + } + + IConfiguration IConfigurationProvider.GetConfiguration() + { + return _configuration; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj index 578b078..283536f 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj @@ -52,6 +52,7 @@ under the License. <Compile Include="StreamingRemoteManagerTest.cs" /> <Compile Include="StreamingTransportTest.cs" /> <Compile Include="TimeTest.cs" /> + <Compile Include="TcpConnectionRetryTest.cs" /> <Compile Include="TransportTest.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs index 38eeea0..290dfc2 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs @@ -39,6 +39,7 @@ namespace Org.Apache.REEF.Wake.Tests { private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(9900, 9940); private readonly IInjector _injector = TangFactory.GetTang().NewInjector(); + private readonly ITcpClientConnectionFactory _tcpClientFactory = GetTcpClientFactory(5, 500); /// <summary> /// Tests whether StreamingTransportServer receives @@ -54,12 +55,16 @@ namespace Org.Apache.REEF.Wake.Tests IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec)) + using ( + var server = new StreamingTransportServer<string>(endpoint.Address, + remoteHandler, + _tcpPortProvider, + stringCodec)) { server.Run(); IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec)) + using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec, _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -68,7 +73,7 @@ namespace Org.Apache.REEF.Wake.Tests events.Add(queue.Take()); events.Add(queue.Take()); events.Add(queue.Take()); - } + } } Assert.Equal(3, events.Count); @@ -92,13 +97,21 @@ namespace Org.Apache.REEF.Wake.Tests // Server echoes the message back to the client var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data)); - using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec)) + using ( + var server = new StreamingTransportServer<string>(endpoint.Address, + remoteHandler, + _tcpPortProvider, + stringCodec)) { server.Run(); var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec)) + using ( + var client = new StreamingTransportClient<string>(remoteEndpoint, + clientHandler, + stringCodec, + _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -107,7 +120,7 @@ namespace Org.Apache.REEF.Wake.Tests events.Add(queue.Take()); events.Add(queue.Take()); events.Add(queue.Take()); - } + } } Assert.Equal(3, events.Count); @@ -132,7 +145,11 @@ namespace Org.Apache.REEF.Wake.Tests IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec)) + using ( + var server = new StreamingTransportServer<string>(endpoint.Address, + remoteHandler, + _tcpPortProvider, + stringCodec)) { server.Run(); @@ -140,8 +157,12 @@ namespace Org.Apache.REEF.Wake.Tests { Task.Run(() => { - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); - using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec)) + IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), + server.LocalEndpoint.Port); + using ( + var client = new StreamingTransportClient<string>(remoteEndpoint, + stringCodec, + _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -168,5 +189,16 @@ namespace Org.Apache.REEF.Wake.Tests .Build(); return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>(); } + + private static ITcpClientConnectionFactory GetTcpClientFactory(int connectionRetryCount, int sleepTimeInMs) + { + var config = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString()) + .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString()) + .Build(); + return TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>(); + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs new file mode 100644 index 0000000..e314894 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TcpConnectionRetryTest.cs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Net; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Parameters; +using Xunit; + +namespace Org.Apache.REEF.Wake.Tests +{ + public class TcpConnectionRetryTest + { + /// <summary> + /// Tests whether retry logic in RemoteConnectionRetryHandler is called. + /// We run just client but not server and then check redirected output for + /// retry messages + /// </summary> + [Fact] + public void TestConnectionRetries() + { + IPAddress localIpAddress = IPAddress.Parse("127.0.0.1"); + const int retryCount = 5; + const int sleepTimeInMs = 500; + const string message = "Retry - Count:"; + IPEndPoint remoteEndpoint = new IPEndPoint(localIpAddress, 8900); + + var memStream = new MemoryStream(); + var writer = new StreamWriter(memStream); + Console.SetOut(writer); + var config = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindIntNamedParam<ConnectionRetryCount>(retryCount.ToString()) + .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString()) + .Build(); + var tmp = TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>(); + + try + { + tmp.Connect(remoteEndpoint); + Assert.False(true); + } + catch + { + memStream.Position = 0; + using (var reader = new StreamReader(memStream)) + { + string line; + int counter = 0; + while ((line = reader.ReadLine()) != null) + { + if (line.Contains(message)) + { + counter++; + } + } + Assert.Equal(counter, retryCount); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs index 3490961..61ce0c6 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs @@ -21,6 +21,7 @@ using System.Net; using System.Reactive; using System.Threading.Tasks; using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.Remote.Parameters; @@ -33,6 +34,8 @@ namespace Org.Apache.REEF.Wake.Tests { private readonly IPAddress _localIpAddress = IPAddress.Parse("127.0.0.1"); private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940); + private readonly ITcpClientConnectionFactory _tcpClientFactory = GetTcpClientFactory(5, 500); + [Fact] public void TestTransportServer() { @@ -49,7 +52,7 @@ namespace Org.Apache.REEF.Wake.Tests server.Run(); IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); - using (var client = new TransportClient<string>(remoteEndpoint, codec)) + using (var client = new TransportClient<string>(remoteEndpoint, codec, _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -83,7 +86,7 @@ namespace Org.Apache.REEF.Wake.Tests server.Run(); IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); - using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec)) + using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec, _tcpClientFactory)) { client.Send(new TestEvent("Hello")); client.Send(new TestEvent(", ")); @@ -119,7 +122,7 @@ namespace Org.Apache.REEF.Wake.Tests var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); - using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler)) + using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler, _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -158,7 +161,7 @@ namespace Org.Apache.REEF.Wake.Tests Task.Run(() => { IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); - using (var client = new TransportClient<string>(remoteEndpoint, codec)) + using (var client = new TransportClient<string>(remoteEndpoint, codec, _tcpClientFactory)) { client.Send("Hello"); client.Send(", "); @@ -213,5 +216,16 @@ namespace Org.Apache.REEF.Wake.Tests .Build(); return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>(); } + + private static ITcpClientConnectionFactory GetTcpClientFactory(int connectionRetryCount, int sleepTimeInMs) + { + var config = + TangFactory.GetTang() + .NewConfigurationBuilder() + .BindIntNamedParam<ConnectionRetryCount>(connectionRetryCount.ToString()) + .BindIntNamedParam<SleepTimeInMs>(sleepTimeInMs.ToString()) + .Build(); + return TangFactory.GetTang().NewInjector(config).GetInstance<ITcpClientConnectionFactory>(); + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 005aa82..1d6551e 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -30,6 +30,9 @@ under the License. </PropertyGroup> <Import Project="$(SolutionDir)\build.props" /> <ItemGroup> + <Reference Include="Microsoft.Practices.TransientFaultHandling.Core, Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath> + </Reference> <Reference Include="protobuf-net"> <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath> </Reference> @@ -48,6 +51,8 @@ under the License. <Compile Include="IEventHandler.cs" /> <Compile Include="IIdentifier.cs" /> <Compile Include="IIdentifierFactory.cs" /> + <Compile Include="Remote\IConnectionRetryHandler.cs" /> + <Compile Include="Remote\Impl\RemoteConnectionRetryHandler.cs" /> <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" /> <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" /> <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" /> @@ -63,7 +68,11 @@ under the License. <Compile Include="IStage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Remote\IRemoteObserver.cs" /> + <Compile Include="Remote\Impl\TcpClientConnectionFactory.cs" /> + <Compile Include="Remote\ITcpClientConnectionFactory.cs" /> <Compile Include="Remote\ITcpPortProvider.cs" /> + <Compile Include="Remote\Parameters\ConnectionRetryCount.cs" /> + <Compile Include="Remote\Parameters\SleepTimeInMs.cs" /> <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" /> <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" /> <Compile Include="Remote\Parameters\TcpPortRangeStart.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs new file mode 100644 index 0000000..4b89b83 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IConnectionRetryHandler.cs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Microsoft.Practices.TransientFaultHandling; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Interface for the retry logic to connect to remote endpoint + /// </summary> + [DefaultImplementation(typeof(RemoteConnectionRetryHandler))] + public interface IConnectionRetryHandler + { + /// <summary> + /// Retry policy for the tcp connection + /// </summary> + RetryPolicy Policy { get; } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs new file mode 100644 index 0000000..ccc137f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpClientConnectionFactory.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Net; +using System.Net.Sockets; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Provides TcpClient for the remote endpoint + /// </summary> + [DefaultImplementation(typeof(TcpClientConnectionFactory))] + public interface ITcpClientConnectionFactory + { + /// <summary> + /// Provides TcpClient for the specific endpoint + /// </summary> + /// <param name="endPoint">IP address and port number of server</param> + /// <returns>Tcp client</returns> + TcpClient Connect(IPEndPoint endPoint); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs index 4a14539..6a96d48 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private readonly TransportServer<IRemoteEvent<T>> _server; private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients; private readonly ICodec<IRemoteEvent<T>> _codec; + private readonly ITcpClientConnectionFactory _tcpClientFactory; /// <summary> /// Constructs a DefaultRemoteManager listening on the specified address and any @@ -43,7 +44,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="port">The port to listen on</param> /// <param name="codec">The codec used for serializing messages</param> /// <param name="tcpPortProvider">provides port numbers to listen</param> - internal DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec, ITcpPortProvider tcpPortProvider) + /// <param name="tcpClientFactory">provides TcpClient for given endpoint</param> + internal DefaultRemoteManager(IPAddress localAddress, + int port, + ICodec<T> codec, + ITcpPortProvider tcpPortProvider, + ITcpClientConnectionFactory tcpClientFactory) { if (localAddress == null) { @@ -58,6 +64,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl throw new ArgumentNullException("codec"); } + _tcpClientFactory = tcpClientFactory; _observerContainer = new ObserverContainer<T>(); _codec = new RemoteEventCodec<T>(codec); _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); @@ -65,7 +72,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl IPEndPoint localEndpoint = new IPEndPoint(localAddress, port); // Begin to listen for incoming messages - _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec, + _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, + _observerContainer, + _codec, tcpPortProvider); _server.Run(); @@ -77,7 +86,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// Constructs a DefaultRemoteManager. Does not listen for incoming messages. /// </summary> /// <param name="codec">The codec used for serializing messages</param> - internal DefaultRemoteManager(ICodec<T> codec) + /// <param name="tcpClientFactory">provides TcpClient for given endpoint</param> + internal DefaultRemoteManager(ICodec<T> codec, ITcpClientConnectionFactory tcpClientFactory) { using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager")) { @@ -86,6 +96,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl throw new ArgumentNullException("codec"); } + _tcpClientFactory = tcpClientFactory; _observerContainer = new ObserverContainer<T>(); _codec = new RemoteEventCodec<T>(codec); _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); @@ -179,12 +190,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint) { - TransportClient<IRemoteEvent<T>> client = new TransportClient<IRemoteEvent<T>>( - remoteEndpoint, - this._codec, - this._observerContainer); - var msg = string.Format( - "NewClientConnection: Local {0} connected to Remote {1}", + TransportClient<IRemoteEvent<T>> client = + new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer, _tcpClientFactory); + var msg = string.Format("NewClientConnection: Local {0} connected to Remote {1}", client.Link.LocalEndpoint.ToString(), client.Link.RemoteEndpoint.ToString()); LOGGER.Log(Level.Info, msg); http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs index df1c095..0cb9d70 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs @@ -28,25 +28,49 @@ namespace Org.Apache.REEF.Wake.Impl internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory { private readonly ITcpPortProvider _tcpPortProvider; + private readonly ITcpClientConnectionFactory _tcpClientFactory; + [Inject] - private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider) + private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider, ITcpClientConnectionFactory tcpClientFactory) { _tcpPortProvider = tcpPortProvider; + _tcpClientFactory = tcpClientFactory; } + /// <summary> + /// Gives DefaultRemoteManager instance + /// </summary> + /// <typeparam name="T">Type of Message</typeparam> + /// <param name="localAddress">Local IP address</param> + /// <param name="port">Port number</param> + /// <param name="codec">Codec for message type T</param> + /// <returns>IRemoteManager instance</returns> public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec) { - return new DefaultRemoteManager<T>(localAddress, port, codec, _tcpPortProvider); + return new DefaultRemoteManager<T>(localAddress, port, codec, _tcpPortProvider, _tcpClientFactory); } + /// <summary> + /// Gives DefaultRemoteManager instance + /// </summary> + /// <typeparam name="T">Message type</typeparam> + /// <param name="localAddress">Local IP address</param> + /// <param name="codec">Codec for message type</param> + /// <returns>IRemoteManager instance</returns> public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, ICodec<T> codec) { - return new DefaultRemoteManager<T>(localAddress, 0, codec, _tcpPortProvider); + return new DefaultRemoteManager<T>(localAddress, 0, codec, _tcpPortProvider, _tcpClientFactory); } + /// <summary> + /// Gives DefaultRemoteManager instance + /// </summary> + /// <typeparam name="T">Message type</typeparam> + /// <param name="codec">Codec for message type</param> + /// <returns>IRemoteManager instance</returns> public IRemoteManager<T> GetInstance<T>(ICodec<T> codec) { - return new DefaultRemoteManager<T>(codec); + return new DefaultRemoteManager<T>(codec, _tcpClientFactory); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs index 964cfc8..8537817 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/Link.cs @@ -45,7 +45,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="remoteEndpoint">The remote endpoint to connect to</param> /// <param name="codec">The codec for serializing messages</param> - public Link(IPEndPoint remoteEndpoint, ICodec<T> codec) + /// <param name="tcpClientFactory">TcpClient factory</param> + public Link(IPEndPoint remoteEndpoint, ICodec<T> codec, ITcpClientConnectionFactory tcpClientFactory) { if (remoteEndpoint == null) { @@ -56,8 +57,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl throw new ArgumentNullException("codec"); } - Client = new TcpClient(); - Client.Connect(remoteEndpoint); + Client = tcpClientFactory.Connect(remoteEndpoint); _codec = codec; _channel = new Channel(Client.GetStream()); http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs new file mode 100644 index 0000000..aef68af --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteConnectionRetryHandler.cs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Microsoft.Practices.TransientFaultHandling; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// Handles the retry logic to connect to remote endpoint + /// </summary> + internal sealed class RemoteConnectionRetryHandler : IConnectionRetryHandler + { + private static readonly Logger Logger = Logger.GetLogger(typeof(RemoteConnectionRetryHandler)); + private readonly RetryPolicy<AllErrorsTransientStrategy> _retryPolicy; + + /// <summary> + /// Constructor + /// </summary> + /// <param name="retryCount">Number of times to retry connection</param> + /// <param name="timeIntervalInMs">Time interval between retries in milli seconds</param> + [Inject] + internal RemoteConnectionRetryHandler([Parameter(typeof(ConnectionRetryCount))] int retryCount, + [Parameter(typeof(SleepTimeInMs))] int timeIntervalInMs) + { + var timeIntervalInMs1 = TimeSpan.FromMilliseconds(timeIntervalInMs); + _retryPolicy = new RetryPolicy<AllErrorsTransientStrategy>( + new FixedInterval("ConnectionRetries", retryCount, timeIntervalInMs1, true)); + _retryPolicy.Retrying += (sender, args) => + { + // Log details of the retry. + var msg = string.Format("Retry - Count:{0}, Delay:{1}, Exception:{2}", + args.CurrentRetryCount, + args.Delay, + args.LastException); + Logger.Log(Level.Info, msg); + }; + } + + /// <summary> + /// Retry policy for the connection + /// </summary> + public RetryPolicy Policy + { + get + { + return _retryPolicy; + } + } + } + + internal class AllErrorsTransientStrategy : ITransientErrorDetectionStrategy + { + public bool IsTransient(Exception ex) + { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs index adb10dd..4a8a048 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs @@ -57,16 +57,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="remoteEndpoint">The remote endpoint to connect to</param> /// <param name="streamingCodec">Streaming codec</param> - internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec) + /// <param name="tcpClientFactory">TcpClient factory</param> + internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec, ITcpClientConnectionFactory tcpClientFactory) { if (remoteEndpoint == null) { throw new ArgumentNullException("remoteEndpoint"); } - _client = new TcpClient(); - _client.Connect(remoteEndpoint); - + _client = tcpClientFactory.Connect(remoteEndpoint); var stream = _client.GetStream(); _localEndpoint = GetLocalEndpoint(); _disposed = false; http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs index 306fc70..66a21df 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs @@ -32,6 +32,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private readonly StreamingTransportServer<IRemoteEvent<T>> _server; private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients; private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec; + private readonly ITcpClientConnectionFactory _tcpClientFactory; /// <summary> /// Constructs a DefaultRemoteManager listening on the specified address and @@ -40,22 +41,30 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="localAddress">The address to listen on</param> /// <param name="tcpPortProvider">Tcp port provider</param> /// <param name="streamingCodec">Streaming codec</param> - internal StreamingRemoteManager(IPAddress localAddress, ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec) + /// <param name="tcpClientFactory">provides TcpClient for given endpoint</param> + internal StreamingRemoteManager(IPAddress localAddress, + ITcpPortProvider tcpPortProvider, + IStreamingCodec<T> streamingCodec, + ITcpClientConnectionFactory tcpClientFactory) { if (localAddress == null) { throw new ArgumentNullException("localAddress"); } + _tcpClientFactory = tcpClientFactory; _observerContainer = new ObserverContainer<T>(); _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); _remoteEventCodec = new RemoteEventStreamingCodec<T>(streamingCodec); // Begin to listen for incoming messages - _server = new StreamingTransportServer<IRemoteEvent<T>>(localAddress, _observerContainer, tcpPortProvider, _remoteEventCodec); + _server = new StreamingTransportServer<IRemoteEvent<T>>(localAddress, + _observerContainer, + tcpPortProvider, + _remoteEventCodec); _server.Run(); - LocalEndpoint = _server.LocalEndpoint; + LocalEndpoint = _server.LocalEndpoint; Identifier = new SocketRemoteIdentifier(LocalEndpoint); } @@ -155,10 +164,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint) { - StreamingTransportClient<IRemoteEvent<T>> client = new StreamingTransportClient<IRemoteEvent<T>>( - remoteEndpoint, - this._observerContainer, - this._remoteEventCodec); + StreamingTransportClient<IRemoteEvent<T>> client = + new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, + _observerContainer, + _remoteEventCodec, + _tcpClientFactory); ProxyObserver remoteObserver = new ProxyObserver(client); return remoteObserver; http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs index f1c4711..0c337fc 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs @@ -28,18 +28,29 @@ namespace Org.Apache.REEF.Wake.Remote.Impl public sealed class StreamingRemoteManagerFactory { private readonly ITcpPortProvider _tcpPortProvider; + private readonly ITcpClientConnectionFactory _tcpClientFactory; private readonly IInjector _injector; - [Inject] - private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector) + [Inject] + private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, + ITcpClientConnectionFactory tcpClientFactory, + IInjector injector) { _tcpPortProvider = tcpPortProvider; + _tcpClientFactory = tcpClientFactory; _injector = injector; } + /// <summary> + /// Gives StreamingRemoteManager instance + /// </summary> + /// <typeparam name="T">Type of message</typeparam> + /// <param name="localAddress">local IP address</param> + /// <param name="codec">codec for message type T</param> + /// <returns>IRemoteManager instance</returns> public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec) { - return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec); + return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec, _tcpClientFactory); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs index 13d6db0..aecade6 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs @@ -43,11 +43,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> /// <param name="streamingCodec">Streaming codec</param> - internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec) + /// <param name="clientFactory">TcpClient factory</param> + internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec, ITcpClientConnectionFactory clientFactory) { Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger); - _link = new StreamingLink<T>(remoteEndpoint, streamingCodec); + _link = new StreamingLink<T>(remoteEndpoint, streamingCodec, clientFactory); _cancellationSource = new CancellationTokenSource(); _disposed = false; } @@ -59,10 +60,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> /// <param name="observer">Callback used when receiving responses from remote host</param> /// <param name="streamingCodec">Streaming codec</param> + /// <param name="clientFactory">TcpClient factory</param> internal StreamingTransportClient(IPEndPoint remoteEndpoint, IObserver<TransportEvent<T>> observer, - IStreamingCodec<T> streamingCodec) - : this(remoteEndpoint, streamingCodec) + IStreamingCodec<T> streamingCodec, + ITcpClientConnectionFactory clientFactory) + : this(remoteEndpoint, streamingCodec, clientFactory) { _observer = observer; Task.Run(() => ResponseLoop()); http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs new file mode 100644 index 0000000..82a4445 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TcpClientConnectionFactory.cs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Wake.Remote.Impl +{ + /// <summary> + /// Default implementation of ITcpClientFactory. + /// Provides TcpClient for the remote endpoint with + /// the specified retry logic for connection. + /// </summary> + public sealed class TcpClientConnectionFactory : ITcpClientConnectionFactory + { + private readonly IConnectionRetryHandler _retryHandler; + private static readonly Logger Logger = Logger.GetLogger(typeof(TcpClientConnectionFactory)); + + [Inject] + private TcpClientConnectionFactory(IConnectionRetryHandler retryHandler) + { + _retryHandler = retryHandler; + } + + /// <summary> + /// Provides TcpClient for the specific endpoint with the + /// retry logic provided by the user. + /// </summary> + /// <param name="endPoint">IP address and port number of server</param> + /// <returns>Tcp client</returns> + public TcpClient Connect(IPEndPoint endPoint) + { + TcpClient client = new TcpClient(); + + try + { + _retryHandler.Policy.ExecuteAction(() => client.Connect(endPoint)); + var msg = string.Format("Connection to endpoint {0} established", endPoint); + Logger.Log(Level.Info, msg); + return client; + } + catch (Exception e) + { + var msg = string.Format("Connection to endpoint {0} failed", endPoint); + Exceptions.CaughtAndThrow(e, Level.Error, msg, Logger); + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs index b2f5707..25b9fdc 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs @@ -38,7 +38,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> /// <param name="codec">Codec to decode/encode</param> - public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec) + /// <param name="clientFactory">TcpClient factory</param> + public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec, ITcpClientConnectionFactory clientFactory) { if (remoteEndpoint == null) { @@ -48,8 +49,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { throw new ArgumentNullException("codec"); } + if (clientFactory == null) + { + throw new ArgumentNullException("clientFactory"); + } - _link = new Link<T>(remoteEndpoint, codec); + _link = new Link<T>(remoteEndpoint, codec, clientFactory); _cancellationSource = new CancellationTokenSource(); _disposed = false; } @@ -61,10 +66,12 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param> /// <param name="codec">Codec to decode/encodec</param> /// <param name="observer">Callback used when receiving responses from remote host</param> - public TransportClient(IPEndPoint remoteEndpoint, - ICodec<T> codec, - IObserver<TransportEvent<T>> observer) - : this(remoteEndpoint, codec) + /// <param name="clientFactory">TcpClient factory</param> + public TransportClient(IPEndPoint remoteEndpoint, + ICodec<T> codec, + IObserver<TransportEvent<T>> observer, + ITcpClientConnectionFactory clientFactory) + : this(remoteEndpoint, codec, clientFactory) { _observer = observer; Task.Run(() => ResponseLoop()); http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs new file mode 100644 index 0000000..934983f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Wake.Remote.Parameters +{ + [NamedParameter("Number of retries for connecting to endpoint", defaultValue: "20")] + public sealed class ConnectionRetryCount : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs new file mode 100644 index 0000000..f3f0b77 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/SleepTimeInMs.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Wake.Remote.Parameters +{ + [NamedParameter("Sleep Time in milliseconds between retries", defaultValue: "1000")] + public sealed class SleepTimeInMs : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/316d51e4/lang/cs/Org.Apache.REEF.Wake/packages.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config b/lang/cs/Org.Apache.REEF.Wake/packages.config index 18cb5a4..899f2b2 100644 --- a/lang/cs/Org.Apache.REEF.Wake/packages.config +++ b/lang/cs/Org.Apache.REEF.Wake/packages.config @@ -1,4 +1,4 @@ -<?xml version="1.0" encoding="utf-8"?> +<?xml version="1.0" encoding="utf-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -22,4 +22,5 @@ under the License. <package id="Rx-Core" version="2.2.5" targetFramework="net45" /> <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" /> <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" /> + <package id="TransientFaultHandling.Core" version="5.1.1209.1" targetFramework="net45" /> </packages>
