Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 119d8945f -> 34ce96ee7


Adds addAssociationListener and removeAssociationListener to PortForwardManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/74c84873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/74c84873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/74c84873

Branch: refs/heads/master
Commit: 74c848736bf52220a283b1cdf7f5616ce7d22f48
Parents: a626583
Author: Martin Harris <[email protected]>
Authored: Tue Mar 3 16:16:56 2015 +0000
Committer: Martin Harris <[email protected]>
Committed: Tue Mar 3 16:16:56 2015 +0000

----------------------------------------------------------------------
 .../location/access/PortForwardManager.java     | 69 +++++++++++++++++++-
 .../access/PortForwardManagerClient.java        | 18 +++--
 .../location/access/PortForwardManagerImpl.java | 58 +++++++++++++++-
 .../location/access/PortForwardManagerTest.java | 37 +++++++++++
 4 files changed, 174 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/location/access/PortForwardManager.java 
b/core/src/main/java/brooklyn/location/access/PortForwardManager.java
index 6f93a4a..71c3974 100644
--- a/core/src/main/java/brooklyn/location/access/PortForwardManager.java
+++ b/core/src/main/java/brooklyn/location/access/PortForwardManager.java
@@ -18,15 +18,16 @@
  */
 package brooklyn.location.access;
 
-import java.util.Collection;
-
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.location.Location;
-
 import com.google.common.annotations.Beta;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
 import com.google.common.net.HostAndPort;
 
+import java.util.Collection;
+
 /**
  * Acts as a registry for existing port mappings (e.g. the public endpoints 
for accessing specific
  * ports on private VMs). This could be using DNAT, or iptables 
port-forwarding, or Docker port-mapping 
@@ -46,6 +47,57 @@ import com.google.common.net.HostAndPort;
 @Beta
 public interface PortForwardManager extends Location {
 
+    /**
+     * Immutable description of one port-forwarding association: the public side
+     * (public IP id and public endpoint) and the private side (target location and port).
+     * Instances are handed to {@link AssociationListener} callbacks and to the filter
+     * predicate supplied when a listener is registered.
+     */
+    @Beta
+    class AssociationMetadata {
+        private final String publicIpId;
+        private final HostAndPort publicEndpoint;
+        // May be null: associate(String, HostAndPort, int) carries no target location.
+        private final Location location;
+        private final int privatePort;
+
+        /**
+         * Users are discouraged from calling this constructor; the signature may change in future releases.
+         * Instead, instances will be created automatically by Brooklyn to be passed to the
+         * {@link AssociationListener#onAssociationCreated(AssociationMetadata)} method.
+         */
+        public AssociationMetadata(String publicIpId, HostAndPort publicEndpoint, Location location, int privatePort) {
+            this.publicIpId = publicIpId;
+            this.publicEndpoint = publicEndpoint;
+            this.location = location;
+            this.privatePort = privatePort;
+        }
+
+        /** @return the identifier of the public IP this association was registered under */
+        public String getPublicIpId() {
+            return publicIpId;
+        }
+
+        /** @return the publicly reachable host and port */
+        public HostAndPort getPublicEndpoint() {
+            return publicEndpoint;
+        }
+
+        /** @return the target location of the mapping, or null if none was supplied */
+        public Location getLocation() {
+            return location;
+        }
+
+        /** @return the port on the private side of the mapping */
+        public int getPrivatePort() {
+            return privatePort;
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this)
+                    .add("publicIpId", publicIpId)
+                    .add("publicEndpoint", publicEndpoint)
+                    .add("location", location)
+                    .add("privatePort", privatePort)
+                    .toString();
+        }
+    }
+
+    /**
+     * Callback for port-mapping lifecycle events. Register an instance with
+     * {@link PortForwardManager#addAssociationListener(AssociationListener, Predicate)}.
+     */
+    @Beta
+    interface AssociationListener {
+        /** Called after a port mapping has been associated. */
+        void onAssociationCreated(AssociationMetadata metadata);
+        /** Called when a previously associated port mapping has been removed. */
+        void onAssociationDeleted(AssociationMetadata metadata);
+    }
+
     /**
      * The intention is that there is one PortForwardManager instance per 
"scope". If you 
      * use global, then it will be a shared instance (for that management 
context). If you 
@@ -92,6 +144,16 @@ public interface PortForwardManager extends Location {
      * subsequently be looked up using {@link #lookup(String, int)}.
      */
     public void associate(String publicIpId, HostAndPort publicEndpoint, int 
privatePort);
+
+    /**
+     * Registers a listener, which will be notified each time a port mapping accepted by
+     * {@code filter} is associated (see {@link #associate(String, HostAndPort, int)}
+     * and {@link #associate(String, HostAndPort, Location, int)}) and each time such a
+     * mapping is subsequently removed.
+     *
+     * @param listener the callback to notify
+     * @param filter   applied to the metadata of each event; the listener is only
+     *                 notified when it returns true
+     */
+    @Beta
+    public void addAssociationListener(AssociationListener listener, 
Predicate<? super AssociationMetadata> filter);
+
+    /**
+     * Unregisters a listener previously passed to
+     * {@link #addAssociationListener(AssociationListener, Predicate)}.
+     */
+    @Beta
+    public void removeAssociationListener(AssociationListener listener);
     
     /**
      * Returns the public ip hostname and public port for use contacting the 
given endpoint.
@@ -261,4 +323,5 @@ public interface PortForwardManager extends Location {
      */
     @Deprecated
     public PortMapping getPortMappingWithPrivateSide(Location l, int 
privatePort);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java 
b/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java
index ddd1040..c796ef6 100644
--- a/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java
+++ b/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java
@@ -29,6 +29,7 @@ import brooklyn.location.Location;
 import brooklyn.util.exceptions.Exceptions;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.net.HostAndPort;
 
