Repository: reef Updated Branches: refs/heads/master ce9e85cc4 -> e04b42271
[REEF-1142] Making caching optional in the remote managers This addressed the issue by adding factory methods that create uncached observers JIRA: [REEF-1142] (https://issues.apache.org/jira/browse/REEF-1142) Pull Request: Closes #905 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/e04b4227 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/e04b4227 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/e04b4227 Branch: refs/heads/master Commit: e04b422714f3196118aa1b312a6869d5f1da33a5 Parents: ce9e85c Author: Boris Shulman <[email protected]> Authored: Fri Mar 25 23:50:54 2016 -0700 Committer: Andrew Chung <[email protected]> Committed: Mon Apr 4 14:33:05 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Wake.csproj | 1 + .../Remote/IRemoteManager.cs | 4 ++ .../Remote/IRemoteObserver.cs | 27 ++++++++++ .../Remote/Impl/DefaultRemoteManager.cs | 55 +++++++++++++++---- .../Remote/Impl/StreamingRemoteManager.cs | 57 ++++++++++++++++++-- 5 files changed, 129 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index 391648e..005aa82 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -62,6 +62,7 @@ under the License. 
<Compile Include="IObserverFactory.cs" /> <Compile Include="IStage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Remote\IRemoteObserver.cs" /> <Compile Include="Remote\ITcpPortProvider.cs" /> <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" /> <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs index cb0c4db..2208682 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs @@ -31,6 +31,10 @@ namespace Org.Apache.REEF.Wake.Remote IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint); + IRemoteObserver<T> GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> dest); + + IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint remoteEndpoint); + IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver); IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver); http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs new file mode 100644 index 0000000..be2f565 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +using System; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// A disposable remote observer observer, that is not managed by the manager that creates it + /// </summary> + public interface IRemoteObserver<T> : IObserver<T>, IDisposable + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs index 072bc11..4a14539 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -18,7 +18,6 @@ using System; using System.Collections.Generic; using System.Net; -using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -144,20 +143,56 @@ namespace Org.Apache.REEF.Wake.Remote.Impl ProxyObserver remoteObserver; if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver)) { - TransportClient<IRemoteEvent<T>> client = - new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer); - var msg = string.Format("NewClientConnection: Local {0} connected to Remote {1}", - client.Link.LocalEndpoint.ToString(), - 
client.Link.RemoteEndpoint.ToString()); - LOGGER.Log(Level.Info, msg); - - remoteObserver = new ProxyObserver(client); + remoteObserver = CreateRemoteObserver(remoteEndpoint); _cachedClients[remoteEndpoint] = remoteObserver; } return remoteObserver; } + public IRemoteObserver<T> GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + + SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier; + if (id == null) + { + throw new ArgumentException("ID not supported"); + } + + return GetUnmanagedObserver(id.Addr); + } + + public IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint remoteEndpoint) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + + ProxyObserver remoteObserver = CreateRemoteObserver(remoteEndpoint); + return remoteObserver; + } + + private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint) + { + TransportClient<IRemoteEvent<T>> client = new TransportClient<IRemoteEvent<T>>( + remoteEndpoint, + this._codec, + this._observerContainer); + var msg = string.Format( + "NewClientConnection: Local {0} connected to Remote {1}", + client.Link.LocalEndpoint.ToString(), + client.Link.RemoteEndpoint.ToString()); + LOGGER.Log(Level.Info, msg); + + ProxyObserver remoteObserver = new ProxyObserver(client); + return remoteObserver; + } + /// <summary> /// Registers an IObserver used to handle incoming messages from the remote host /// at the specified IPEndPoint. 
@@ -240,7 +275,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <summary> /// Observer to send messages to connected remote host /// </summary> - private class ProxyObserver : IObserver<T>, IDisposable + private class ProxyObserver : IRemoteObserver<T> { private readonly TransportClient<IRemoteEvent<T>> _client; private int _messageCount; http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs index cf133f0..306fc70 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs @@ -107,10 +107,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl ProxyObserver remoteObserver; if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver)) { - StreamingTransportClient<IRemoteEvent<T>> client = - new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, _remoteEventCodec); - - remoteObserver = new ProxyObserver(client); + remoteObserver = CreateRemoteObserver(remoteEndpoint); _cachedClients[remoteEndpoint] = remoteObserver; } @@ -118,6 +115,56 @@ namespace Org.Apache.REEF.Wake.Remote.Impl } /// <summary> + /// Returns an IRemoteObserver used to send messages to the remote host at + /// the specified IPEndpoint. 
+ /// </summary> + /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param> + /// <returns>An IRemoteObserver used to send messages to the remote host</returns> + public IRemoteObserver<T> GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + + SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier; + if (id == null) + { + throw new ArgumentException("ID not supported"); + } + + return GetUnmanagedObserver(id.Addr); + } + + /// <summary> + /// Returns an IRemoteObserver used to send messages to the remote host at + /// the specified IPEndpoint. + /// </summary> + /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param> + /// <returns>An IRemoteObserver used to send messages to the remote host</returns> + public IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint remoteEndpoint) + { + if (remoteEndpoint == null) + { + throw new ArgumentNullException("remoteEndpoint"); + } + + ProxyObserver remoteObserver = CreateRemoteObserver(remoteEndpoint); + return remoteObserver; + } + + private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint) + { + StreamingTransportClient<IRemoteEvent<T>> client = new StreamingTransportClient<IRemoteEvent<T>>( + remoteEndpoint, + this._observerContainer, + this._remoteEventCodec); + + ProxyObserver remoteObserver = new ProxyObserver(client); + return remoteObserver; + } + + /// <summary> /// Registers an IObserver used to handle incoming messages from the remote host /// at the specified IPEndPoint. /// The IDisposable that is returned can be used to unregister the IObserver. 
@@ -204,7 +251,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <summary> /// Observer to send messages to connected remote host /// </summary> - private class ProxyObserver : IObserver<T>, IDisposable + private class ProxyObserver : IRemoteObserver<T> { private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
