Repository: karaf-cellar Updated Branches: refs/heads/master 355b714e1 -> 790c22bd0
[KARAF-4698] Improve synchronizers. This closes #34 Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/790c22bd Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/790c22bd Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/790c22bd Branch: refs/heads/master Commit: 790c22bd01f4999548b10520c2e993680623deb3 Parents: 355b714 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Tue Sep 6 19:05:09 2016 +0200 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Tue Sep 6 19:05:09 2016 +0200 ---------------------------------------------------------------------- .../karaf/cellar/bundle/BundleSynchronizer.java | 59 ++++++++++++++++++-- .../config/ConfigurationSynchronizer.java | 42 ++++++++++++-- .../cellar/features/FeaturesSynchronizer.java | 31 ++++++++-- 3 files changed, 117 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java ---------------------------------------------------------------------- diff --git a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java index 6771d53..a234672 100644 --- a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java +++ b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java @@ -74,8 +74,12 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { } if (policy.equalsIgnoreCase("cluster")) { LOGGER.debug("CELLAR BUNDLE: sync policy set as 'cluster' for cluster group {}", group.getName()); - LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull first)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull first)"); + pull(group); + } else { + LOGGER.debug("CELLAR BUNDLE: node is the only one in the cluster group, no pull"); + } LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node (push after)"); push(group); } else if (policy.equalsIgnoreCase("node")) { @@ -86,8 +90,12 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { pull(group); } else if (policy.equalsIgnoreCase("clusterOnly")) { LOGGER.debug("CELLAR BUNDLE: sync policy set as 'clusterOnly' for cluster group " + group.getName()); - LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull only)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull only)"); + pull(group); + } else { + LOGGER.debug("CELLAR BUNDLE: node is the only one in the cluster group, no pull"); + } } else if (policy.equalsIgnoreCase("nodeOnly")) { LOGGER.debug("CELLAR BUNDLE: sync policy set as 'nodeOnly' for cluster group " + group.getName()); LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node (push only)"); @@ -113,6 +121,7 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { + // get the bundles on the cluster to update local bundles Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); for (Map.Entry<String, BundleState> entry : clusterBundles.entrySet()) { String id = entry.getKey(); @@ -152,6 +161,18 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { } } } + // cleanup the local bundles not present on the cluster + for (Bundle bundle : bundleContext.getBundles()) { + String id = getId(bundle); + if (!clusterBundles.containsKey(id)) { + // the bundle is not present on the cluster, so it has to be uninstalled locally + try { + bundle.uninstall(); + } catch (Exception e) { + LOGGER.warn("Can't uninstall {}", id, e); + } + } + } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } @@ -183,13 +204,15 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { BundleContext bundleContext = ((BundleReference) getClass().getClassLoader()).getBundle().getBundleContext(); bundles = bundleContext.getBundles(); + // push local bundles to the cluster for (Bundle bundle : bundles) { long bundleId = bundle.getBundleId(); String symbolicName = bundle.getSymbolicName(); String version = bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION); String bundleLocation = bundle.getLocation(); int status = bundle.getState(); - String id = symbolicName + "/" + version; + + String id = getId(bundle); // check if the pid is marked as local. if (isAllowed(group, Constants.CATEGORY, bundleLocation, EventType.OUTBOUND)) { @@ -234,6 +257,20 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { } else LOGGER.trace("CELLAR BUNDLE: bundle {} is marked BLOCKED OUTBOUND for cluster group {}", bundleLocation, groupName); } + // clean bundles on the cluster not present locally + for (String id : clusterBundles.keySet()) { + boolean found = false; + for (Bundle bundle : bundleContext.getBundles()) { + String localBundleId = getId(bundle); + if (id.equals(localBundleId)) { + found = true; + break; + } + } + if (!found) { + clusterBundles.remove(id); + } + } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } @@ -241,6 +278,18 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer { } /** + * Return the Cellar bundle ID for a given bundle. + * + * @param bundle The bundle. + * @return The Cellar bundle ID. + */ + private String getId(Bundle bundle) { + String symbolicName = bundle.getSymbolicName(); + String version = bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION); + return symbolicName + "/" + version; + } + + /** * Get the bundle sync policy for the given cluster group. * * @param group the cluster group. http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java ---------------------------------------------------------------------- diff --git a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java index 75cb4c3..6bab460 100644 --- a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java +++ b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java @@ -72,8 +72,12 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S } if (policy.equalsIgnoreCase("cluster")) { LOGGER.debug("CELLAR CONFIG: sync policy set as 'cluster' for cluster group {}", group.getName()); - LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull first)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull first)"); + pull(group); + } else { + LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull"); + } LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push after)"); push(group); } else if (policy.equalsIgnoreCase("node")) { @@ -84,8 +88,12 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S pull(group); } else if (policy.equalsIgnoreCase("clusterOnly")) { LOGGER.debug("CELLAR CONFIG: sync policy set as 'clusterOnly' for cluster group " + group.getName()); - LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull only)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull only)"); + pull(group); + } else { + LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull"); + } } else if (policy.equalsIgnoreCase("nodeOnly")) { LOGGER.debug("CELLAR CONFIG: sync policy set as 'nodeOnly' for cluster group " + group.getName()); LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push only)"); @@ -112,6 +120,7 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + // get configurations on the cluster to update local configurations for (String pid : clusterConfigurations.keySet()) { if (isAllowed(group, Constants.CATEGORY, pid, EventType.INBOUND)) { Dictionary clusterDictionary = clusterConfigurations.get(pid); @@ -133,6 +142,17 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S } } else LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED INBOUND for cluster group {}", pid, groupName); } + // cleanup the local configurations not present on the cluster + try { + for (Configuration configuration : configurationAdmin.listConfigurations(null)) { + String pid = configuration.getPid(); + if (!clusterConfigurations.containsKey(pid)) { + configuration.delete(); + } + } + } catch (Exception e) { + LOGGER.warn("Can't get local configurations", e); + } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } @@ -162,6 +182,7 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S Configuration[] localConfigurations; try { localConfigurations = configurationAdmin.listConfigurations(null); + // push local configurations to the cluster for (Configuration localConfiguration : localConfigurations) { String pid = localConfiguration.getPid(); // check if the pid is marked as local. @@ -195,6 +216,19 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S } else LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, groupName); } + // clean configurations on the cluster not present locally + for (String pid : clusterConfigurations.keySet()) { + boolean found = false; + for (Configuration configuration : configurationAdmin.listConfigurations(null)) { + if (configuration.getPid().equals(pid)) { + found = true; + break; + } + } + if (!found) { + clusterConfigurations.remove(pid); + } + } } catch (IOException ex) { LOGGER.error("CELLAR CONFIG: failed to read configuration (IO error)", ex); } catch (InvalidSyntaxException ex) { http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java ---------------------------------------------------------------------- diff --git a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java index be4df54..c25faf8 100644 --- a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java +++ b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java @@ -77,8 +77,12 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize } if (policy.equalsIgnoreCase("cluster")) { LOGGER.debug("CELLAR FEATURE: sync policy set as 'cluster' for cluster group {}", group.getName()); - LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull first)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull first)"); + pull(group); + } else { + LOGGER.debug("CELLAR FEATURE: node is the first one in the cluster group, no pull"); + } LOGGER.debug("CELLAR FEATURE: updating cluster from the local node (push after)"); push(group); } else if (policy.equalsIgnoreCase("node")) { @@ -89,8 +93,12 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize pull(group); } else if (policy.equalsIgnoreCase("clusterOnly")) { LOGGER.debug("CELLAR FEATURE: sync policy set as 'clusterOnly' for cluster group " + group.getName()); - LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull only)"); - pull(group); + if (clusterManager.listNodesByGroup(group).size() > 1) { + LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull only)"); + pull(group); + } else { + LOGGER.debug("CELLAR FEATURE: node is the first one in the cluster group, no pull"); + } } else if (policy.equalsIgnoreCase("nodeOnly")) { LOGGER.debug("CELLAR FEATURE: sync policy set as 'nodeOnly' for cluster group " + group.getName()); LOGGER.debug("CELLAR FEATURE: updating cluster from the local node (push only)"); @@ -118,8 +126,8 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize Map<String, String> clusterRepositories = clusterManager.getMap(Constants.REPOSITORIES_MAP + Configurations.SEPARATOR + groupName); Map<String, FeatureState> clusterFeatures = clusterManager.getMap(Constants.FEATURES_MAP + Configurations.SEPARATOR + groupName); - // get the features repositories URLs from the cluster group if (clusterRepositories != null && !clusterRepositories.isEmpty()) { + // get the features repositories from the cluster to update locally for (String url : clusterRepositories.keySet()) { try { if (!isRepositoryRegisteredLocally(url)) { @@ -132,10 +140,21 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize LOGGER.error("CELLAR FEATURE: failed to add repository URL {}", url, e); } } + // cleanup the local features repositories not present on the cluster + try { + for (Repository repository : featuresService.listRepositories()) { + URI uri = repository.getURI(); + if (!clusterRepositories.containsKey(uri.toString())) { + featuresService.removeRepository(uri); + } + } + } catch (Exception e) { + LOGGER.warn("Can't get local features repositories", e); + } } - // get the features from the cluster group if (clusterFeatures != null && !clusterFeatures.isEmpty()) { + // get the features from the cluster group and update locally for (FeatureState state : clusterFeatures.values()) { String name = state.getName(); // check if feature is blocked