Adding roundtrip test
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/69bb901e Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/69bb901e Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/69bb901e Branch: refs/heads/master Commit: 69bb901e935452f25ae1ed8ba86484b33deb1bd5 Parents: bd0352f Author: Christian Schneider <[email protected]> Authored: Mon Mar 14 17:49:19 2016 +0100 Committer: Christian Schneider <[email protected]> Committed: Mon Mar 14 17:49:19 2016 +0100 ---------------------------------------------------------------------- discovery/pom.xml | 2 - discovery/zookeeper-server-config/bnd.bnd | 1 - discovery/zookeeper-server-config/pom.xml | 40 --- .../zookeeper/server/config/Activator.java | 110 ------- discovery/zookeeper-server/bnd.bnd | 1 - discovery/zookeeper-server/pom.xml | 50 --- .../discovery/zookeeper/server/Activator.java | 44 --- .../dosgi/discovery/zookeeper/server/Utils.java | 108 ------- .../zookeeper/server/ZookeeperStarter.java | 164 ---------- .../resources/OSGI-INF/metatype/zookeeper.xml | 34 --- .../zookeeper/server/ZookeeperStarterTest.java | 81 ----- discovery/zookeeper/bnd.bnd | 3 +- .../rsa/discovery/zookeeper/Activator.java | 58 ++++ .../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ++++++++++++ .../zookeeper/publish/DiscoveryPlugin.java | 54 ++++ .../publish/PublishingEndpointListener.java | 210 +++++++++++++ .../PublishingEndpointListenerFactory.java | 105 +++++++ .../rsa/discovery/zookeeper/server/Utils.java | 108 +++++++ .../zookeeper/server/ZookeeperStarter.java | 164 ++++++++++ .../subscribe/EndpointListenerTracker.java | 56 ++++ .../zookeeper/subscribe/InterfaceMonitor.java | 262 ++++++++++++++++ .../subscribe/InterfaceMonitorManager.java | 261 ++++++++++++++++ .../rsa/discovery/zookeeper/util/Utils.java | 54 ++++ .../dosgi/discovery/zookeeper/Activator.java | 43 --- .../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ------------ 
.../zookeeper/publish/DiscoveryPlugin.java | 54 ---- .../publish/PublishingEndpointListener.java | 210 ------------- .../PublishingEndpointListenerFactory.java | 105 ------- .../subscribe/EndpointListenerTracker.java | 56 ---- .../zookeeper/subscribe/InterfaceMonitor.java | 262 ---------------- .../subscribe/InterfaceMonitorManager.java | 261 ---------------- .../dosgi/discovery/zookeeper/util/Utils.java | 54 ---- .../resources/OSGI-INF/metatype/zookeeper.xml | 34 +++ .../zookeeper/DiscoveryDriverTest.java | 135 +++++++++ .../FindInZooKeeperCustomizerTest.java | 301 +++++++++++++++++++ .../InterfaceDataMonitorListenerImplTest.java | 183 +++++++++++ .../zookeeper/ZookeeperDiscoveryTest.java | 56 ++++ .../PublishingEndpointListenerFactoryTest.java | 102 +++++++ .../publish/PublishingEndpointListenerTest.java | 209 +++++++++++++ .../zookeeper/server/ZookeeperStarterTest.java | 82 +++++ .../subscribe/InterfaceMonitorManagerTest.java | 113 +++++++ .../subscribe/InterfaceMonitorTest.java | 68 +++++ .../rsa/discovery/zookeeper/util/UtilsTest.java | 37 +++ .../zookeeper/DiscoveryDriverTest.java | 135 --------- .../FindInZooKeeperCustomizerTest.java | 301 ------------------- .../InterfaceDataMonitorListenerImplTest.java | 183 ----------- .../zookeeper/ZookeeperDiscoveryTest.java | 55 ---- .../PublishingEndpointListenerFactoryTest.java | 100 ------ .../publish/PublishingEndpointListenerTest.java | 207 ------------- .../subscribe/InterfaceMonitorManagerTest.java | 112 ------- .../subscribe/InterfaceMonitorTest.java | 67 ----- .../discovery/zookeeper/util/UtilsTest.java | 35 --- examples/echotcp/Readme.md | 44 +++ examples/echotcp/api/bnd.bnd | 1 + examples/echotcp/api/pom.xml | 11 + .../rsa/examples/echotcp/api/EchoService.java | 5 + .../aries/rsa/examples/echotcp/api/packageinfo | 19 ++ examples/echotcp/consumer/bnd.bnd | 1 + examples/echotcp/consumer/pom.xml | 39 +++ .../examples/echotcp/consumer/EchoConsumer.java | 41 +++ examples/echotcp/pom.xml | 76 +++++ 
examples/echotcp/service/bnd.bnd | 1 + examples/echotcp/service/pom.xml | 39 +++ .../echotcp/service/EchoServiceImpl.java | 14 + examples/pom.xml | 43 +++ features/src/main/resources/features.xml | 4 - itests/felix/pom.xml | 35 ++- .../aries/rsa/itests/felix/RsaTestBase.java | 120 ++++++++ .../rsa/itests/felix/TestDiscoveryExport.java | 90 +----- .../aries/rsa/itests/felix/TestRoundTrip.java | 88 ++++++ .../felix/ZookeeperDiscoveryConfigurer.java | 28 ++ .../itests/felix/ZookeeperServerConfigurer.java | 32 ++ itests/pom.xml | 1 - itests/testbundle-service-tcp/bnd.bnd | 3 - itests/testbundle-service-tcp/pom.xml | 40 --- .../aries/rsa/itests/tcp/api/EchoService.java | 5 - .../aries/rsa/itests/tcp/service/Activator.java | 25 -- .../rsa/itests/tcp/service/EchoServiceImpl.java | 12 - pom.xml | 1 + .../aries/rsa/core/ClientServiceFactory.java | 4 - spi/pom.xml | 19 ++ 81 files changed, 3503 insertions(+), 3240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/pom.xml ---------------------------------------------------------------------- diff --git a/discovery/pom.xml b/discovery/pom.xml index 40b5f80..9fc449b 100644 --- a/discovery/pom.xml +++ b/discovery/pom.xml @@ -34,7 +34,5 @@ <modules> <module>local</module> <module>zookeeper</module> - <module>zookeeper-server</module> - <module>zookeeper-server-config</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/bnd.bnd ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server-config/bnd.bnd b/discovery/zookeeper-server-config/bnd.bnd deleted file mode 100644 index 769558e..0000000 --- a/discovery/zookeeper-server-config/bnd.bnd +++ /dev/null @@ -1 +0,0 @@ -Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.config.Activator 
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/pom.xml ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server-config/pom.xml b/discovery/zookeeper-server-config/pom.xml deleted file mode 100644 index 4f7ac7a..0000000 --- a/discovery/zookeeper-server-config/pom.xml +++ /dev/null @@ -1,40 +0,0 @@ -<?xml version='1.0' encoding='UTF-8' ?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.aries.rsa</groupId> - <artifactId>parent</artifactId> - <version>1.8-SNAPSHOT</version> - <relativePath>../../parent/pom.xml</relativePath> - </parent> - - <groupId>org.apache.aries.rsa.discovery</groupId> - <artifactId>zookeeper-server-config</artifactId> - <packaging>bundle</packaging> - <name>Aries Remote Service Admin Discovery Zookeeper Config</name> - - <properties> - <topDirectoryLocation>../..</topDirectoryLocation> - </properties> - -</project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java b/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java deleted file mode 100644 index e92fe0b..0000000 --- a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.server.config; - -import java.io.IOException; -import java.net.ServerSocket; -import java.util.Dictionary; -import java.util.Hashtable; - -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.service.cm.Configuration; -import org.osgi.service.cm.ConfigurationAdmin; -import org.osgi.service.cm.ManagedService; -import org.osgi.util.tracker.ServiceTracker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Activator implements BundleActivator { - - private static final Logger LOG = LoggerFactory.getLogger(Activator.class); - private static final String ZOOKEEPER_PORT = "org.apache.aries.rsa.discovery.zookeeper.port"; - private static final String PID = "org.apache.aries.rsa.discovery.zookeeper.server"; - private ServiceTracker<ConfigurationAdmin, ConfigurationAdmin> st; - - public void start(BundleContext context) throws Exception { - synchronized (Activator.class) { - // Only one thread gets to set the port number - if (System.getProperty(ZOOKEEPER_PORT) == null) { - String port = getFreePort(); - System.setProperty(ZOOKEEPER_PORT, port); - LOG.info("Global ZooKeeper port: {}", port); - } - } - - st = new ServiceTracker<ConfigurationAdmin, ConfigurationAdmin>(context, ConfigurationAdmin.class, null) { - @Override - public ConfigurationAdmin addingService(ServiceReference<ConfigurationAdmin> reference) { - ConfigurationAdmin service = super.addingService(reference); - try { - 
Configuration cfg = service.getConfiguration(PID, null); - Dictionary<String, Object> props = new Hashtable<String, Object>(); - String zp = System.getProperty(ZOOKEEPER_PORT); - props.put("clientPort", zp); - cfg.update(props); - LOG.debug("Set ZooKeeper client port to {}", zp); - } catch (IOException e) { - LOG.error("Failed to configure ZooKeeper server!", e); - } - return service; - } - }; - st.open(); - - // The following section is done synchronously otherwise it doesn't happen in time for the CT - ServiceReference<?>[] refs = context.getServiceReferences(ManagedService.class.getName(), - "(service.pid=org.apache.cxf.dosgi.discovery.zookeeper)"); - if (refs == null || refs.length == 0) { - throw new RuntimeException("This bundle must be started after the bundle with the ZooKeeper " - + "Discovery Managed Service was started."); - } - - Dictionary<String, Object> props = new Hashtable<String, Object>(); - props.put("zookeeper.host", "127.0.0.1"); - props.put("zookeeper.port", System.getProperty(ZOOKEEPER_PORT)); - - ManagedService ms = (ManagedService) context.getService(refs[0]); - try { - ms.updated(props); - } finally { - if (ms != null) { - context.ungetService(refs[0]); - } - } - LOG.debug("Passed the zookeeper.host property to the ZooKeeper Client managed service."); - } - - private String getFreePort() { - try { - ServerSocket ss = new ServerSocket(0); - String port = "" + ss.getLocalPort(); - ss.close(); - return port; - } catch (IOException e) { - LOG.error("Failed to find a free port!", e); - return null; - } - } - - public void stop(BundleContext context) throws Exception { - st.close(); - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/bnd.bnd ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/bnd.bnd b/discovery/zookeeper-server/bnd.bnd deleted file mode 100644 index cef642b..0000000 --- a/discovery/zookeeper-server/bnd.bnd +++ /dev/null @@ 
-1 +0,0 @@ -Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.Activator http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/pom.xml ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/pom.xml b/discovery/zookeeper-server/pom.xml deleted file mode 100644 index e6bcdba..0000000 --- a/discovery/zookeeper-server/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ -<?xml version='1.0' encoding='UTF-8' ?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.aries.rsa</groupId> - <artifactId>parent</artifactId> - <version>1.8-SNAPSHOT</version> - <relativePath>../../parent/pom.xml</relativePath> - </parent> - - <groupId>org.apache.aries.rsa.discovery</groupId> - <artifactId>zookeeper-server</artifactId> - <packaging>bundle</packaging> - <name>Aries Remote Service Admin Discovery Zookeeper Server</name> - - - <properties> - <topDirectoryLocation>../..</topDirectoryLocation> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <scope>provided</scope> - </dependency> - - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java deleted file mode 100644 index 17c5568..0000000 --- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.server; - -import java.util.Dictionary; -import java.util.Hashtable; - -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; - -public class Activator implements BundleActivator { - - ZookeeperStarter zkStarter; - - public void start(BundleContext context) throws Exception { - zkStarter = new ZookeeperStarter(context); - Dictionary<String, Object> props = new Hashtable<String, Object>(); - props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper.server"); - context.registerService(org.osgi.service.cm.ManagedService.class.getName(), zkStarter, props); - } - - public void stop(BundleContext context) throws Exception { - if (zkStarter != null) { - zkStarter.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java deleted file mode 100644 index fe3c663..0000000 --- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.server; - -import java.util.ArrayList; -import java.util.Dictionary; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * General purpose utility methods. - */ -public final class Utils { - - private Utils() { - // prevent instantiation - } - - /** - * Remove entries whose values are empty from the given dictionary. - * - * @param dict a dictionary - */ - public static void removeEmptyValues(Dictionary<String, ?> dict) { - List<String> keysToRemove = new ArrayList<String>(); - Enumeration<String> keys = dict.keys(); - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - Object value = dict.get(key); - if (value instanceof String && "".equals(value)) { - keysToRemove.add(key); - } - } - for (String key : keysToRemove) { - dict.remove(key); - } - } - - /** - * Puts the given key-value pair in the given dictionary if the key does not - * already exist in it or if its existing value is null. 
- * - * @param dict a dictionary - * @param key the key - * @param value the default value to set - */ - public static void setDefault(Dictionary<String, String> dict, String key, String value) { - if (dict.get(key) == null) { - dict.put(key, value); - } - } - - /** - * Converts the given Dictionary to a Map. - * - * @param dict a dictionary - * @param <K> the key type - * @param <V> the value type - * @return the converted map, or an empty map if the given dictionary is null - */ - public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) { - Map<K, V> map = new HashMap<K, V>(); - if (dict != null) { - Enumeration<K> keys = dict.keys(); - while (keys.hasMoreElements()) { - K key = keys.nextElement(); - map.put(key, dict.get(key)); - } - } - return map; - } - - /** - * Converts a Dictionary into a Properties instance. - * - * @param dict a dictionary - * @param <K> the key type - * @param <V> the value type - * @return the properties - */ - public static <K, V> Properties toProperties(Dictionary<K, V> dict) { - Properties props = new Properties(); - for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) { - K key = e.nextElement(); - props.put(key, dict.get(key)); - } - return props; - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java deleted file mode 100644 index bd5618f..0000000 --- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.server; - -import java.io.File; -import java.io.IOException; -import java.util.Dictionary; -import java.util.Map; - -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; -import org.osgi.framework.BundleContext; -import org.osgi.service.cm.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZookeeperStarter implements org.osgi.service.cm.ManagedService { - - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); //NOPMD - using log4j here - - protected ZookeeperServer main; - private final BundleContext bundleContext; - private Thread zkMainThread; - private Map<String, ?> curConfiguration; - - public ZookeeperStarter(BundleContext ctx) { - bundleContext = ctx; - } - - synchronized void shutdown() { - if (main != null) { - LOG.info("Shutting down ZooKeeper server"); - try { - main.shutdown(); - if (zkMainThread != null) { - zkMainThread.join(); - } - } catch (Throwable e) { - 
LOG.error(e.getMessage(), e); - } - main = null; - zkMainThread = null; - } - } - - private void setDefaults(Dictionary<String, String> dict) throws IOException { - Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions - Utils.setDefault(dict, "tickTime", "2000"); - Utils.setDefault(dict, "initLimit", "10"); - Utils.setDefault(dict, "syncLimit", "5"); - Utils.setDefault(dict, "clientPort", "2181"); - Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath()); - } - - @SuppressWarnings("unchecked") - public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException { - LOG.debug("Received configuration update for Zookeeper Server: " + dict); - try { - if (dict != null) { - setDefaults((Dictionary<String, String>)dict); - } - Map<String, ?> configMap = Utils.toMap(dict); - if (!configMap.equals(curConfiguration)) { // only if something actually changed - shutdown(); - curConfiguration = configMap; - // config is null if it doesn't exist, is being deleted or has not yet been loaded - // in which case we just stop running - if (dict != null) { - startFromConfig(parseConfig(dict)); - LOG.info("Applied configuration update: " + dict); - } - } - } catch (Exception th) { - LOG.error("Problem applying configuration update: " + dict, th); - } - } - - private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException { - QuorumPeerConfig config = new QuorumPeerConfig(); - config.parseProperties(Utils.toProperties(dict)); - return config; - } - - protected void startFromConfig(final QuorumPeerConfig config) { - int numServers = config.getServers().size(); - main = numServers > 1 ? 
new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config); - zkMainThread = new Thread(new Runnable() { - public void run() { - try { - main.startup(); - } catch (Throwable e) { - LOG.error("Problem running ZooKeeper server.", e); - } - } - }); - zkMainThread.start(); - } - - interface ZookeeperServer { - void startup() throws IOException; - void shutdown(); - } - - static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer { - - private QuorumPeerConfig config; - - MyQuorumPeerMain(QuorumPeerConfig config) { - this.config = config; - } - - public void startup() throws IOException { - runFromConfig(config); - } - - public void shutdown() { - if (null != quorumPeer) { - quorumPeer.shutdown(); - } - } - } - - static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer { - - private QuorumPeerConfig config; - - MyZooKeeperServerMain(QuorumPeerConfig config) { - this.config = config; - } - - public void startup() throws IOException { - ServerConfig serverConfig = new ServerConfig(); - serverConfig.readFrom(config); - runFromConfig(serverConfig); - } - - public void shutdown() { - try { - super.shutdown(); - } catch (Exception e) { - LOG.error("Error shutting down ZooKeeper", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml deleted file mode 100644 index efd9403..0000000 --- a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml +++ /dev/null @@ -1,34 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation=" - http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd - "> - <OCD description="" name="Zookeeper server config" id="org.apache.cxf.dosgi.discovery.zookeeper.server"> - <AD id="clientPort" required="false" type="String" default="2181" description=""/> - <AD id="tickTime" required="false" type="String" default="2000" description=""/> - <AD id="initLimit" required="false" type="String" default="10" description=""/> - <AD id="syncLimit" required="false" type="String" default="5" description=""/> - </OCD> - <Designate pid="org.apache.cxf.dosgi.discovery.zookeeper.server"> - <Object ocdref="org.apache.cxf.dosgi.discovery.zookeeper.server"/> - </Designate> -</MetaData> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java 
b/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java deleted file mode 100644 index 17ca117..0000000 --- a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * Unit tests for {@link ZookeeperStarter}: applying a configuration and removing it again.
 */
public class ZookeeperStarterTest extends TestCase {

    /**
     * Feeds a configuration that only sets {@code clientPort} and checks the
     * {@link QuorumPeerConfig} handed to {@code startFromConfig}.
     */
    public void testUpdateConfig() throws Exception {
        final File tempDir = new File("target");
        IMocksControl control = EasyMock.createControl();
        BundleContext bc = control.createMock(BundleContext.class);
        // the starter is expected to ask the bundle context for its data directory exactly once
        expect(bc.getDataFile("")).andReturn(tempDir);
        final MyZooKeeperServerMain mockServer = control.createMock(MyZooKeeperServerMain.class);
        control.replay();

        // override startFromConfig so no real server is started; only the parsed config is inspected
        ZookeeperStarter starter = new ZookeeperStarter(bc) {
            @Override
            protected void startFromConfig(QuorumPeerConfig config) {
                assertEquals(1234, config.getClientPortAddress().getPort());
                assertTrue(config.getDataDir().contains(tempDir + File.separator + "zkdata"));
                // none of these were supplied below - presumably filled in by the starter's defaults
                assertEquals(2000, config.getTickTime());
                assertEquals(10, config.getInitLimit());
                assertEquals(5, config.getSyncLimit());
                this.main = mockServer;
            }
        };
        Dictionary<String, Object> props = new Hashtable<String, Object>();
        props.put("clientPort", "1234");
        starter.updated(props);
        assertNotNull(starter.main);

        control.verify();
    }

    /**
     * Passing a {@code null} configuration must shut down the running server and clear
     * the starter's reference to it.
     */
    public void testRemoveConfiguration() throws Exception {
        BundleContext bc = EasyMock.createMock(BundleContext.class);
        MyZooKeeperServerMain zkServer = EasyMock.createMock(MyZooKeeperServerMain.class);
        // record phase: exactly one shutdown() call is expected on the server mock
        zkServer.shutdown();
        EasyMock.expectLastCall();

        replay(zkServer);

        ZookeeperStarter starter = new ZookeeperStarter(bc);
        starter.main = zkServer;
        starter.updated(null);

        verify(zkServer);
        assertNull("main should be null", starter.main);
    }
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper; + +import java.util.Dictionary; +import java.util.Hashtable; + +import org.apache.aries.rsa.discovery.zookeeper.server.ZookeeperStarter; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.service.cm.ManagedService; + +public class Activator implements BundleActivator { + + private static final String PID_DISCOVERY_ZOOKEEPER = "org.apache.aries.rsa.discovery.zookeeper"; + private static final String PID_ZOOKEEPER_SERVER = "org.apache.aries.rsa.discovery.zookeeper.server"; + private ZooKeeperDiscovery zkd; + private ZookeeperStarter zkStarter; + + public synchronized void start(BundleContext bc) throws Exception { + zkd = new ZooKeeperDiscovery(bc); + bc.registerService(ManagedService.class, zkd, configProperties(PID_DISCOVERY_ZOOKEEPER)); + + zkStarter = new ZookeeperStarter(bc); + bc.registerService(ManagedService.class, zkStarter, configProperties(PID_ZOOKEEPER_SERVER)); + } + + public synchronized void stop(BundleContext bc) throws Exception { + zkd.stop(true); + + if (zkStarter != null) { + zkStarter.shutdown(); + } + } + + private Dictionary<String, String> configProperties(String pid) { + Dictionary<String, String> props = new Hashtable<String, String>(); + props.put(Constants.SERVICE_PID, pid); + return props; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java new file mode 100644 index 0000000..085c074 --- /dev/null 
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper; + +import java.io.IOException; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListenerFactory; +import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker; +import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.osgi.framework.BundleContext; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedService; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.util.tracker.ServiceTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZooKeeperDiscovery implements Watcher, ManagedService { + + public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper"; + 
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class); + + private final BundleContext bctx; + + private PublishingEndpointListenerFactory endpointListenerFactory; + private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker; + private InterfaceMonitorManager imManager; + private ZooKeeper zkClient; + private boolean closed; + private boolean started; + + private Dictionary<String, ?> curConfiguration; + + public ZooKeeperDiscovery(BundleContext bctx) { + this.bctx = bctx; + } + + public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException { + LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration); + // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections + if (!ZooKeeperDiscovery.toMap(configuration).equals(ZooKeeperDiscovery.toMap(curConfiguration))) { + stop(false); + curConfiguration = configuration; + // config is null if it doesn't exist, is being deleted or has not yet been loaded + // in which case we just stop running + if (!closed && configuration != null) { + try { + createZookeeper(configuration); + } catch (IOException e) { + throw new ConfigurationException(null, "Error starting zookeeper client", e); + } + } + } + } + + private synchronized void start() { + if (closed) { + return; + } + if (started) { + // we must be re-entrant, i.e. 
can be called when already started + LOG.debug("ZookeeperDiscovery already started"); + return; + } + LOG.debug("starting ZookeeperDiscovery"); + endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx); + endpointListenerFactory.start(); + imManager = new InterfaceMonitorManager(bctx, zkClient); + endpointListenerTracker = new EndpointListenerTracker(bctx, imManager); + endpointListenerTracker.open(); + started = true; + } + + public synchronized void stop(boolean close) { + if (started) { + LOG.debug("stopping ZookeeperDiscovery"); + } + started = false; + closed |= close; + if (endpointListenerFactory != null) { + endpointListenerFactory.stop(); + } + if (endpointListenerTracker != null) { + endpointListenerTracker.close(); + } + if (imManager != null) { + imManager.close(); + } + if (zkClient != null) { + try { + zkClient.close(); + } catch (InterruptedException e) { + LOG.error("Error closing ZooKeeper", e); + } + } + } + + protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException { + LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}", + new Object[]{host, port, timeout}); + return new ZooKeeper(host + ":" + port, timeout, this); + } + + /* Callback for ZooKeeper */ + public void process(WatchedEvent event) { + LOG.debug("got ZooKeeper event " + event); + switch (event.getState()) { + case SyncConnected: + LOG.info("Connection to ZooKeeper established"); + // this event can be triggered more than once in a row (e.g. after Disconnected event), + // so we must be re-entrant here + start(); + break; + + case Expired: + LOG.info("Connection to ZooKeeper expired. 
Trying to create a new connection"); + stop(false); + try { + createZookeeper(curConfiguration); + } catch (IOException e) { + LOG.error("Error starting zookeeper client", e); + } + break; + + default: + // ignore other events + break; + } + } + + private void createZookeeper(Dictionary<String, ?> config) throws IOException { + String host = (String)getWithDefault(config, "zookeeper.host", "localhost"); + String port = (String)getWithDefault(config, "zookeeper.port", "2181"); + int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000")); + zkClient = createZooKeeper(host, port, timeout); + } + + public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) { + Object value = config.get(key); + return value != null ? value : defaultValue; + } + + /** + * Converts the given Dictionary to a Map. + * + * @param dict a dictionary + * @param <K> the key type + * @param <V> the value type + * @return the converted map, or an empty map if the given dictionary is null + */ + public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) { + Map<K, V> map = new HashMap<K, V>(); + if (dict != null) { + Enumeration<K> keys = dict.keys(); + while (keys.hasMoreElements()) { + K key = keys.nextElement(); + map.put(key, dict.get(key)); + } + } + return map; + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java new file mode 100644 index 0000000..033bee2 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java @@ -0,0 +1,54 @@ +/** + * Licensed to the 
/**
 * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
 * discovery system.
 * <p>
 * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
 * the service is known differently from the outside to what the local Java process thinks it is.
 * Extra service properties can also be added to the registration which can be useful to refine the remote service
 * lookup process.
 * <p>
 * DiscoveryPlugins use the OSGi Whiteboard pattern. To add one to the system, register an instance under this
 * interface with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance
 * to process the information before it is pushed into ZooKeeper.
 * <p>
 * Note that the changes made using this plugin do not modify the local service registration.
 */
public interface DiscoveryPlugin {

    /**
     * Process service registration information. Plugins can change this information before it is published into the
     * ZooKeeper discovery system.
     *
     * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
     *     will be reflected in the ZooKeeper registration.
     * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
     *     following format: hostname#port##context. While the actual value of this key is not actually used by the
     *     system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
     *     unique for all services of a given type.
     * @return The {@code endpointKey} value to be used. If there is no need to change this simply return the value
     *     of the {@code endpointKey} parameter.
     */
    String process(Map<String, Object> mutableProperties, String endpointKey);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper.publish; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; +import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper; +import org.apache.aries.rsa.discovery.zookeeper.util.Utils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.osgi.framework.BundleContext; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; +import org.osgi.xmlns.rsa.v1_0.PropertyType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for local Endpoints and publishes them to ZooKeeper. 
+ */ +public class PublishingEndpointListener implements EndpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); + + private final ZooKeeper zk; + private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker; + private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); + private boolean closed; + + private final EndpointDescriptionParser endpointDescriptionParser; + + public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) { + this.zk = zk; + discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, + DiscoveryPlugin.class, null); + discoveryPluginTracker.open(); + endpointDescriptionParser = new EndpointDescriptionParser(); + } + + public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { + LOG.info("Local EndpointDescription added: {}", endpoint); + + synchronized (endpoints) { + if (closed) { + return; + } + if (endpoints.contains(endpoint)) { + // TODO -> Should the published endpoint be updated here? 
+ return; + } + + try { + addEndpoint(endpoint); + endpoints.add(endpoint); + } catch (Exception ex) { + LOG.error("Exception while processing the addition of an endpoint.", ex); + } + } + } + + private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, + InterruptedException, IOException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties()); + + // process plugins + Object[] plugins = discoveryPluginTracker.getServices(); + if (plugins != null) { + for (Object plugin : plugins) { + if (plugin instanceof DiscoveryPlugin) { + endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey); + } + } + } + + for (String name : interfaces) { + String path = Utils.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.info("Creating ZooKeeper node for service with path {}", fullPath); + createPath(path, zk); + List<PropertyType> propsOut = new PropertiesMapper().fromProps(props); + EndpointDescriptionType epd = new EndpointDescriptionType(); + epd.getProperty().addAll(propsOut); + byte[] epData = endpointDescriptionParser.getData(epd); + createEphemeralNode(fullPath, epData); + } + } + + private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException { + try { + zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (NodeExistsException nee) { + // this sometimes happens after a ZooKeeper node dies and the ephemeral node + // that belonged to the old session was not yet deleted. We need to make our + // session the owner of the node so it won't get deleted automatically - + // we do this by deleting and recreating it ourselves. 
+ LOG.info("node for endpoint already exists, recreating: {}", fullPath); + try { + zk.delete(fullPath, -1); + } catch (NoNodeException nne) { + // it's a race condition, but as long as it got deleted - it's ok + } + zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + } + + public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { + LOG.info("Local EndpointDescription removed: {}", endpoint); + + synchronized (endpoints) { + if (closed) { + return; + } + if (!endpoints.contains(endpoint)) { + return; + } + + try { + removeEndpoint(endpoint); + endpoints.remove(endpoint); + } catch (Exception ex) { + LOG.error("Exception while processing the removal of an endpoint", ex); + } + } + } + + private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException { + Collection<String> interfaces = endpoint.getInterfaces(); + String endpointKey = getKey(endpoint); + for (String name : interfaces) { + String path = Utils.getZooKeeperPath(name); + String fullPath = path + '/' + endpointKey; + LOG.debug("Removing ZooKeeper node: {}", fullPath); + try { + zk.delete(fullPath, -1); + } catch (Exception ex) { + LOG.debug("Error while removing endpoint: {}", ex); // e.g. 
session expired + } + } + } + + private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException { + StringBuilder current = new StringBuilder(); + List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/"))); + for (String part : parts) { + current.append('/'); + current.append(part); + try { + zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (NodeExistsException nee) { + // it's not the first node with this path to ever exist - that's normal + } + } + } + + private static String getKey(EndpointDescription endpoint) throws URISyntaxException { + URI uri = new URI(endpoint.getId()); + return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort()) + .append("#").append(uri.getPath().replace('/', '#')).toString(); + } + + public void close() { + LOG.debug("closing - removing all endpoints"); + synchronized (endpoints) { + closed = true; + for (EndpointDescription endpoint : endpoints) { + try { + removeEndpoint(endpoint); + } catch (Exception ex) { + LOG.error("Exception while removing endpoint during close", ex); + } + } + endpoints.clear(); + } + discoveryPluginTracker.close(); + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java new file mode 100644 index 0000000..1eabec3 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper.publish; + +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.List; + +import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; +import org.apache.zookeeper.ZooKeeper; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceFactory; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.service.remoteserviceadmin.RemoteConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Creates local EndpointListeners that publish to ZooKeeper. 
+ */ +public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> { + + private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class); + + private final BundleContext bctx; + private final ZooKeeper zk; + private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>(); + private ServiceRegistration<?> serviceRegistration; + + public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) { + this.bctx = bctx; + this.zk = zk; + } + + public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) { + LOG.debug("new EndpointListener from factory"); + synchronized (listeners) { + PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx); + listeners.add(pel); + return pel; + } + } + + public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, + PublishingEndpointListener pel) { + LOG.debug("remove EndpointListener"); + synchronized (listeners) { + if (listeners.remove(pel)) { + pel.close(); + } + } + } + + public synchronized void start() { + Dictionary<String, String> props = new Hashtable<String, String>(); + String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID); + props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, + String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, + RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid)); + props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true"); + serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props); + } + + public synchronized void stop() { + if (serviceRegistration != null) { + serviceRegistration.unregister(); + serviceRegistration = null; + } + synchronized (listeners) { + for (PublishingEndpointListener pel : listeners) { + pel.close(); + } + listeners.clear(); + } + } + + /** + * Only for the test case! 
+ */ + protected List<PublishingEndpointListener> getListeners() { + synchronized (listeners) { + return listeners; + } + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java new file mode 100644 index 0000000..67ea3a4 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper.server; + +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * General purpose utility methods. 
/**
 * General purpose utility methods.
 *
 * All methods that read a dictionary accept {@code null} and treat it as empty.
 */
public final class Utils {

    private Utils() {
        // prevent instantiation
    }

    /**
     * Remove entries whose values are empty strings from the given dictionary.
     *
     * @param dict a dictionary, modified in place; {@code null} is ignored
     */
    public static void removeEmptyValues(Dictionary<String, ?> dict) {
        if (dict == null) {
            return;
        }
        // collect first, remove afterwards: the dictionary must not change while it is enumerated
        List<String> keysToRemove = new ArrayList<String>();
        Enumeration<String> keys = dict.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            if ("".equals(dict.get(key))) {
                keysToRemove.add(key);
            }
        }
        for (String key : keysToRemove) {
            dict.remove(key);
        }
    }

    /**
     * Puts the given key-value pair in the given dictionary if the key does not
     * already exist in it or if its existing value is null.
     *
     * @param dict a dictionary
     * @param key the key
     * @param value the default value to set
     */
    public static void setDefault(Dictionary<String, String> dict, String key, String value) {
        if (dict.get(key) == null) {
            dict.put(key, value);
        }
    }

    /**
     * Converts the given Dictionary to a Map.
     *
     * @param dict a dictionary
     * @param <K> the key type
     * @param <V> the value type
     * @return the converted map, or an empty map if the given dictionary is null
     */
    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
        Map<K, V> map = new HashMap<K, V>();
        if (dict != null) {
            Enumeration<K> keys = dict.keys();
            while (keys.hasMoreElements()) {
                K key = keys.nextElement();
                map.put(key, dict.get(key));
            }
        }
        return map;
    }

    /**
     * Converts a Dictionary into a Properties instance.
     *
     * @param dict a dictionary
     * @param <K> the key type
     * @param <V> the value type
     * @return the properties, or empty properties if the given dictionary is null
     */
    public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
        Properties props = new Properties();
        if (dict != null) {
            for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
                K key = e.nextElement();
                props.put(key, dict.get(key));
            }
        }
        return props;
    }
}
+ */ +package org.apache.aries.rsa.discovery.zookeeper.server; + +import java.io.File; +import java.io.IOException; +import java.util.Dictionary; +import java.util.Map; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.osgi.framework.BundleContext; +import org.osgi.service.cm.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZookeeperStarter implements org.osgi.service.cm.ManagedService { + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); + + protected ZookeeperServer main; + private final BundleContext bundleContext; + private Thread zkMainThread; + private Map<String, ?> curConfiguration; + + public ZookeeperStarter(BundleContext ctx) { + bundleContext = ctx; + } + + public synchronized void shutdown() { + if (main != null) { + LOG.info("Shutting down ZooKeeper server"); + try { + main.shutdown(); + if (zkMainThread != null) { + zkMainThread.join(); + } + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + main = null; + zkMainThread = null; + } + } + + private void setDefaults(Dictionary<String, String> dict) throws IOException { + Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions + Utils.setDefault(dict, "tickTime", "2000"); + Utils.setDefault(dict, "initLimit", "10"); + Utils.setDefault(dict, "syncLimit", "5"); + Utils.setDefault(dict, "clientPort", "2181"); + Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath()); + } + + @SuppressWarnings("unchecked") + public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException { + LOG.debug("Received configuration update for Zookeeper Server: " + dict); + try { + if (dict 
!= null) { + setDefaults((Dictionary<String, String>)dict); + } + Map<String, ?> configMap = Utils.toMap(dict); + if (!configMap.equals(curConfiguration)) { // only if something actually changed + shutdown(); + curConfiguration = configMap; + // config is null if it doesn't exist, is being deleted or has not yet been loaded + // in which case we just stop running + if (dict != null) { + startFromConfig(parseConfig(dict)); + LOG.info("Applied configuration update: " + dict); + } + } + } catch (Exception th) { + LOG.error("Problem applying configuration update: " + dict, th); + } + } + + private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException { + QuorumPeerConfig config = new QuorumPeerConfig(); + config.parseProperties(Utils.toProperties(dict)); + return config; + } + + protected void startFromConfig(final QuorumPeerConfig config) { + int numServers = config.getServers().size(); + main = numServers > 1 ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config); + zkMainThread = new Thread(new Runnable() { + public void run() { + try { + main.startup(); + } catch (Throwable e) { + LOG.error("Problem running ZooKeeper server.", e); + } + } + }); + zkMainThread.start(); + } + + interface ZookeeperServer { + void startup() throws IOException; + void shutdown(); + } + + static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer { + + private QuorumPeerConfig config; + + MyQuorumPeerMain(QuorumPeerConfig config) { + this.config = config; + } + + public void startup() throws IOException { + runFromConfig(config); + } + + public void shutdown() { + if (null != quorumPeer) { + quorumPeer.shutdown(); + } + } + } + + static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer { + + private QuorumPeerConfig config; + + MyZooKeeperServerMain(QuorumPeerConfig config) { + this.config = config; + } + + public void startup() throws IOException { + ServerConfig serverConfig = new 
ServerConfig(); + serverConfig.readFrom(config); + runFromConfig(serverConfig); + } + + public void shutdown() { + try { + super.shutdown(); + } catch (Exception e) { + LOG.error("Error shutting down ZooKeeper", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java new file mode 100644 index 0000000..5909ee0 --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.zookeeper.subscribe; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.osgi.util.tracker.ServiceTracker; + +/** + * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage + * interest in the scopes of each EndpointListener. + */ +public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> { + private final InterfaceMonitorManager imManager; + + public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) { + super(bctx, EndpointListener.class, null); + this.imManager = imManager; + } + + @Override + public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) { + imManager.addInterest(endpointListener); + return null; + } + + @Override + public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { + // called when an EndpointListener updates its service properties, + // e.g. when its interest scope is expanded/reduced + imManager.addInterest(endpointListener); + } + + @Override + public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { + imManager.removeInterest(endpointListener); + } + +}