@@ -141,19 +142,28 @@ public class PortForwardManagerClient implements 
PortForwardManager {
     @Override
     public String getId() {
         return getDelegate().getId();
-    }    
+    }
 
     @Override
     public String getScope() {
         return getDelegate().getScope();
-    }    
+    }
+
+    // Pure delegation: the listener is registered on the wrapped manager, not on this client.
+    @Override
+    public void addAssociationListener(AssociationListener listener, 
Predicate<? super AssociationMetadata> filter) {
+        getDelegate().addAssociationListener(listener, filter);
+    }
+
+    // Pure delegation: the listener is unregistered from the wrapped manager.
+    @Override
+    public void removeAssociationListener(AssociationListener listener) {
+        getDelegate().removeAssociationListener(listener);
+    }
 
     @Override
     public String toVerboseString() {
         return 
getClass().getName()+"[wrapping="+getDelegate().toVerboseString()+"]";
     }
-    
-    
+
     
///////////////////////////////////////////////////////////////////////////////////
     // Deprecated
     
///////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java 
b/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java
index 16b8371..f392de0 100644
--- a/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java
+++ b/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
@@ -38,8 +39,10 @@ import brooklyn.location.Location;
 import brooklyn.location.basic.AbstractLocation;
 import brooklyn.mementos.LocationMemento;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
 
 import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -78,7 +81,9 @@ public class PortForwardManagerImpl extends AbstractLocation 
implements PortForw
     private static final Logger log = 
LoggerFactory.getLogger(PortForwardManagerImpl.class);
     
     protected final Map<String,PortMapping> mappings = new 
LinkedHashMap<String,PortMapping>();
-    
+
+    private final Map<AssociationListener, Predicate<? super 
AssociationMetadata>> associationListeners = new 
ConcurrentHashMap<AssociationListener, Predicate<? super 
AssociationMetadata>>();
+
     @Deprecated
     protected final Map<String,String> publicIpIdToHostname = new 
LinkedHashMap<String,String>();
     
