Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java Fri Apr 26 11:08:29 2013 @@ -28,13 +28,14 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Service; import org.apache.sling.discovery.impl.Config; import org.apache.sling.discovery.impl.cluster.ClusterViewService; import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry; -import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,13 +58,24 @@ public class ConnectorRegistryImpl imple /** the local port is added to the announcement as the serverInfo object **/ private String port = ""; - protected void activate(ComponentContext cc) { + @Activate + protected void activate(final ComponentContext cc) { port = cc.getBundleContext().getProperty("org.osgi.service.http.port"); } - - public TopologyConnectorClientInformation registerOutgoingConnection( - final ClusterViewService clusterViewService, final URL connectorUrl, - final OriginInfo originInfo) { + + @Deactivate + protected void deactivate() { + synchronized (outgoingClientsMap) { + for (Iterator<TopologyConnectorClient> it = outgoingClientsMap.values().iterator(); it.hasNext();) { + final TopologyConnectorClient client = it.next(); + client.disconnect(); + it.remove(); + } + } + } + + public TopologyConnectorClientInformation registerOutgoingConnector( + final ClusterViewService clusterViewService, final URL connectorUrl) { if (announcementRegistry == null) { logger.error("registerOutgoingConnection: announcementRegistry is null"); return null; @@ -86,7 +98,7 @@ public class ConnectorRegistryImpl imple serverInfo = "localhost:" + port; } client = new TopologyConnectorClient(clusterViewService, - announcementRegistry, config, connectorUrl, originInfo, + announcementRegistry, config, connectorUrl, serverInfo); outgoingClientsMap.put(client.getId(), client); } @@ -94,7 +106,7 @@ public class ConnectorRegistryImpl imple return client; } - public Collection<TopologyConnectorClientInformation> listOutgoingConnections() { + public Collection<TopologyConnectorClientInformation> listOutgoingConnectors() { final List<TopologyConnectorClientInformation> result = new ArrayList<TopologyConnectorClientInformation>(); synchronized (outgoingClientsMap) { result.addAll(outgoingClientsMap.values()); @@ -102,7 +114,7 @@ public class ConnectorRegistryImpl imple return result; } - public boolean unregisterOutgoingConnection(final String id) { + public boolean unregisterOutgoingConnector(final String id) { if (id == null || id.length() == 0) { throw new IllegalArgumentException("id must not be null"); } @@ -115,20 +127,7 @@ public class ConnectorRegistryImpl imple } } - public boolean pingOutgoingConnection(final String id) { - if (id == null || id.length() == 0) { - throw new IllegalArgumentException("id must not be null"); - } - synchronized (outgoingClientsMap) { - TopologyConnectorClient client = outgoingClientsMap.get(id); - if (client != null) { - client.ping(); - } - return client != null; - } - } - - public void pingOutgoingConnections() { + public void pingOutgoingConnectors() { List<TopologyConnectorClient> outgoingTemplatesClone; synchronized (outgoingClientsMap) { outgoingTemplatesClone = new ArrayList<TopologyConnectorClient>(
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java Fri Apr 26 11:08:29 2013 @@ -23,12 +23,16 @@ import java.net.URL; import java.util.Iterator; import java.util.UUID; +import javax.servlet.http.HttpServletResponse; + import org.apache.commons.httpclient.Credentials; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.URIException; import org.apache.commons.httpclient.UsernamePasswordCredentials; import org.apache.commons.httpclient.auth.AuthScope; -import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; import org.apache.sling.commons.json.JSONException; import org.apache.sling.discovery.InstanceDescription; import org.apache.sling.discovery.impl.Config; @@ -66,9 +70,6 @@ public class TopologyConnectorClient imp /** the last inherited announcement **/ private Announcement lastInheritedAnnouncement; - /** the information as to where this connector came from **/ - private final OriginInfo originInfo; - /** the information about this server **/ private final String serverInfo; @@ -77,7 +78,7 @@ public class TopologyConnectorClient imp TopologyConnectorClient(final ClusterViewService clusterViewService, final AnnouncementRegistry announcementRegistry, final Config config, - final URL connectorUrl, final OriginInfo originInfo, final String serverInfo) { + final URL connectorUrl, final String serverInfo) { if (clusterViewService == null) { throw new IllegalArgumentException( "clusterViewService must not be null"); @@ -92,24 +93,22 @@ public class TopologyConnectorClient imp if (connectorUrl == null) { throw new IllegalArgumentException("connectorUrl must not be null"); } - if (originInfo == null) { - throw new IllegalArgumentException("originInfo must not be null"); - } this.clusterViewService = clusterViewService; this.announcementRegistry = announcementRegistry; this.config = config; this.connectorUrl = connectorUrl; - this.originInfo = originInfo; this.serverInfo = serverInfo; this.id = UUID.randomUUID(); } /** ping the server and pass the announcements between the two **/ void ping() { - logger.debug("ping: connectorUrl=" + connectorUrl); + final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json"; + if (logger.isDebugEnabled()) { + logger.debug("ping: connectorUrl=" + connectorUrl + ", complete uri=" + uri); + } HttpClient httpClient = new HttpClient(); - PostMethod method = new PostMethod(connectorUrl.toString()); - + PutMethod method = new PutMethod(uri); try { String userInfo = connectorUrl.getUserInfo(); if (userInfo != null) { @@ -121,7 +120,6 @@ public class TopologyConnectorClient imp Announcement topologyAnnouncement = new Announcement( clusterViewService.getSlingId()); - topologyAnnouncement.setOriginInfo(originInfo); topologyAnnouncement.setServerInfo(serverInfo); topologyAnnouncement.setLocalCluster(clusterViewService .getClusterView()); @@ -147,37 +145,60 @@ public class TopologyConnectorClient imp }); final String p = topologyAnnouncement.asJSON(); - logger.debug("ping: topologyAnnouncement json is: " + p); - method.addParameter("topologyAnnouncement", p); + if (logger.isDebugEnabled()) { + logger.debug("ping: topologyAnnouncement json is: " + p); + } + method.setRequestEntity(new StringRequestEntity(p, "application/json", "UTF-8")); httpClient.executeMethod(method); - logger.debug("ping: done. code=" + method.getStatusCode() + " - " - + method.getStatusText()); + if (logger.isDebugEnabled()) { + logger.debug("ping: done. code=" + method.getStatusCode() + " - " + + method.getStatusText()); + } lastStatusCode = method.getStatusCode(); - if (method.getStatusCode()==200) { + if (method.getStatusCode()==HttpServletResponse.SC_OK) { String responseBody = method.getResponseBodyAsString(16*1024*1024); // limiting to 16MB, should be way enough - logger.debug("ping: response body=" + responseBody); - Announcement inheritedAnnouncement = Announcement - .fromJSON(responseBody); - inheritedAnnouncement.setInherited(true); - if (!announcementRegistry - .registerAnnouncement(inheritedAnnouncement)) { - logger.info("ping: connector response is from an instance which I already see in my topology" - + inheritedAnnouncement); + if (logger.isDebugEnabled()) { + logger.debug("ping: response body=" + responseBody); + } + if (responseBody!=null && responseBody.length()>0) { + Announcement inheritedAnnouncement = Announcement + .fromJSON(responseBody); + if (inheritedAnnouncement.isLoop()) { + if (logger.isDebugEnabled()) { + logger.debug("ping: connector response indicated a loop detected. not registering this announcement from "+ + inheritedAnnouncement.getOwnerId()); + } + } else { + inheritedAnnouncement.setInherited(true); + if (!announcementRegistry + .registerAnnouncement(inheritedAnnouncement)) { + if (logger.isDebugEnabled()) { + logger.debug("ping: connector response is from an instance which I already see in my topology" + + inheritedAnnouncement); + } + lastInheritedAnnouncement = null; + return; + } + } + lastInheritedAnnouncement = inheritedAnnouncement; + } else { lastInheritedAnnouncement = null; - return; } - lastInheritedAnnouncement = inheritedAnnouncement; } else { lastInheritedAnnouncement = null; } } catch (URIException e) { logger.warn("ping: Got URIException: " + e); + lastInheritedAnnouncement = null; } catch (IOException e) { logger.warn("ping: got IOException: " + e); + lastInheritedAnnouncement = null; } catch (JSONException e) { logger.warn("ping: got JSONException: " + e); + lastInheritedAnnouncement = null; } catch (RuntimeException re) { - logger.error("ping: got RuntimeException: " + re, re); + logger.warn("ping: got RuntimeException: " + re, re); + lastInheritedAnnouncement = null; } } @@ -188,6 +209,14 @@ public class TopologyConnectorClient imp public URL getConnectorUrl() { return connectorUrl; } + + public boolean representsLoop() { + if (lastInheritedAnnouncement == null) { + return false; + } else { + return lastInheritedAnnouncement.isLoop(); + } + } public boolean isConnected() { if (lastInheritedAnnouncement == null) { @@ -205,17 +234,16 @@ public class TopologyConnectorClient imp } } - public OriginInfo getOriginInfo() { - return originInfo; - } - public String getId() { return id.toString(); } /** Disconnect this connector **/ public void disconnect() { - logger.debug("disconnect: connectorUrl=" + connectorUrl); + final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json"; + if (logger.isDebugEnabled()) { + logger.debug("disconnect: connectorUrl=" + connectorUrl + ", complete uri="+uri); + } if (lastInheritedAnnouncement != null) { announcementRegistry @@ -224,7 +252,7 @@ public class TopologyConnectorClient imp } HttpClient httpClient = new HttpClient(); - PostMethod method = new PostMethod(connectorUrl.toString()); + DeleteMethod method = new DeleteMethod(uri); try { String userInfo = connectorUrl.getUserInfo(); @@ -235,11 +263,13 @@ public class TopologyConnectorClient imp .getURI().getPort()), c); } - method.addParameter("topologyDisconnect", - clusterViewService.getSlingId()); httpClient.executeMethod(method); - logger.debug("disconnect: done. code=" + method.getStatusCode() - + " - " + method.getStatusText()); + if (logger.isDebugEnabled()) { + logger.debug("disconnect: done. code=" + method.getStatusCode() + + " - " + method.getStatusText()); + } + // ignoring the actual statuscode though as there's little we can + // do about it after this point } catch (URIException e) { logger.warn("disconnect: Got URIException: " + e); } catch (IOException e) { Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java Fri Apr 26 11:08:29 2013 @@ -25,12 +25,6 @@ import java.net.URL; */ public interface TopologyConnectorClientInformation { - enum OriginInfo { - Config, // this connector was created via config - WebConsole, // this connector was created via the wbconsole - Programmatically // this connector was created programmatically - } - /** the endpoint url where this connector is connecting to **/ URL getConnectorUrl(); @@ -40,12 +34,13 @@ public interface TopologyConnectorClient /** whether or not this connector was able to successfully connect **/ boolean isConnected(); + /** whether or not the counterpart of this connector has detected a loop in the topology connectors **/ + boolean representsLoop(); + /** the sling id of the remote end **/ String getRemoteSlingId(); /** the unique id of this connector **/ String getId(); - /** the information about how this connector was created **/ - OriginInfo getOriginInfo(); } Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java Fri Apr 26 11:08:29 2013 @@ -18,18 +18,21 @@ */ package org.apache.sling.discovery.impl.topology.connector; +import java.io.BufferedReader; import java.io.IOException; import java.io.PrintWriter; import java.util.HashSet; import java.util.Set; import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.sling.SlingServlet; import org.apache.sling.api.SlingHttpServletRequest; import org.apache.sling.api.SlingHttpServletResponse; +import org.apache.sling.api.request.RequestPathInfo; import org.apache.sling.api.servlets.SlingAllMethodsServlet; import org.apache.sling.commons.json.JSONException; import org.apache.sling.discovery.impl.Config; @@ -46,14 +49,13 @@ import org.slf4j.LoggerFactory; * Servlet which receives topology announcements at * /libs/sling/topology/connector (which is reachable without authorization) */ -@SlingServlet(paths = { "/libs/sling/topology/connector" }) -@Property(name = "sling.auth.requirements", value = { "-/libs/sling/topology/connector" }) +@SuppressWarnings("serial") +@SlingServlet(paths = { TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH }) +@Property(name = "sling.auth.requirements", value = { "-"+TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH }) public class TopologyConnectorServlet extends SlingAllMethodsServlet { public static final String TOPOLOGY_CONNECTOR_PATH = "/libs/sling/topology/connector"; - private static final long serialVersionUID = 1300640476823585873L; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Reference @@ -80,75 +82,135 @@ public class TopologyConnectorServlet ex whitelist.add(aWhitelistEntry); } } - + @Override - protected void doPost(final SlingHttpServletRequest request, - final SlingHttpServletResponse response) throws ServletException, + protected void doDelete(SlingHttpServletRequest request, + SlingHttpServletResponse response) throws ServletException, IOException { - + if (!isWhitelisted(request)) { - response.sendError(404); // in theory it would be 403==forbidden, but that would reveal that - // a resource would exist there in the first place + // in theory it would be 403==forbidden, but that would reveal that + // a resource would exist there in the first place + response.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + + final RequestPathInfo pathInfo = request.getRequestPathInfo(); + final String extension = pathInfo.getExtension(); + if (!"json".equals(extension)) { + response.sendError(HttpServletResponse.SC_NOT_FOUND); return; } + final String selector = pathInfo.getSelectorString(); + + announcementRegistry.unregisterAnnouncement(selector); + } + + @Override + protected void doPut(SlingHttpServletRequest request, + SlingHttpServletResponse response) throws ServletException, + IOException { - final String topologyAnnouncementJSON = request - .getParameter("topologyAnnouncement"); - final String topologyDisconnect = request - .getParameter("topologyDisconnect"); - if (topologyDisconnect != null) { - announcementRegistry.unregisterAnnouncement(topologyDisconnect); + if (!isWhitelisted(request)) { + // in theory it would be 403==forbidden, but that would reveal that + // a resource would exist there in the first place + response.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + + final RequestPathInfo pathInfo = request.getRequestPathInfo(); + final String extension = pathInfo.getExtension(); + if (!"json".equals(extension)) { + response.sendError(HttpServletResponse.SC_NOT_FOUND); return; } - logger.debug("doPost: incoming topology announcement is: " - + topologyAnnouncementJSON); + final String selector = pathInfo.getSelectorString(); + + final BufferedReader reader = request.getReader(); + StringBuffer sb = new StringBuffer(); + while(true) { + final String line = reader.readLine(); + if (line==null) { + break; + } + sb.append(line); + sb.append(System.getProperty("line.separator")); + } + + String topologyAnnouncementJSON = sb.toString(); + if (logger.isDebugEnabled()) { + logger.debug("doPost: incoming topology announcement is: " + + topologyAnnouncementJSON); + } final Announcement incomingTopologyAnnouncement; try { incomingTopologyAnnouncement = Announcement .fromJSON(topologyAnnouncementJSON); - incomingTopologyAnnouncement.removeInherited(clusterViewService - .getSlingId()); - - if (clusterViewService.contains(incomingTopologyAnnouncement - .getOwnerId())) { - logger.info("doPost: rejecting an announcement from an instance that is part of my cluster: " - + incomingTopologyAnnouncement); - response.sendError(409); + + if (!incomingTopologyAnnouncement.getOwnerId().equals(selector)) { + response.sendError(HttpServletResponse.SC_BAD_REQUEST); return; } - if (clusterViewService.containsAny(incomingTopologyAnnouncement - .listInstances())) { - logger.info("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: " - + incomingTopologyAnnouncement); - response.sendError(409); - return; + + String slingId = clusterViewService.getSlingId(); + if (slingId==null) { + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + logger.info("doPut: no slingId available. Service not ready as expected at the moment."); + return; } - if (!announcementRegistry - .registerAnnouncement(incomingTopologyAnnouncement)) { - logger.info("doPost: rejecting an announcement from an instance that I already see in my topology: " + incomingTopologyAnnouncement.removeInherited(slingId); + + final Announcement replyAnnouncement = new Announcement( + slingId); + + if (!incomingTopologyAnnouncement.isCorrectVersion()) { + logger.warn("doPost: rejecting an announcement from an incompatible connector protocol version: " + incomingTopologyAnnouncement); - response.sendError(409); + response.sendError(HttpServletResponse.SC_BAD_REQUEST); return; - } - - Announcement replyAnnouncement = new Announcement( - clusterViewService.getSlingId()); - replyAnnouncement.setLocalCluster(clusterViewService - .getClusterView()); - announcementRegistry.addAllExcept(replyAnnouncement, - new AnnouncementFilter() { - - public boolean accept(final String receivingSlingId, Announcement announcement) { - if (announcement.getPrimaryKey().equals( - incomingTopologyAnnouncement - .getPrimaryKey())) { - return false; + } else if (clusterViewService.contains(incomingTopologyAnnouncement + .getOwnerId())) { + if (logger.isDebugEnabled()) { + logger.debug("doPost: rejecting an announcement from an instance that is part of my cluster: " + + incomingTopologyAnnouncement); + } + // marking as 'loop' + replyAnnouncement.setLoop(true); + } else if (clusterViewService.containsAny(incomingTopologyAnnouncement + .listInstances())) { + if (logger.isDebugEnabled()) { + logger.debug("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: " + + incomingTopologyAnnouncement); + } + // marking as 'loop' + replyAnnouncement.setLoop(true); + } else if (!announcementRegistry + .registerAnnouncement(incomingTopologyAnnouncement)) { + if (logger.isDebugEnabled()) { + logger.debug("doPost: rejecting an announcement from an instance that I already see in my topology: " + + incomingTopologyAnnouncement); + } + // marking as 'loop' + replyAnnouncement.setLoop(true); + } else { + // normal, successful case: replying with the part of the topology which this instance sees + replyAnnouncement.setLocalCluster(clusterViewService + .getClusterView()); + announcementRegistry.addAllExcept(replyAnnouncement, + new AnnouncementFilter() { + + public boolean accept(final String receivingSlingId, Announcement announcement) { + if (announcement.getPrimaryKey().equals( + incomingTopologyAnnouncement + .getPrimaryKey())) { + return false; + } + return true; } - return true; - } - }); + }); + } final String p = replyAnnouncement.asJSON(); - PrintWriter pw = response.getWriter(); + final PrintWriter pw = response.getWriter(); pw.print(p); pw.flush(); } catch (JSONException e) { Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties Fri Apr 26 11:08:29 2013 @@ -32,8 +32,14 @@ heartbeatInterval.name = Heartbeat inter heartbeatInterval.description = Configure the interval (in seconds) according to which the \ heartbeats are exchanged in the topology. Default is 15 seconds. -topologyConnectorUrl.name = Topology Connector URL -topologyConnectorUrl.description = URL where to join a topology, e.g. \ +minEventDelay.name = Minimal Event Delay (seconds) +minEventDelay.description = Configure a minimal delay (in seconds) between TOPOLOGY_CHANGING \ + and TOPOLOGY_CHANGED. Any further changes happening during this delay are accumulated and \ + combined in the TOPOLOGY_CHANGED after this delay. THis helps avoiding event-flooding. \ + Default is 3 seconds. A negative value or zero disables this delay. + +topologyConnectorUrls.name = Topology Connector URLs +topologyConnectorUrls.description = URLs where to join a topology, e.g. \ http://localhost:4502/libs/sling/topology/connector topologyConnectorWhitelist.name = Topology Connector Whitelist Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java Fri Apr 26 11:08:29 2013 @@ -138,6 +138,7 @@ public class SingleInstanceTest { assertEquals(1, assertingTopologyEventListener.getRemainingExpectedCount()); assertEquals(0, pp.getGetCnt()); instance.bindPropertyProvider(pp, propertyName); + Thread.sleep(1500); assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount()); // we can only assume that the getProperty was called at least once - it // could be called multiple times though.. @@ -153,13 +154,13 @@ public class SingleInstanceTest { instance.runHeartbeatOnce(); Thread.sleep(2000); assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount()); - assertEquals(1, pp.getGetCnt()); + assertEquals(2, pp.getGetCnt()); // a heartbeat repeat should not result in another call though instance.runHeartbeatOnce(); Thread.sleep(2000); assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount()); - assertEquals(2, pp.getGetCnt()); + assertEquals(3, pp.getGetCnt()); } Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java Fri Apr 26 11:08:29 2013 @@ -39,11 +39,14 @@ import javax.jcr.observation.Observation import junitx.util.PrivateAccessor; import org.apache.sling.api.SlingConstants; -import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; -import org.apache.sling.discovery.TopologyEventListener; +import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.commons.scheduler.impl.QuartzScheduler; +import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager; import org.apache.sling.discovery.PropertyProvider; +import org.apache.sling.discovery.TopologyEventListener; import org.apache.sling.discovery.impl.Config; import org.apache.sling.discovery.impl.DiscoveryServiceImpl; import org.apache.sling.discovery.impl.cluster.ClusterViewService; @@ -84,6 +87,26 @@ public class Instance { private ResourceResolver resourceResolver; private int serviceId = 999; + + private static Scheduler singletonScheduler = null; + + private static Scheduler getSingletonScheduler() throws Exception { + if (singletonScheduler!=null) { + return singletonScheduler; + } + final Scheduler newscheduler = new QuartzScheduler(); + final ThreadPoolManager tpm = new DefaultThreadPoolManager(null, null); + try { + PrivateAccessor.invoke(newscheduler, "bindThreadPoolManager", + new Class[] { ThreadPoolManager.class }, + new Object[] { tpm }); + } catch (Throwable e1) { + org.junit.Assert.fail(e1.toString()); + } + OSGiMock.activate(newscheduler); + singletonScheduler = newscheduler; + return singletonScheduler; + } private Instance(String debugName, ResourceResolverFactory resourceResolverFactory, boolean resetRepo) @@ -97,10 +120,15 @@ public class Instance { Config config = new Config() { @Override public long getHeartbeatTimeout() { - return 20000; + return 20; + } + + @Override + public int getMinEventDelay() { + return 1; } }; - + clusterViewService = OSGiFactory.createClusterViewServiceImpl(slingId, resourceResolverFactory, config); announcementRegistry = OSGiFactory.createITopologyAnnouncementRegistry( @@ -111,11 +139,11 @@ public class Instance { resourceResolverFactory, slingId, announcementRegistry, connectorRegistry, config, resourceResolverFactory.getAdministrativeResourceResolver(null) - .adaptTo(Repository.class)); - - discoveryService = OSGiFactory.createDiscoverService(slingId, + .adaptTo(Repository.class), getSingletonScheduler()); + + discoveryService = OSGiFactory.createDiscoverService(slingId, heartbeatHandler, clusterViewService, announcementRegistry, - resourceResolverFactory, config); + resourceResolverFactory, config, connectorRegistry, getSingletonScheduler()); votingHandler = OSGiFactory.createVotingHandler(slingId, resourceResolverFactory, config); @@ -272,10 +300,11 @@ public class Instance { } } - public void stop() throws LoginException { + public void stop() throws Exception { if (resourceResolver != null) { resourceResolver.close(); } + osgiMock.deactivateAll(); } public void bindTopologyEventListener(TopologyEventListener eventListener) Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java Fri Apr 26 11:08:29 2013 @@ -22,6 +22,7 @@ import java.util.Dictionary; import java.util.Properties; import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.commons.scheduler.Scheduler; import org.apache.sling.settings.SlingSettingsService; import org.hamcrest.Description; import org.jmock.Expectations; Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java Fri Apr 26 11:08:29 2013 @@ -23,6 +23,7 @@ import javax.jcr.Repository; import junitx.util.PrivateAccessor; import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.commons.scheduler.Scheduler; import org.apache.sling.discovery.impl.Config; import org.apache.sling.discovery.impl.DiscoveryServiceImpl; import org.apache.sling.discovery.impl.cluster.ClusterViewServiceImpl; @@ -52,7 +53,7 @@ public class OSGiFactory { public static HeartbeatHandler createHeartbeatHandler( ResourceResolverFactory resourceResolverFactory, String slingId, AnnouncementRegistry topologyService, - ConnectorRegistry connectorRegistry, Config config, Repository repository) + ConnectorRegistry connectorRegistry, Config config, Repository repository, Scheduler scheduler) throws Exception { HeartbeatHandler heartbeatHandler = new HeartbeatHandler(); PrivateAccessor.setField(heartbeatHandler, "resourceResolverFactory", @@ -64,6 +65,7 @@ public class OSGiFactory { PrivateAccessor.setField(heartbeatHandler, "connectorRegistry", connectorRegistry); PrivateAccessor.setField(heartbeatHandler, "config", config); + PrivateAccessor.setField(heartbeatHandler, "scheduler", scheduler); return heartbeatHandler; } @@ -72,7 +74,8 @@ public class OSGiFactory { HeartbeatHandler heartbeatHandler, ClusterViewServiceImpl clusterViewService, AnnouncementRegistry topologyRegistry, - ResourceResolverFactory resourceResolverFactory, Config config) + ResourceResolverFactory resourceResolverFactory, Config config, + ConnectorRegistry connectorRegistry, Scheduler scheduler) throws Exception { DiscoveryServiceImpl discoveryService = new DiscoveryServiceImpl(); PrivateAccessor.setField(discoveryService, "settingsService", @@ -86,6 +89,10 @@ public class OSGiFactory { PrivateAccessor.setField(discoveryService, "resourceResolverFactory", resourceResolverFactory); PrivateAccessor.setField(discoveryService, "config", config); + PrivateAccessor.setField(discoveryService, "connectorRegistry", + connectorRegistry); + PrivateAccessor.setField(discoveryService, "scheduler", + scheduler); return discoveryService; } Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java Fri Apr 26 11:08:29 2013 @@ -18,6 +18,7 @@ */ package org.apache.sling.discovery.impl.setup; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Iterator; import java.util.LinkedList; @@ -53,18 +54,48 @@ public class OSGiMock { Iterator it = services.iterator(); it.hasNext();) { Object aService = it.next(); - Method[] methods = aService.getClass().getDeclaredMethods(); - for (int i = 0; i < methods.length; i++) { - Method method = methods[i]; - if (method.getName().equals("activate")) { - method.setAccessible(true); - if ( method.getParameterTypes().length == 0 ) { - method.invoke(aService, null); - } else { - method.invoke(aService, MockFactory.mockComponentContext()); - } - } - } + activate(aService); } } + + public static void activate(Object aService) throws IllegalAccessException, + InvocationTargetException { + Method[] methods = aService.getClass().getDeclaredMethods(); + for (int i = 0; i < methods.length; i++) { + Method method = methods[i]; + if (method.getName().equals("activate")) { + method.setAccessible(true); + if ( method.getParameterTypes().length == 0 ) { + method.invoke(aService, null); + } else { + method.invoke(aService, MockFactory.mockComponentContext()); + } + } + } + } + + public void deactivateAll() throws Exception { + for (@SuppressWarnings("rawtypes") + Iterator it = services.iterator(); it.hasNext();) { + Object aService = it.next(); + + deactivate(aService); + } + } + + public static void deactivate(Object aService) throws IllegalAccessException, + InvocationTargetException { + Method[] methods = aService.getClass().getDeclaredMethods(); + for (int i = 0; i < methods.length; i++) { + Method method = methods[i]; + if (method.getName().equals("deactivate")) { + method.setAccessible(true); + if ( method.getParameterTypes().length == 0 ) { + method.invoke(aService, null); + } else { + method.invoke(aService, MockFactory.mockComponentContext()); + } + } + } + } } Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff ============================================================================== --- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java (original) +++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java Fri Apr 26 11:08:29 2013 @@ -29,7 +29,6 @@ import org.apache.sling.discovery.impl.s import org.apache.sling.discovery.impl.setup.MockFactory; import org.apache.sling.discovery.impl.setup.OSGiFactory; import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry; -import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo; import org.junit.Test; public class ConnectorRegistryTest { @@ -51,44 +50,37 @@ public class ConnectorRegistryTest { ConnectorRegistry c = OSGiFactory.createConnectorRegistry( announcementRegistry, config); - final URL url = new URL("http://localhost:1234"); + final URL url = new URL("http://localhost:1234/connector"); final ClusterViewService cvs = i.getClusterViewService(); try { - c.registerOutgoingConnection(cvs, url, null); + c.registerOutgoingConnector(null, url); fail("should have complained"); } catch (IllegalArgumentException e) { // ok } try { - c.registerOutgoingConnection(null, url, OriginInfo.Programmatically); - fail("should have complained"); - } catch (IllegalArgumentException e) { - // ok - } - try { - c.registerOutgoingConnection(cvs, null, OriginInfo.Config); + c.registerOutgoingConnector(cvs, null); fail("should have complained"); } catch (IllegalArgumentException e) { // ok } TopologyConnectorClientInformation client = c - .registerOutgoingConnection(cvs, url, OriginInfo.WebConsole); + .registerOutgoingConnector(cvs, url); try { // should not be able to register same url twice - client = c.registerOutgoingConnection(cvs, url, - OriginInfo.Programmatically); + client = c.registerOutgoingConnector(cvs, url); fail("should have complained"); } catch (IllegalStateException e) { // ok } try { - c.unregisterOutgoingConnection(null); + c.unregisterOutgoingConnector(null); fail("should have complained"); } catch (IllegalArgumentException e) { // ok } - c.unregisterOutgoingConnection(client.getId()); + c.unregisterOutgoingConnector(client.getId()); } }