Repository: incubator-brooklyn Updated Branches: refs/heads/master 119d8945f -> 34ce96ee7
Adds addAssociationListener and removeAssociationListener to PortForwardManager Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/74c84873 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/74c84873 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/74c84873 Branch: refs/heads/master Commit: 74c848736bf52220a283b1cdf7f5616ce7d22f48 Parents: a626583 Author: Martin Harris <[email protected]> Authored: Tue Mar 3 16:16:56 2015 +0000 Committer: Martin Harris <[email protected]> Committed: Tue Mar 3 16:16:56 2015 +0000 ---------------------------------------------------------------------- .../location/access/PortForwardManager.java | 69 +++++++++++++++++++- .../access/PortForwardManagerClient.java | 18 +++-- .../location/access/PortForwardManagerImpl.java | 58 +++++++++++++++- .../location/access/PortForwardManagerTest.java | 37 +++++++++++ 4 files changed, 174 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/location/access/PortForwardManager.java b/core/src/main/java/brooklyn/location/access/PortForwardManager.java index 6f93a4a..71c3974 100644 --- a/core/src/main/java/brooklyn/location/access/PortForwardManager.java +++ b/core/src/main/java/brooklyn/location/access/PortForwardManager.java @@ -18,15 +18,16 @@ */ package brooklyn.location.access; -import java.util.Collection; - import brooklyn.config.ConfigKey; import brooklyn.entity.basic.ConfigKeys; import brooklyn.location.Location; - import com.google.common.annotations.Beta; +import com.google.common.base.Objects; +import com.google.common.base.Predicate; import com.google.common.net.HostAndPort; +import java.util.Collection; + /** * Acts as a registry for existing port mappings (e.g. the public endpoints for accessing specific * ports on private VMs). This could be using DNAT, or iptables port-forwarding, or Docker port-mapping @@ -46,6 +47,57 @@ import com.google.common.net.HostAndPort; @Beta public interface PortForwardManager extends Location { + @Beta + class AssociationMetadata { + private final String publicIpId; + private final HostAndPort publicEndpoint; + private final Location location; + private final int privatePort; + + /** + * Users are discouraged from calling this constructor; the signature may change in future releases. + * Instead, instances will be created automatically by Brooklyn to be passed to the + * {@link AssociationListener#onAssociationCreated(AssociationMetadata)} method. + */ + public AssociationMetadata(String publicIpId, HostAndPort publicEndpoint, Location location, int privatePort) { + this.publicIpId = publicIpId; + this.publicEndpoint = publicEndpoint; + this.location = location; + this.privatePort = privatePort; + } + + public String getPublicIpId() { + return publicIpId; + } + + public HostAndPort getPublicEndpoint() { + return publicEndpoint; + } + + public Location getLocation() { + return location; + } + + public int getPrivatePort() { + return privatePort; + } + + public String toString() { + return Objects.toStringHelper(this) + .add("publicIpId", publicIpId) + .add("publicEndpoint", publicEndpoint) + .add("location", location) + .add("privatePort", privatePort) + .toString(); + } + } + + @Beta + interface AssociationListener { + void onAssociationCreated(AssociationMetadata metadata); + void onAssociationDeleted(AssociationMetadata metadata); + } + /** * The intention is that there is one PortForwardManager instance per "scope". If you * use global, then it will be a shared instance (for that management context). If you @@ -92,6 +144,16 @@ public interface PortForwardManager extends Location { * subsequently be looked up using {@link #lookup(String, int)}. */ public void associate(String publicIpId, HostAndPort publicEndpoint, int privatePort); + + /** + * Registers a listener, which will be notified each time a new port mapping is associated. See {@link #associate(String, HostAndPort, int)} + * and {@link #associate(String, HostAndPort, Location, int)}. + */ + @Beta + public void addAssociationListener(AssociationListener listener, Predicate<? super AssociationMetadata> filter); + + @Beta + public void removeAssociationListener(AssociationListener listener); /** * Returns the public ip hostname and public port for use contacting the given endpoint. @@ -261,4 +323,5 @@ public interface PortForwardManager extends Location { */ @Deprecated public PortMapping getPortMappingWithPrivateSide(Location l, int privatePort); + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java b/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java index ddd1040..c796ef6 100644 --- a/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java +++ b/core/src/main/java/brooklyn/location/access/PortForwardManagerClient.java @@ -29,6 +29,7 @@ import brooklyn.location.Location; import brooklyn.util.exceptions.Exceptions; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.net.HostAndPort; @@ -141,19 +142,28 @@ public class PortForwardManagerClient implements PortForwardManager { @Override public String getId() { return getDelegate().getId(); - } + } @Override public String getScope() { return getDelegate().getScope(); - } + } + + @Override + public void addAssociationListener(AssociationListener listener, Predicate<? super AssociationMetadata> filter) { + getDelegate().addAssociationListener(listener, filter); + } + + @Override + public void removeAssociationListener(AssociationListener listener) { + getDelegate().removeAssociationListener(listener); + } @Override public String toVerboseString() { return getClass().getName()+"[wrapping="+getDelegate().toVerboseString()+"]"; } - - + /////////////////////////////////////////////////////////////////////////////////// // Deprecated /////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java b/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java index 16b8371..f392de0 100644 --- a/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java +++ b/core/src/main/java/brooklyn/location/access/PortForwardManagerImpl.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -38,8 +39,10 @@ import brooklyn.location.Location; import brooklyn.location.basic.AbstractLocation; import brooklyn.mementos.LocationMemento; import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -78,7 +81,9 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw private static final Logger log = LoggerFactory.getLogger(PortForwardManagerImpl.class); protected final Map<String,PortMapping> mappings = new LinkedHashMap<String,PortMapping>(); - + + private final Map<AssociationListener, Predicate<? super AssociationMetadata>> associationListeners = new ConcurrentHashMap<AssociationListener, Predicate<? super AssociationMetadata>>(); + @Deprecated protected final Map<String,String> publicIpIdToHostname = new LinkedHashMap<String,String>(); @@ -160,11 +165,13 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw @Override public void associate(String publicIpId, HostAndPort publicEndpoint, Location l, int privatePort) { associateImpl(publicIpId, publicEndpoint, l, privatePort); + emitAssociationCreatedEvent(publicIpId, publicEndpoint, l, privatePort); } @Override public void associate(String publicIpId, HostAndPort publicEndpoint, int privatePort) { associateImpl(publicIpId, publicEndpoint, null, privatePort); + emitAssociationCreatedEvent(publicIpId, publicEndpoint, null, privatePort); } protected void associateImpl(String publicIpId, HostAndPort publicEndpoint, Location l, int privatePort) { @@ -181,6 +188,20 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw onChanged(); } + private void emitAssociationCreatedEvent(String publicIpId, HostAndPort publicEndpoint, Location location, int privatePort) { + AssociationMetadata metadata = new AssociationMetadata(publicIpId, publicEndpoint, location, privatePort); + for (Map.Entry<AssociationListener, Predicate<? super AssociationMetadata>> entry : associationListeners.entrySet()) { + if (entry.getValue().apply(metadata)) { + try { + entry.getKey().onAssociationCreated(metadata); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.warn("Exception thrown when emitting association creation event " + metadata, e); + } + } + } + } + @Override public HostAndPort lookup(Location l, int privatePort) { synchronized (mutex) { @@ -208,6 +229,9 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw PortMapping old; synchronized (mutex) { old = mappings.remove(makeKey(publicIpId, publicPort)); + if (old != null) { + emitAssociationDeletedEvent(associationMetadataFromPortMapping(old)); + } log.debug("cleared port mapping for "+publicIpId+":"+publicPort+" - "+old); } if (old != null) onChanged(); @@ -223,6 +247,7 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw if (l.equals(m.target)) { iter.remove(); result.add(m); + emitAssociationDeletedEvent(associationMetadataFromPortMapping(m)); } } } @@ -242,6 +267,7 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw if (publicIpId.equals(m.publicIpId)) { iter.remove(); result.add(m); + emitAssociationDeletedEvent(associationMetadataFromPortMapping(m)); } } } @@ -251,6 +277,19 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw } return !result.isEmpty(); } + + private void emitAssociationDeletedEvent(AssociationMetadata metadata) { + for (Map.Entry<AssociationListener, Predicate<? super AssociationMetadata>> entry : associationListeners.entrySet()) { + if (entry.getValue().apply(metadata)) { + try { + entry.getKey().onAssociationDeleted(metadata); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.warn("Exception thrown when emitting association creation event " + metadata, e); + } + } + } + } @Override protected ToStringHelper string() { @@ -280,10 +319,27 @@ public class PortForwardManagerImpl extends AbstractLocation implements PortForw return false; } + @Override + public void addAssociationListener(AssociationListener listener, Predicate<? super AssociationMetadata> filter) { + associationListeners.put(listener, filter); + } + + @Override + public void removeAssociationListener(AssociationListener listener) { + associationListeners.remove(listener); + } + protected String makeKey(String publicIpId, int publicPort) { return publicIpId+":"+publicPort; } + private AssociationMetadata associationMetadataFromPortMapping(PortMapping portMapping) { + String publicIpId = portMapping.getPublicEndpoint().getHostText(); + HostAndPort publicEndpoint = portMapping.getPublicEndpoint(); + Location location = portMapping.getTarget(); + int privatePort = portMapping.getPrivatePort(); + return new AssociationMetadata(publicIpId, publicEndpoint, location, privatePort); + } /////////////////////////////////////////////////////////////////////////////////// // Internal state, for generating memento http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/74c84873/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java b/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java index 62c4f65..c999ce3 100644 --- a/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java +++ b/core/src/test/java/brooklyn/location/access/PortForwardManagerTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,8 @@ import brooklyn.location.basic.SshMachineLocation; import brooklyn.test.entity.LocalManagementContextForTests; import brooklyn.util.net.Networking; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.net.HostAndPort; public class PortForwardManagerTest extends BrooklynAppUnitTestSupport { @@ -154,4 +157,38 @@ public class PortForwardManagerTest extends BrooklynAppUnitTestSupport { assertEquals(pfm.lookup(publicIpId, 80), HostAndPort.fromParts(publicAddress, 40080)); assertEquals(pfm.lookup(machine1, 80), HostAndPort.fromParts(publicAddress, 40080)); } + + @Test + public void testAssociationListeners() throws Exception { + final AtomicInteger associationCreatedCount = new AtomicInteger(0); + final AtomicInteger associationDeletedCount = new AtomicInteger(0); + + final String publicIpId = "myipid"; + final String anotherIpId = "anotherIpId"; + + pfm.addAssociationListener(new PortForwardManager.AssociationListener() { + @Override + public void onAssociationCreated(PortForwardManager.AssociationMetadata metadata) { + associationCreatedCount.incrementAndGet(); + } + + @Override + public void onAssociationDeleted(PortForwardManager.AssociationMetadata metadata) { + associationDeletedCount.incrementAndGet(); + } + }, new Predicate<PortForwardManager.AssociationMetadata>() { + @Override + public boolean apply(PortForwardManager.AssociationMetadata metadata) { + return publicIpId.equals(metadata.getPublicIpId()); + } + }); + + pfm.associate(publicIpId, HostAndPort.fromParts(publicIpId, 40080), machine1, 80); + pfm.associate(anotherIpId, HostAndPort.fromParts(anotherIpId, 40081), machine1, 80); + pfm.forgetPortMapping(publicIpId, 40080); + pfm.forgetPortMapping(anotherIpId, 40081); + + assertEquals(associationCreatedCount.get(), 1); + assertEquals(associationDeletedCount.get(), 1); + } }