@@ -160,11 +165,13 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
     @Override
     public void associate(String publicIpId, HostAndPort publicEndpoint, 
Location l, int privatePort) {
         associateImpl(publicIpId, publicEndpoint, l, privatePort);
+        emitAssociationCreatedEvent(publicIpId, publicEndpoint, l, 
privatePort);
     }
 
     @Override
     public void associate(String publicIpId, HostAndPort publicEndpoint, int 
privatePort) {
         associateImpl(publicIpId, publicEndpoint, null, privatePort);
+        emitAssociationCreatedEvent(publicIpId, publicEndpoint, null, 
privatePort);
     }
 
     protected void associateImpl(String publicIpId, HostAndPort 
publicEndpoint, Location l, int privatePort) {
@@ -181,6 +188,20 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
         onChanged();
     }
 
+    /**
+     * Notifies every registered listener whose filter accepts the newly created association.
+     * Notification is best-effort: a non-fatal exception from a filter or a listener is logged
+     * and the remaining listeners are still notified.
+     */
+    private void emitAssociationCreatedEvent(String publicIpId, HostAndPort publicEndpoint, Location location, int privatePort) {
+        AssociationMetadata metadata = new AssociationMetadata(publicIpId, publicEndpoint, location, privatePort);
+        for (Map.Entry<AssociationListener, Predicate<? super AssociationMetadata>> entry : associationListeners.entrySet()) {
+            try {
+                // The filter is user-supplied code too, so evaluate it inside the try: a throwing
+                // predicate must not fail associate() or stop other listeners being notified.
+                if (entry.getValue().apply(metadata)) {
+                    entry.getKey().onAssociationCreated(metadata);
+                }
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                log.warn("Exception thrown when emitting association creation event " + metadata, e);
+            }
+        }
+    }
+
+
     @Override
     public HostAndPort lookup(Location l, int privatePort) {
         synchronized (mutex) {
@@ -208,6 +229,9 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
         PortMapping old;
         synchronized (mutex) {
             old = mappings.remove(makeKey(publicIpId, publicPort));
+            if (old != null) {
+                
emitAssociationDeletedEvent(associationMetadataFromPortMapping(old));
+            }
             log.debug("cleared port mapping for "+publicIpId+":"+publicPort+" 
- "+old);
         }
         if (old != null) onChanged();
@@ -223,6 +247,7 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
                 if (l.equals(m.target)) {
                     iter.remove();
                     result.add(m);
+                    
emitAssociationDeletedEvent(associationMetadataFromPortMapping(m));
                 }
             }
         }
@@ -242,6 +267,7 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
                 if (publicIpId.equals(m.publicIpId)) {
                     iter.remove();
                     result.add(m);
+                    
emitAssociationDeletedEvent(associationMetadataFromPortMapping(m));
                 }
             }
         }
@@ -251,6 +277,19 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
         }
         return !result.isEmpty();
     }
+
+    /**
+     * Notifies every registered listener whose filter accepts the removed association.
+     * Notification is best-effort: a non-fatal exception from a filter or a listener is logged
+     * and the remaining listeners are still notified.
+     * <p>
+     * NOTE: forgetPortMapping calls this while holding the mutex, so an exception escaping
+     * from here would interrupt the removal; listeners and filters should also return quickly.
+     */
+    private void emitAssociationDeletedEvent(AssociationMetadata metadata) {
+        for (Map.Entry<AssociationListener, Predicate<? super AssociationMetadata>> entry : associationListeners.entrySet()) {
+            try {
+                // The filter is user-supplied code too, so evaluate it inside the try.
+                if (entry.getValue().apply(metadata)) {
+                    entry.getKey().onAssociationDeleted(metadata);
+                }
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                log.warn("Exception thrown when emitting association deletion event " + metadata, e);
+            }
+        }
+    }
     
     @Override
     protected ToStringHelper string() {
@@ -280,10 +319,27 @@ public class PortForwardManagerImpl extends 
AbstractLocation implements PortForw
         return false;
     }
 
