http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java new file mode 100644 index 0000000..f50848f --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.zookeeper.subscribe; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; +import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper; +import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors ZooKeeper for changes in published endpoints. + * <p> + * Specifically, it monitors the node path associated with a given interface class, + * whose data is a serialized version of an EndpointDescription, and notifies an + * EndpointListener when changes are detected (which can then propagate the + * notification to other EndpointListeners with a matching scope). + * <p> + * Note that the EndpointListener is used here as a decoupling interface for + * convenience, and is not necessarily used according to its documented contract. 
+ */ +public class InterfaceMonitor implements Watcher, StatCallback { + + private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class); + + private final String znode; + private final ZooKeeper zk; + private final EndpointListener endpointListener; + private final boolean recursive; + private volatile boolean closed; + + // This map reference changes, so don't synchronize on it + private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>(); + + private EndpointDescriptionParser parser; + + public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) { + this.zk = zk; + this.znode = Utils.getZooKeeperPath(objClass); + this.recursive = objClass == null || objClass.isEmpty(); + this.endpointListener = endpointListener; + this.parser = new EndpointDescriptionParser(); + LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]", + new Object[] {recursive ? "(recursive)" : "", scope, objClass}); + } + + /** + * Returns all endpoints that are currently known to this monitor. + * + * @return all endpoints that are currently known to this monitor + */ + public synchronized List<EndpointDescription> getEndpoints() { + return new ArrayList<EndpointDescription>(nodes.values()); + } + + public void start() { + watch(); + } + + private void watch() { + LOG.debug("registering a ZooKeeper.exists({}) callback", znode); + zk.exists(znode, this, this, null); + } + + /** + * Zookeeper Watcher interface callback. + */ + public void process(WatchedEvent event) { + LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event); + processDelta(); + } + + /** + * Zookeeper StatCallback interface callback. 
+ */ + @SuppressWarnings("deprecation") + public void processResult(int rc, String path, Object ctx, Stat stat) { + LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc); + + switch (rc) { + case Code.Ok: + case Code.NoNode: + processDelta(); + return; + + case Code.SessionExpired: + case Code.NoAuth: + case Code.ConnectionLoss: + return; + + default: + watch(); + } + } + + private void processDelta() { + if (closed) { + return; + } + + if (zk.getState() != ZooKeeper.States.CONNECTED) { + LOG.debug("ZooKeeper connection was already closed! Not processing changed event."); + return; + } + + try { + if (zk.exists(znode, false) != null) { + zk.getChildren(znode, this); + refreshNodes(); + } else { + LOG.debug("znode {} doesn't exist -> not processing any changes", znode); + } + } catch (Exception e) { + if (zk.getState() != ZooKeeper.States.CONNECTED) { + LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery + } else { + LOG.error("Error getting ZooKeeper data.", e); + } + } + } + + public synchronized void close() { + closed = true; + for (EndpointDescription endpoint : nodes.values()) { + endpointListener.endpointRemoved(endpoint, null); + } + nodes.clear(); + } + + private synchronized void refreshNodes() { + if (closed) { + return; + } + LOG.info("Processing change on node: {}", znode); + + Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>(); + Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes); + processChildren(znode, newNodes, prevNodes); + + // whatever is left in prevNodes now has been removed from Discovery + LOG.debug("processChildren done. 
Nodes that are missing now and need to be removed: {}", prevNodes.values()); + for (EndpointDescription endpoint : prevNodes.values()) { + endpointListener.endpointRemoved(endpoint, null); + } + nodes = newNodes; + } + + /** + * Iterates through all child nodes of the given node and tries to find + * endpoints. If the recursive flag is set it also traverses into the child + * nodes. + * + * @return true if an endpoint was found and if the node therefore needs to + * be monitored for changes + */ + private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes, + Map<String, EndpointDescription> prevNodes) { + List<String> children; + try { + LOG.debug("Processing the children of {}", zn); + children = zk.getChildren(zn, false); + + boolean foundANode = false; + for (String child : children) { + String childZNode = zn + '/' + child; + EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); + if (endpoint != null) { + EndpointDescription prevEndpoint = prevNodes.get(child); + LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " + + endpoint.getProperties().values()); + newNodes.put(child, endpoint); + prevNodes.remove(child); + foundANode = true; + LOG.debug("Properties: {}", endpoint.getProperties()); + if (prevEndpoint == null) { + // This guy is new + endpointListener.endpointAdded(endpoint, null); + } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { + // TODO + } + } + if (recursive && processChildren(childZNode, newNodes, prevNodes)) { + zk.getChildren(childZNode, this); + } + } + + return foundANode; + } catch (KeeperException e) { + LOG.error("Problem processing ZooKeeper node", e); + } catch (InterruptedException e) { + LOG.error("Problem processing ZooKeeper node", e); + } + return false; + } + + /** + * Retrieves data from the given node and parses it into an EndpointDescription. 
+ * + * @param node a node path + * @return endpoint found in the node or null if no endpoint was found + */ + private EndpointDescription getEndpointDescriptionFromNode(String node) { + try { + Stat stat = zk.exists(node, false); + if (stat == null || stat.getDataLength() <= 0) { + return null; + } + byte[] data = zk.getData(node, false, null); + LOG.debug("Got data for node: {}", node); + + EndpointDescription endpoint = getFirstEnpointDescription(data); + if (endpoint != null) { + return endpoint; + } + LOG.warn("No Discovery information found for node: {}", node); + } catch (Exception e) { + LOG.error("Problem getting EndpointDescription from node " + node, e); + } + return null; + } + + public EndpointDescription getFirstEnpointDescription(byte[] data) { + List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data)); + if (elements.isEmpty()) { + return null; + } + Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty()); + return new EndpointDescription(props); + } +}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java new file mode 100644 index 0000000..3a02a48 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.zookeeper.subscribe; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; +import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.aries.rsa.util.StringPlus; +import org.apache.zookeeper.ZooKeeper; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the EndpointListeners and the scopes they are interested in. + * For each scope with interested EndpointListeners an InterfaceMonitor is created. + * The InterfaceMonitor calls back when it detects added or removed external Endpoints. + * These events are then forwarded to all interested EndpointListeners. 
+ */ +public class InterfaceMonitorManager { + private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class); + private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*"); + + private final BundleContext bctx; + private final ZooKeeper zk; + // map of EndpointListeners and the scopes they are interested in + private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes = + new HashMap<ServiceReference<EndpointListener>, List<String>>(); + // map of scopes and their interest data + private final Map<String, Interest> interests = new HashMap<String, Interest>(); + + protected static class Interest { + List<ServiceReference<EndpointListener>> endpointListeners = + new CopyOnWriteArrayList<ServiceReference<EndpointListener>>(); + InterfaceMonitor monitor; + } + + public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) { + this.bctx = bctx; + this.zk = zk; + } + + public void addInterest(ServiceReference<EndpointListener> endpointListener) { + if (isOurOwnEndpointListener(endpointListener)) { + LOG.debug("Skipping our own EndpointListener"); + return; + } + + LOG.info("updating EndpointListener interests: {}", endpointListener); + if (LOG.isDebugEnabled()) { + LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener)); + } + for (String scope : getScopes(endpointListener)) { + String objClass = getObjectClass(scope); + LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass); + addInterest(endpointListener, scope, objClass); + } + } + + private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) { + return Boolean.parseBoolean(String.valueOf( + endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); + } + + public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener, + String scope, String objClass) { + // get or create interest for 
given scope and add listener to it + Interest interest = interests.get(scope); + if (interest == null) { + // create interest, add listener and start monitor + interest = new Interest(); + interests.put(scope, interest); + interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events + interest.monitor = createInterfaceMonitor(scope, objClass, interest); + interest.monitor.start(); + } else { + // interest already exists, so just add listener to it + if (!interest.endpointListeners.contains(endpointListener)) { + interest.endpointListeners.add(endpointListener); + } + // notify listener of all known endpoints for given scope + // (as EndpointListener contract requires of all added/modified listeners) + for (EndpointDescription endpoint : interest.monitor.getEndpoints()) { + notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener)); + } + } + + // add scope to listener's scopes list + List<String> scopes = endpointListenerScopes.get(endpointListener); + if (scopes == null) { + scopes = new ArrayList<String>(1); + endpointListenerScopes.put(endpointListener, scopes); + } + if (!scopes.contains(scope)) { + scopes.add(scope); + } + } + + public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) { + LOG.info("removing EndpointListener interests: {}", endpointListener); + List<String> scopes = endpointListenerScopes.get(endpointListener); + if (scopes == null) { + return; + } + + for (String scope : scopes) { + Interest interest = interests.get(scope); + if (interest != null) { + interest.endpointListeners.remove(endpointListener); + if (interest.endpointListeners.isEmpty()) { + interest.monitor.close(); + interests.remove(scope); + } + } + } + endpointListenerScopes.remove(endpointListener); + } + + protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) { + // holding this object's lock in the callbacks can lead to a 
deadlock with InterfaceMonitor + EndpointListener endpointListener = new EndpointListener() { + + public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { + notifyListeners(endpoint, scope, false, interest.endpointListeners); + } + + public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { + notifyListeners(endpoint, scope, true, interest.endpointListeners); + } + }; + return new InterfaceMonitor(zk, objClass, endpointListener, scope); + } + + private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded, + List<ServiceReference<EndpointListener>> endpointListeners) { + for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) { + EndpointListener service = bctx.getService(endpointListenerRef); + try { + EndpointListener endpointListener = (EndpointListener)service; + LOG.trace("matching {} against {}", endpoint, currentScope); + if (matchFilter(bctx, currentScope, endpoint)) { + LOG.debug("Matched {} against {}", endpoint, currentScope); + notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(), + endpointListener); + } + } finally { + if (service != null) { + bctx.ungetService(endpointListenerRef); + } + } + } + } + + private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) { + if (filter == null) { + return false; + } + + try { + Filter f = bctx.createFilter(filter); + Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties()); + return f.match(dict); + } catch (Exception e) { + return false; + } + } + + + private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded, + Bundle endpointListenerBundle, EndpointListener endpointListener) { + if (endpointListenerBundle == null) { + LOG.info("listening service was unregistered, ignoring"); + } else if (isAdded) { + LOG.info("calling EndpointListener.endpointAdded: " + endpointListener 
+ " from bundle " + + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); + endpointListener.endpointAdded(endpoint, currentScope); + } else { + LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle " + + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); + endpointListener.endpointRemoved(endpoint, currentScope); + } + } + + public synchronized void close() { + for (Interest interest : interests.values()) { + interest.monitor.close(); + } + interests.clear(); + endpointListenerScopes.clear(); + } + + /** + * Only for test case! + */ + protected synchronized Map<String, Interest> getInterests() { + return interests; + } + + /** + * Only for test case! + */ + protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() { + return endpointListenerScopes; + } + + protected List<String> getScopes(ServiceReference<?> sref) { + return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE))); + } + + public static String getObjectClass(String scope) { + Matcher m = OBJECTCLASS_PATTERN.matcher(scope); + return m.matches() ? m.group(1) : null; + } + + /** + * Returns a service's properties as a map. 
+ * + * @param serviceReference a service reference + * @return the service's properties as a map + */ + public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) { + String[] keys = serviceReference.getPropertyKeys(); + Map<String, Object> props = new HashMap<String, Object>(keys.length); + for (String key : keys) { + Object val = serviceReference.getProperty(key); + props.put(key, val); + } + return props; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java new file mode 100644 index 0000000..82ccb85 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.zookeeper.util; + +import java.util.ArrayList; +import java.util.List; + +public final class Utils { + + static final String PATH_PREFIX = "/osgi/service_registry"; + + private Utils() { + // never constructed + } + + public static String getZooKeeperPath(String name) { + return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); + } + + /** + * Removes nulls and empty strings from the given list of strings. + * + * @param strings a list of strings + * @return a new list containing the non-null and non-empty + * elements of the original list in the same order + */ + public static List<String> removeEmpty(List<String> strings) { + List<String> result = new ArrayList<String>(); + for (String s : strings) { + if (s != null && !s.isEmpty()) { + result.add(s); + } + } + return result; + } + + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java deleted file mode 100644 index 1e6c551..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import java.util.Dictionary; -import java.util.Hashtable; - -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.service.cm.ManagedService; - -public class Activator implements BundleActivator { - - private ZooKeeperDiscovery zkd; - - public synchronized void start(BundleContext bc) throws Exception { - zkd = new ZooKeeperDiscovery(bc); - Dictionary<String, String> props = new Hashtable<String, String>(); - props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper"); - bc.registerService(ManagedService.class.getName(), zkd, props); - } - - public synchronized void stop(BundleContext bc) throws Exception { - zkd.stop(true); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java deleted file mode 100644 index 3a7f2c4..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import java.io.IOException; -import java.util.Dictionary; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; - -import org.apache.cxf.dosgi.discovery.zookeeper.publish.PublishingEndpointListenerFactory; -import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.EndpointListenerTracker; -import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.InterfaceMonitorManager; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.BundleContext; -import org.osgi.service.cm.ConfigurationException; -import org.osgi.service.cm.ManagedService; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZooKeeperDiscovery implements Watcher, ManagedService { - - public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper"; - - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class); - - private final BundleContext bctx; - - private PublishingEndpointListenerFactory endpointListenerFactory; - private 
ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker; - private InterfaceMonitorManager imManager; - private ZooKeeper zkClient; - private boolean closed; - private boolean started; - - private Dictionary<String, ?> curConfiguration; - - public ZooKeeperDiscovery(BundleContext bctx) { - this.bctx = bctx; - } - - public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException { - LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration); - // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections - if (!ZooKeeperDiscovery.toMap(configuration).equals(ZooKeeperDiscovery.toMap(curConfiguration))) { - stop(false); - curConfiguration = configuration; - // config is null if it doesn't exist, is being deleted or has not yet been loaded - // in which case we just stop running - if (!closed && configuration != null) { - try { - createZookeeper(configuration); - } catch (IOException e) { - throw new ConfigurationException(null, "Error starting zookeeper client", e); - } - } - } - } - - private synchronized void start() { - if (closed) { - return; - } - if (started) { - // we must be re-entrant, i.e. 
can be called when already started - LOG.debug("ZookeeperDiscovery already started"); - return; - } - LOG.debug("starting ZookeeperDiscovery"); - endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx); - endpointListenerFactory.start(); - imManager = new InterfaceMonitorManager(bctx, zkClient); - endpointListenerTracker = new EndpointListenerTracker(bctx, imManager); - endpointListenerTracker.open(); - started = true; - } - - public synchronized void stop(boolean close) { - if (started) { - LOG.debug("stopping ZookeeperDiscovery"); - } - started = false; - closed |= close; - if (endpointListenerFactory != null) { - endpointListenerFactory.stop(); - } - if (endpointListenerTracker != null) { - endpointListenerTracker.close(); - } - if (imManager != null) { - imManager.close(); - } - if (zkClient != null) { - try { - zkClient.close(); - } catch (InterruptedException e) { - LOG.error("Error closing ZooKeeper", e); - } - } - } - - protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException { - LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}", - new Object[]{host, port, timeout}); - return new ZooKeeper(host + ":" + port, timeout, this); - } - - /* Callback for ZooKeeper */ - public void process(WatchedEvent event) { - LOG.debug("got ZooKeeper event " + event); - switch (event.getState()) { - case SyncConnected: - LOG.info("Connection to ZooKeeper established"); - // this event can be triggered more than once in a row (e.g. after Disconnected event), - // so we must be re-entrant here - start(); - break; - - case Expired: - LOG.info("Connection to ZooKeeper expired. 
Trying to create a new connection"); - stop(false); - try { - createZookeeper(curConfiguration); - } catch (IOException e) { - LOG.error("Error starting zookeeper client", e); - } - break; - - default: - // ignore other events - break; - } - } - - private void createZookeeper(Dictionary<String, ?> config) throws IOException { - String host = (String)getWithDefault(config, "zookeeper.host", "localhost"); - String port = (String)getWithDefault(config, "zookeeper.port", "2181"); - int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000")); - zkClient = createZooKeeper(host, port, timeout); - } - - public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) { - Object value = config.get(key); - return value != null ? value : defaultValue; - } - - /** - * Converts the given Dictionary to a Map. - * - * @param dict a dictionary - * @param <K> the key type - * @param <V> the value type - * @return the converted map, or an empty map if the given dictionary is null - */ - public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) { - Map<K, V> map = new HashMap<K, V>(); - if (dict != null) { - Enumeration<K> keys = dict.keys(); - while (keys.hasMoreElements()) { - K key = keys.nextElement(); - map.put(key, dict.get(key)); - } - } - return map; - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java deleted file mode 100644 index 5d46585..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.util.Map; - -/** - * This interface allows transformation of service registration information before it is pushed into the ZooKeeper - * discovery system. - * It can be useful for situations where a host name or port number needs to be changed in cases where the host running - * the service is known differently from the outside to what the local Java process thinks it is. - * Extra service properties can also be added to the registration which can be useful to refine the remote service - * lookup process. <p/> - * - * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface - * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to - * process the information before it is pushed into ZooKeeper. <p/> - * - * Note that the changes made using this plugin do not modify the local service registration. - * - */ -public interface DiscoveryPlugin { - - /** - * Process service registration information. Plugins can change this information before it is published into the - * ZooKeeper discovery system. 
- * - * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map - * will be reflected in the ZooKeeper registration. - * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the - * following format: hostname#port##context. While the actual value of this key is not actually used by the - * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be - * unique for all services of a given type. - * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value - * of the <tt>endpointKey</tt> parameter. - */ - String process(Map<String, Object> mutableProperties, String endpointKey); -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java deleted file mode 100644 index 9bcfe72..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; -import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.BundleContext; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; -import org.osgi.xmlns.rsa.v1_0.PropertyType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Listens for local Endpoints and publishes them to ZooKeeper. 
- */ -public class PublishingEndpointListener implements EndpointListener { - - private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); - - private final ZooKeeper zk; - private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker; - private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); - private boolean closed; - - private final EndpointDescriptionParser endpointDescriptionParser; - - public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) { - this.zk = zk; - discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, - DiscoveryPlugin.class, null); - discoveryPluginTracker.open(); - endpointDescriptionParser = new EndpointDescriptionParser(); - } - - public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - LOG.info("Local EndpointDescription added: {}", endpoint); - - synchronized (endpoints) { - if (closed) { - return; - } - if (endpoints.contains(endpoint)) { - // TODO -> Should the published endpoint be updated here? 
- return; - } - - try { - addEndpoint(endpoint); - endpoints.add(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the addition of an endpoint.", ex); - } - } - } - - private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, - InterruptedException, IOException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint); - Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties()); - - // process plugins - Object[] plugins = discoveryPluginTracker.getServices(); - if (plugins != null) { - for (Object plugin : plugins) { - if (plugin instanceof DiscoveryPlugin) { - endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey); - } - } - } - - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.info("Creating ZooKeeper node for service with path {}", fullPath); - createPath(path, zk); - List<PropertyType> propsOut = new PropertiesMapper().fromProps(props); - EndpointDescriptionType epd = new EndpointDescriptionType(); - epd.getProperty().addAll(propsOut); - byte[] epData = endpointDescriptionParser.getData(epd); - createEphemeralNode(fullPath, epData); - } - } - - private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException { - try { - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (NodeExistsException nee) { - // this sometimes happens after a ZooKeeper node dies and the ephemeral node - // that belonged to the old session was not yet deleted. We need to make our - // session the owner of the node so it won't get deleted automatically - - // we do this by deleting and recreating it ourselves. 
- LOG.info("node for endpoint already exists, recreating: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (NoNodeException nne) { - // it's a race condition, but as long as it got deleted - it's ok - } - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } - } - - public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - LOG.info("Local EndpointDescription removed: {}", endpoint); - - synchronized (endpoints) { - if (closed) { - return; - } - if (!endpoints.contains(endpoint)) { - return; - } - - try { - removeEndpoint(endpoint); - endpoints.remove(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the removal of an endpoint", ex); - } - } - } - - private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint); - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.debug("Removing ZooKeeper node: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (Exception ex) { - LOG.debug("Error while removing endpoint: {}", ex); // e.g. 
session expired - } - } - } - - private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException { - StringBuilder current = new StringBuilder(); - List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/"))); - for (String part : parts) { - current.append('/'); - current.append(part); - try { - zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (NodeExistsException nee) { - // it's not the first node with this path to ever exist - that's normal - } - } - } - - private static String getKey(EndpointDescription endpoint) throws URISyntaxException { - URI uri = new URI(endpoint.getId()); - return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort()) - .append("#").append(uri.getPath().replace('/', '#')).toString(); - } - - public void close() { - LOG.debug("closing - removing all endpoints"); - synchronized (endpoints) { - closed = true; - for (EndpointDescription endpoint : endpoints) { - try { - removeEndpoint(endpoint); - } catch (Exception ex) { - LOG.error("Exception while removing endpoint during close", ex); - } - } - endpoints.clear(); - } - discoveryPluginTracker.close(); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java deleted file mode 100644 index 99a9849..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.util.ArrayList; -import java.util.Dictionary; -import java.util.Hashtable; -import java.util.List; - -import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceFactory; -import org.osgi.framework.ServiceRegistration; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.service.remoteserviceadmin.RemoteConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Creates local EndpointListeners that publish to ZooKeeper. 
- */ -public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> { - - private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class); - - private final BundleContext bctx; - private final ZooKeeper zk; - private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>(); - private ServiceRegistration<?> serviceRegistration; - - public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) { - this.bctx = bctx; - this.zk = zk; - } - - public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) { - LOG.debug("new EndpointListener from factory"); - synchronized (listeners) { - PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx); - listeners.add(pel); - return pel; - } - } - - public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, - PublishingEndpointListener pel) { - LOG.debug("remove EndpointListener"); - synchronized (listeners) { - if (listeners.remove(pel)) { - pel.close(); - } - } - } - - public synchronized void start() { - Dictionary<String, String> props = new Hashtable<String, String>(); - String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID); - props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, - String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, - RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid)); - props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true"); - serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props); - } - - public synchronized void stop() { - if (serviceRegistration != null) { - serviceRegistration.unregister(); - serviceRegistration = null; - } - synchronized (listeners) { - for (PublishingEndpointListener pel : listeners) { - pel.close(); - } - listeners.clear(); - } - } - - /** - * Only for the test case! 
- */ - protected List<PublishingEndpointListener> getListeners() { - synchronized (listeners) { - return listeners; - } - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java deleted file mode 100644 index 4d0a25f..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; - -/** - * Tracks interest in EndpointListeners. 
Delegates to InterfaceMonitorManager to manage - * interest in the scopes of each EndpointListener. - */ -public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> { - private final InterfaceMonitorManager imManager; - - public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) { - super(bctx, EndpointListener.class, null); - this.imManager = imManager; - } - - @Override - public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) { - imManager.addInterest(endpointListener); - return null; - } - - @Override - public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { - // called when an EndpointListener updates its service properties, - // e.g. when its interest scope is expanded/reduced - imManager.addInterest(endpointListener); - } - - @Override - public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { - imManager.removeInterest(endpointListener); - } - -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java deleted file mode 100644 index 3822b6e..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; -import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.AsyncCallback.StatCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors ZooKeeper for changes in published endpoints. - * <p> - * Specifically, it monitors the node path associated with a given interface class, - * whose data is a serialized version of an EndpointDescription, and notifies an - * EndpointListener when changes are detected (which can then propagate the - * notification to other EndpointListeners with a matching scope). 
- * <p> - * Note that the EndpointListener is used here as a decoupling interface for - * convenience, and is not necessarily used according to its documented contract. - */ -public class InterfaceMonitor implements Watcher, StatCallback { - - private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class); - - private final String znode; - private final ZooKeeper zk; - private final EndpointListener endpointListener; - private final boolean recursive; - private volatile boolean closed; - - // This map reference changes, so don't synchronize on it - private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>(); - - private EndpointDescriptionParser parser; - - public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) { - this.zk = zk; - this.znode = Utils.getZooKeeperPath(objClass); - this.recursive = objClass == null || objClass.isEmpty(); - this.endpointListener = endpointListener; - this.parser = new EndpointDescriptionParser(); - LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]", - new Object[] {recursive ? "(recursive)" : "", scope, objClass}); - } - - /** - * Returns all endpoints that are currently known to this monitor. - * - * @return all endpoints that are currently known to this monitor - */ - public synchronized List<EndpointDescription> getEndpoints() { - return new ArrayList<EndpointDescription>(nodes.values()); - } - - public void start() { - watch(); - } - - private void watch() { - LOG.debug("registering a ZooKeeper.exists({}) callback", znode); - zk.exists(znode, this, this, null); - } - - /** - * Zookeeper Watcher interface callback. - */ - public void process(WatchedEvent event) { - LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event); - processDelta(); - } - - /** - * Zookeeper StatCallback interface callback. 
- */ - @SuppressWarnings("deprecation") - public void processResult(int rc, String path, Object ctx, Stat stat) { - LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc); - - switch (rc) { - case Code.Ok: - case Code.NoNode: - processDelta(); - return; - - case Code.SessionExpired: - case Code.NoAuth: - case Code.ConnectionLoss: - return; - - default: - watch(); - } - } - - private void processDelta() { - if (closed) { - return; - } - - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("ZooKeeper connection was already closed! Not processing changed event."); - return; - } - - try { - if (zk.exists(znode, false) != null) { - zk.getChildren(znode, this); - refreshNodes(); - } else { - LOG.debug("znode {} doesn't exist -> not processing any changes", znode); - } - } catch (Exception e) { - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery - } else { - LOG.error("Error getting ZooKeeper data.", e); - } - } - } - - public synchronized void close() { - closed = true; - for (EndpointDescription endpoint : nodes.values()) { - endpointListener.endpointRemoved(endpoint, null); - } - nodes.clear(); - } - - private synchronized void refreshNodes() { - if (closed) { - return; - } - LOG.info("Processing change on node: {}", znode); - - Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>(); - Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes); - processChildren(znode, newNodes, prevNodes); - - // whatever is left in prevNodes now has been removed from Discovery - LOG.debug("processChildren done. 
Nodes that are missing now and need to be removed: {}", prevNodes.values()); - for (EndpointDescription endpoint : prevNodes.values()) { - endpointListener.endpointRemoved(endpoint, null); - } - nodes = newNodes; - } - - /** - * Iterates through all child nodes of the given node and tries to find - * endpoints. If the recursive flag is set it also traverses into the child - * nodes. - * - * @return true if an endpoint was found and if the node therefore needs to - * be monitored for changes - */ - private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes, - Map<String, EndpointDescription> prevNodes) { - List<String> children; - try { - LOG.debug("Processing the children of {}", zn); - children = zk.getChildren(zn, false); - - boolean foundANode = false; - for (String child : children) { - String childZNode = zn + '/' + child; - EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); - if (endpoint != null) { - EndpointDescription prevEndpoint = prevNodes.get(child); - LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " - + endpoint.getProperties().values()); - newNodes.put(child, endpoint); - prevNodes.remove(child); - foundANode = true; - LOG.debug("Properties: {}", endpoint.getProperties()); - if (prevEndpoint == null) { - // This guy is new - endpointListener.endpointAdded(endpoint, null); - } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { - // TODO - } - } - if (recursive && processChildren(childZNode, newNodes, prevNodes)) { - zk.getChildren(childZNode, this); - } - } - - return foundANode; - } catch (KeeperException e) { - LOG.error("Problem processing ZooKeeper node", e); - } catch (InterruptedException e) { - LOG.error("Problem processing ZooKeeper node", e); - } - return false; - } - - /** - * Retrieves data from the given node and parses it into an EndpointDescription. 
- * - * @param node a node path - * @return endpoint found in the node or null if no endpoint was found - */ - private EndpointDescription getEndpointDescriptionFromNode(String node) { - try { - Stat stat = zk.exists(node, false); - if (stat == null || stat.getDataLength() <= 0) { - return null; - } - byte[] data = zk.getData(node, false, null); - LOG.debug("Got data for node: {}", node); - - EndpointDescription endpoint = getFirstEnpointDescription(data); - if (endpoint != null) { - return endpoint; - } - LOG.warn("No Discovery information found for node: {}", node); - } catch (Exception e) { - LOG.error("Problem getting EndpointDescription from node " + node, e); - } - return null; - } - - public EndpointDescription getFirstEnpointDescription(byte[] data) { - List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data)); - if (elements.isEmpty()) { - return null; - } - Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty()); - return new EndpointDescription(props); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java deleted file mode 100644 index f44b5af..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Dictionary; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.aries.rsa.util.StringPlus; -import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Filter; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages the EndpointListeners and the scopes they are interested in. - * For each scope with interested EndpointListeners an InterfaceMonitor is created. - * The InterfaceMonitor calls back when it detects added or removed external Endpoints. - * These events are then forwarded to all interested EndpointListeners. 
- */ -public class InterfaceMonitorManager { - private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class); - private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*"); - - private final BundleContext bctx; - private final ZooKeeper zk; - // map of EndpointListeners and the scopes they are interested in - private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes = - new HashMap<ServiceReference<EndpointListener>, List<String>>(); - // map of scopes and their interest data - private final Map<String, Interest> interests = new HashMap<String, Interest>(); - - protected static class Interest { - List<ServiceReference<EndpointListener>> endpointListeners = - new CopyOnWriteArrayList<ServiceReference<EndpointListener>>(); - InterfaceMonitor monitor; - } - - public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) { - this.bctx = bctx; - this.zk = zk; - } - - public void addInterest(ServiceReference<EndpointListener> endpointListener) { - if (isOurOwnEndpointListener(endpointListener)) { - LOG.debug("Skipping our own EndpointListener"); - return; - } - - LOG.info("updating EndpointListener interests: {}", endpointListener); - if (LOG.isDebugEnabled()) { - LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener)); - } - for (String scope : getScopes(endpointListener)) { - String objClass = getObjectClass(scope); - LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass); - addInterest(endpointListener, scope, objClass); - } - } - - private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) { - return Boolean.parseBoolean(String.valueOf( - endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); - } - - public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener, - String scope, String objClass) { - // get or create interest for 
given scope and add listener to it - Interest interest = interests.get(scope); - if (interest == null) { - // create interest, add listener and start monitor - interest = new Interest(); - interests.put(scope, interest); - interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events - interest.monitor = createInterfaceMonitor(scope, objClass, interest); - interest.monitor.start(); - } else { - // interest already exists, so just add listener to it - if (!interest.endpointListeners.contains(endpointListener)) { - interest.endpointListeners.add(endpointListener); - } - // notify listener of all known endpoints for given scope - // (as EndpointListener contract requires of all added/modified listeners) - for (EndpointDescription endpoint : interest.monitor.getEndpoints()) { - notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener)); - } - } - - // add scope to listener's scopes list - List<String> scopes = endpointListenerScopes.get(endpointListener); - if (scopes == null) { - scopes = new ArrayList<String>(1); - endpointListenerScopes.put(endpointListener, scopes); - } - if (!scopes.contains(scope)) { - scopes.add(scope); - } - } - - public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) { - LOG.info("removing EndpointListener interests: {}", endpointListener); - List<String> scopes = endpointListenerScopes.get(endpointListener); - if (scopes == null) { - return; - } - - for (String scope : scopes) { - Interest interest = interests.get(scope); - if (interest != null) { - interest.endpointListeners.remove(endpointListener); - if (interest.endpointListeners.isEmpty()) { - interest.monitor.close(); - interests.remove(scope); - } - } - } - endpointListenerScopes.remove(endpointListener); - } - - protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) { - // holding this object's lock in the callbacks can lead to a 
deadlock with InterfaceMonitor - EndpointListener endpointListener = new EndpointListener() { - - public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(endpoint, scope, false, interest.endpointListeners); - } - - public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(endpoint, scope, true, interest.endpointListeners); - } - }; - return new InterfaceMonitor(zk, objClass, endpointListener, scope); - } - - private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded, - List<ServiceReference<EndpointListener>> endpointListeners) { - for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) { - EndpointListener service = bctx.getService(endpointListenerRef); - try { - EndpointListener endpointListener = (EndpointListener)service; - LOG.trace("matching {} against {}", endpoint, currentScope); - if (matchFilter(bctx, currentScope, endpoint)) { - LOG.debug("Matched {} against {}", endpoint, currentScope); - notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(), - endpointListener); - } - } finally { - if (service != null) { - bctx.ungetService(endpointListenerRef); - } - } - } - } - - private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) { - if (filter == null) { - return false; - } - - try { - Filter f = bctx.createFilter(filter); - Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties()); - return f.match(dict); - } catch (Exception e) { - return false; - } - } - - - private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded, - Bundle endpointListenerBundle, EndpointListener endpointListener) { - if (endpointListenerBundle == null) { - LOG.info("listening service was unregistered, ignoring"); - } else if (isAdded) { - LOG.info("calling EndpointListener.endpointAdded: " + endpointListener 
+ " from bundle " - + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); - endpointListener.endpointAdded(endpoint, currentScope); - } else { - LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle " - + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); - endpointListener.endpointRemoved(endpoint, currentScope); - } - } - - public synchronized void close() { - for (Interest interest : interests.values()) { - interest.monitor.close(); - } - interests.clear(); - endpointListenerScopes.clear(); - } - - /** - * Only for test case! - */ - protected synchronized Map<String, Interest> getInterests() { - return interests; - } - - /** - * Only for test case! - */ - protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() { - return endpointListenerScopes; - } - - protected List<String> getScopes(ServiceReference<?> sref) { - return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE))); - } - - public static String getObjectClass(String scope) { - Matcher m = OBJECTCLASS_PATTERN.matcher(scope); - return m.matches() ? m.group(1) : null; - } - - /** - * Returns a service's properties as a map. 
- * - * @param serviceReference a service reference - * @return the service's properties as a map - */ - public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) { - String[] keys = serviceReference.getPropertyKeys(); - Map<String, Object> props = new HashMap<String, Object>(keys.length); - for (String key : keys) { - Object val = serviceReference.getProperty(key); - props.put(key, val); - } - return props; - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java deleted file mode 100644 index afd9c0a..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.cxf.dosgi.discovery.zookeeper.util; - -import java.util.ArrayList; -import java.util.List; - -public final class Utils { - - static final String PATH_PREFIX = "/osgi/service_registry"; - - private Utils() { - // never constructed - } - - public static String getZooKeeperPath(String name) { - return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); - } - - /** - * Removes nulls and empty strings from the given string array. - * - * @param strings an array of strings - * @return a new array containing the non-null and non-empty - * elements of the original array in the same order - */ - public static List<String> removeEmpty(List<String> strings) { - List<String> result = new ArrayList<String>(); - for (String s : strings) { - if (s != null && !s.isEmpty()) { - result.add(s); - } - } - return result; - } - - -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml new file mode 100644 index 0000000..361fa1e --- /dev/null +++ b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd + "> + <OCD description="" name="Zookeeper server config" id="zookeeper.server"> + <AD id="clientPort" required="false" type="String" default="2181" description=""/> + <AD id="tickTime" required="false" type="String" default="2000" description=""/> + <AD id="initLimit" required="false" type="String" default="10" description=""/> + <AD id="syncLimit" required="false" type="String" default="5" description=""/> + </OCD> + <Designate pid="org.apache.aries.rsa.discovery.zookeeper.server"> + <Object ocdref="zookeeper.server"/> + </Designate> +</MetaData> \ No newline at end of file
