Repository: reef
Updated Branches:
  refs/heads/master ce9e85cc4 -> e04b42271


[REEF-1142] Making caching optional the remote managers

This addressed the issue by adding factory methods that cerate uncached 
observers

JIRA:
[REEF-1142] (https://issues.apache.org/jira/browse/REEF-1142)

Pull Request:
  Closes #905


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/e04b4227
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/e04b4227
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/e04b4227

Branch: refs/heads/master
Commit: e04b422714f3196118aa1b312a6869d5f1da33a5
Parents: ce9e85c
Author: Boris Shulman <[email protected]>
Authored: Fri Mar 25 23:50:54 2016 -0700
Committer: Andrew Chung <[email protected]>
Committed: Mon Apr 4 14:33:05 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Wake.csproj                 |  1 +
 .../Remote/IRemoteManager.cs                    |  4 ++
 .../Remote/IRemoteObserver.cs                   | 27 ++++++++++
 .../Remote/Impl/DefaultRemoteManager.cs         | 55 +++++++++++++++----
 .../Remote/Impl/StreamingRemoteManager.cs       | 57 ++++++++++++++++++--
 5 files changed, 129 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 391648e..005aa82 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -62,6 +62,7 @@ under the License.
     <Compile Include="IObserverFactory.cs" />
     <Compile Include="IStage.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Remote\IRemoteObserver.cs" />
     <Compile Include="Remote\ITcpPortProvider.cs" />
     <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" />
     <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
index cb0c4db..2208682 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManager.cs
@@ -31,6 +31,10 @@ namespace Org.Apache.REEF.Wake.Remote
 
         IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);
 
+        IRemoteObserver<T> GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> 
dest);
+
+        IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint remoteEndpoint);
+
         IDisposable RegisterObserver(RemoteEventEndPoint<T> source, 
IObserver<T> theObserver);
 
         IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> 
theObserver);

http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs
new file mode 100644
index 0000000..be2f565
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteObserver.cs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+using System;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// A disposable remote observer observer, that is not managed by the 
manager that creates it
+    /// </summary>
+    public interface IRemoteObserver<T> : IObserver<T>, IDisposable
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
index 072bc11..4a14539 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -18,7 +18,6 @@
 using System;
 using System.Collections.Generic;
 using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Util;
 
@@ -144,20 +143,56 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             ProxyObserver remoteObserver;
             if (!_cachedClients.TryGetValue(remoteEndpoint, out 
remoteObserver))
             {
-                TransportClient<IRemoteEvent<T>> client = 
-                    new TransportClient<IRemoteEvent<T>>(remoteEndpoint, 
_codec, _observerContainer);
-                var msg = string.Format("NewClientConnection: Local {0} 
connected to Remote {1}",
-                    client.Link.LocalEndpoint.ToString(), 
-                    client.Link.RemoteEndpoint.ToString());
-                LOGGER.Log(Level.Info, msg);
-
-                remoteObserver = new ProxyObserver(client);
+                remoteObserver = CreateRemoteObserver(remoteEndpoint);
                 _cachedClients[remoteEndpoint] = remoteObserver;
             }
 
             return remoteObserver;
         }
 
+        public IRemoteObserver<T> 
GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as 
SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetUnmanagedObserver(id.Addr);
+        }
+
+        public IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint 
remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver = 
CreateRemoteObserver(remoteEndpoint);
+            return remoteObserver;
+        }
+
+        private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            TransportClient<IRemoteEvent<T>> client = new 
TransportClient<IRemoteEvent<T>>(
+                remoteEndpoint,
+                this._codec,
+                this._observerContainer);
+            var msg = string.Format(
+                "NewClientConnection: Local {0} connected to Remote {1}",
+                client.Link.LocalEndpoint.ToString(),
+                client.Link.RemoteEndpoint.ToString());
+            LOGGER.Log(Level.Info, msg);
+
+            ProxyObserver remoteObserver = new ProxyObserver(client);
+            return remoteObserver;
+        }
+
         /// <summary>
         /// Registers an IObserver used to handle incoming messages from the 
remote host
         /// at the specified IPEndPoint.
@@ -240,7 +275,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <summary>
         /// Observer to send messages to connected remote host
         /// </summary>
-        private class ProxyObserver : IObserver<T>, IDisposable
+        private class ProxyObserver : IRemoteObserver<T>
         {
             private readonly TransportClient<IRemoteEvent<T>> _client;
             private int _messageCount;

http://git-wip-us.apache.org/repos/asf/reef/blob/e04b4227/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
index cf133f0..306fc70 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
@@ -107,10 +107,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             ProxyObserver remoteObserver;
             if (!_cachedClients.TryGetValue(remoteEndpoint, out 
remoteObserver))
             {
-                StreamingTransportClient<IRemoteEvent<T>> client =
-                    new 
StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, 
_remoteEventCodec);
-
-                remoteObserver = new ProxyObserver(client);
+                remoteObserver = CreateRemoteObserver(remoteEndpoint);
                 _cachedClients[remoteEndpoint] = remoteObserver;
             }
 
@@ -118,6 +115,56 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         }
 
         /// <summary>
+        /// Returns an IRemoteObserver used to send messages to the remote 
host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote 
host</param>
+        /// <returns>An IRemoteObserver used to send messages to the remote 
host</returns>
+        public IRemoteObserver<T> 
GetUnmanagedRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as 
SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetUnmanagedObserver(id.Addr);
+        }
+
+        /// <summary>
+        /// Returns an IRemoteObserver used to send messages to the remote 
host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote 
host</param>
+        /// <returns>An IRemoteObserver used to send messages to the remote 
host</returns>
+        public IRemoteObserver<T> GetUnmanagedObserver(IPEndPoint 
remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver = 
CreateRemoteObserver(remoteEndpoint);
+            return remoteObserver;
+        }
+
+        private ProxyObserver CreateRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            StreamingTransportClient<IRemoteEvent<T>> client = new 
StreamingTransportClient<IRemoteEvent<T>>(
+                remoteEndpoint,
+                this._observerContainer,
+                this._remoteEventCodec);
+
+            ProxyObserver remoteObserver = new ProxyObserver(client);
+            return remoteObserver;
+        }
+
+        /// <summary>
         /// Registers an IObserver used to handle incoming messages from the 
remote host
         /// at the specified IPEndPoint.
         /// The IDisposable that is returned can be used to unregister the 
IObserver.
@@ -204,7 +251,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <summary>
         /// Observer to send messages to connected remote host
         /// </summary>
-        private class ProxyObserver : IObserver<T>, IDisposable
+        private class ProxyObserver : IRemoteObserver<T>
         {
             private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
 

Reply via email to