+    // Stores the listener keyed by identity/equality with its filter; registering the same
+    // listener again replaces its previous filter. The backing ConcurrentHashMap rejects
+    // a null listener or null filter with a NullPointerException.
+    @Override
+    public void addAssociationListener(AssociationListener listener, 
Predicate<? super AssociationMetadata> filter) {
+        associationListeners.put(listener, filter);
+    }
+
+    // Drops the listener and its filter; a listener that was never registered is ignored.
+    @Override
+    public void removeAssociationListener(AssociationListener listener) {
+        associationListeners.remove(listener);
+    }
+
     protected String makeKey(String publicIpId, int publicPort) {
         return publicIpId+":"+publicPort;
     }
 
+    /**
+     * Builds the listener metadata describing an existing mapping; used when emitting
+     * deletion events for mappings that are being removed.
+     */
+    private AssociationMetadata associationMetadataFromPortMapping(PortMapping portMapping) {
+        // Report the id the mapping was registered under rather than the endpoint's host text.
+        // Creation events carry the publicIpId passed to associate(), so a filter keyed on
+        // publicIpId must see the same value on deletion; the endpoint host need not equal it.
+        String publicIpId = portMapping.publicIpId;
+        HostAndPort publicEndpoint = portMapping.getPublicEndpoint();
+        Location location = portMapping.getTarget();
+        int privatePort = portMapping.getPrivatePort();
+        return new AssociationMetadata(publicIpId, publicEndpoint, location, privatePort);
+    }
     
     
///////////////////////////////////////////////////////////////////////////////////
     // Internal state, for generating memento

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java 
b/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java
index 62c4f65..c999ce3 100644
--- a/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java
+++ b/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.test.entity.LocalManagementContextForTests;
 import brooklyn.util.net.Networking;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.net.HostAndPort;
 
 public class PortForwardManagerTest extends BrooklynAppUnitTestSupport {
@@ -154,4 +157,38 @@ public class PortForwardManagerTest extends 
BrooklynAppUnitTestSupport {
         assertEquals(pfm.lookup(publicIpId, 80), 
HostAndPort.fromParts(publicAddress, 40080));
         assertEquals(pfm.lookup(machine1, 80), 
HostAndPort.fromParts(publicAddress, 40080));
     }
+
+    // Verifies that a registered listener receives created/deleted events only for
+    // associations accepted by its filter.
+    @Test
+    public void testAssociationListeners() throws Exception {
+        final AtomicInteger associationCreatedCount = new AtomicInteger(0);
+        final AtomicInteger associationDeletedCount = new AtomicInteger(0);
+
+        final String publicIpId = "myipid";
+        final String anotherIpId = "anotherIpId";
+
+        // The listener only counts callbacks; the filter admits events for publicIpId alone.
+        pfm.addAssociationListener(new 
PortForwardManager.AssociationListener() {
+            @Override
+            public void 
onAssociationCreated(PortForwardManager.AssociationMetadata metadata) {
+                associationCreatedCount.incrementAndGet();
+            }
+
+            @Override
+            public void 
onAssociationDeleted(PortForwardManager.AssociationMetadata metadata) {
+                associationDeletedCount.incrementAndGet();
+            }
+        }, new Predicate<PortForwardManager.AssociationMetadata>() {
+            @Override
+            public boolean apply(PortForwardManager.AssociationMetadata 
metadata) {
+                return publicIpId.equals(metadata.getPublicIpId());
+            }
+        });
+
+        // Two mappings are created and then forgotten; only the one under publicIpId matches.
+        // NOTE(review): each endpoint's host is the same string as its ip id, so this test
+        // cannot detect a deletion event reporting the endpoint host in place of the ip id.
+        pfm.associate(publicIpId, HostAndPort.fromParts(publicIpId, 40080), 
machine1, 80);
+        pfm.associate(anotherIpId, HostAndPort.fromParts(anotherIpId, 40081), 
machine1, 80);
+        pfm.forgetPortMapping(publicIpId, 40080);
+        pfm.forgetPortMapping(anotherIpId, 40081);
+
+        // Exactly one creation and one deletion should have passed the filter.
+        assertEquals(associationCreatedCount.get(), 1);
+        assertEquals(associationDeletedCount.get(), 1);
+    }
 }

Reply via email to