Repository: karaf-cellar
Updated Branches:
  refs/heads/master 355b714e1 -> 790c22bd0


[KARAF-4698] Improve synchronizers. This closes #34


Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/790c22bd
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/790c22bd
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/790c22bd

Branch: refs/heads/master
Commit: 790c22bd01f4999548b10520c2e993680623deb3
Parents: 355b714
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Tue Sep 6 19:05:09 2016 +0200
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Tue Sep 6 19:05:09 2016 +0200

----------------------------------------------------------------------
 .../karaf/cellar/bundle/BundleSynchronizer.java | 59 ++++++++++++++++++--
 .../config/ConfigurationSynchronizer.java       | 42 ++++++++++++--
 .../cellar/features/FeaturesSynchronizer.java   | 31 ++++++++--
 3 files changed, 117 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
----------------------------------------------------------------------
diff --git a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
index 6771d53..a234672 100644
--- a/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
+++ b/bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java
@@ -74,8 +74,12 @@ public class BundleSynchronizer extends BundleSupport implements Synchronizer {
         }
         if (policy.equalsIgnoreCase("cluster")) {
             LOGGER.debug("CELLAR BUNDLE: sync policy set as 'cluster' for 
cluster group {}", group.getName());
-            LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull 
first)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR BUNDLE: updating node from the cluster 
(pull first)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR BUNDLE: node is the only one in the 
cluster group, no pull");
+            }
             LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node 
(push after)");
             push(group);
         } else if (policy.equalsIgnoreCase("node")) {
@@ -86,8 +90,12 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
             pull(group);
         } else if (policy.equalsIgnoreCase("clusterOnly")) {
             LOGGER.debug("CELLAR BUNDLE: sync policy set as 'clusterOnly' for 
cluster group " + group.getName());
-            LOGGER.debug("CELLAR BUNDLE: updating node from the cluster (pull 
only)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR BUNDLE: updating node from the cluster 
(pull only)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR BUNDLE: node is the only one in the 
cluster group, no pull");
+            }
         } else if (policy.equalsIgnoreCase("nodeOnly")) {
             LOGGER.debug("CELLAR BUNDLE: sync policy set as 'nodeOnly' for 
cluster group " + group.getName());
             LOGGER.debug("CELLAR BUNDLE: updating cluster from the local node 
(push only)");
@@ -113,6 +121,7 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
             ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
 
             try {
+                // get the bundles on the cluster to update local bundles
                 
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                 for (Map.Entry<String, BundleState> entry : 
clusterBundles.entrySet()) {
                     String id = entry.getKey();
@@ -152,6 +161,18 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
                         }
                     }
                 }
+                // cleanup the local bundles not present on the cluster
+                for (Bundle bundle : bundleContext.getBundles()) {
+                    String id = getId(bundle);
+                    if (!clusterBundles.containsKey(id)) {
+                        // the bundle is not present on the cluster, so it has 
to be uninstalled locally
+                        try {
+                            bundle.uninstall();
+                        } catch (Exception e) {
+                            LOGGER.warn("Can't uninstall {}", id, e);
+                        }
+                    }
+                }
             } finally {
                 
Thread.currentThread().setContextClassLoader(originalClassLoader);
             }
@@ -183,13 +204,15 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
                 BundleContext bundleContext = ((BundleReference) 
getClass().getClassLoader()).getBundle().getBundleContext();
 
                 bundles = bundleContext.getBundles();
+                // push local bundles to the cluster
                 for (Bundle bundle : bundles) {
                     long bundleId = bundle.getBundleId();
                     String symbolicName = bundle.getSymbolicName();
                     String version = 
bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION);
                     String bundleLocation = bundle.getLocation();
                     int status = bundle.getState();
-                    String id = symbolicName + "/" + version;
+
+                    String id = getId(bundle);
 
                     // check if the pid is marked as local.
                     if (isAllowed(group, Constants.CATEGORY, bundleLocation, 
EventType.OUTBOUND)) {
@@ -234,6 +257,20 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
 
                     } else LOGGER.trace("CELLAR BUNDLE: bundle {} is marked 
BLOCKED OUTBOUND for cluster group {}", bundleLocation, groupName);
                 }
+                // clean bundles on the cluster not present locally
+                for (String id : clusterBundles.keySet()) {
+                    boolean found = false;
+                    for (Bundle bundle : bundleContext.getBundles()) {
+                        String localBundleId = getId(bundle);
+                        if (id.equals(localBundleId)) {
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        clusterBundles.remove(id);
+                    }
+                }
             } finally {
                 
Thread.currentThread().setContextClassLoader(originalClassLoader);
             }
@@ -241,6 +278,18 @@ public class BundleSynchronizer extends BundleSupport 
implements Synchronizer {
     }
 
     /**
+     * Return the Cellar bundle ID for a given bundle.
+     *
+     * @param bundle The bundle.
+     * @return The Cellar bundle ID.
+     */
+    private String getId(Bundle bundle) {
+        String symbolicName = bundle.getSymbolicName();
+        String version = 
bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION);
+        return symbolicName + "/" + version;
+    }
+
+    /**
      * Get the bundle sync policy for the given cluster group.
      *
      * @param group the cluster group.

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
----------------------------------------------------------------------
diff --git a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
index 75cb4c3..6bab460 100644
--- a/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
+++ b/config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java
@@ -72,8 +72,12 @@ public class ConfigurationSynchronizer extends ConfigurationSupport implements S
         }
         if (policy.equalsIgnoreCase("cluster")) {
             LOGGER.debug("CELLAR CONFIG: sync policy set as 'cluster' for 
cluster group {}", group.getName());
-            LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull 
first)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR CONFIG: updating node from the cluster 
(pull first)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR CONFIG: node is the first one in the 
cluster group, no pull");
+            }
             LOGGER.debug("CELLAR CONFIG: updating cluster from the local node 
(push after)");
             push(group);
         } else if (policy.equalsIgnoreCase("node")) {
@@ -84,8 +88,12 @@ public class ConfigurationSynchronizer extends 
ConfigurationSupport implements S
             pull(group);
         } else if (policy.equalsIgnoreCase("clusterOnly")) {
             LOGGER.debug("CELLAR CONFIG: sync policy set as 'clusterOnly' for 
cluster group " + group.getName());
-            LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull 
only)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR CONFIG: updating node from the cluster 
(pull only)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR CONFIG: node is the first one in the 
cluster group, no pull");
+            }
         } else if (policy.equalsIgnoreCase("nodeOnly")) {
             LOGGER.debug("CELLAR CONFIG: sync policy set as 'nodeOnly' for 
cluster group " + group.getName());
             LOGGER.debug("CELLAR CONFIG: updating cluster from the local node 
(push only)");
@@ -112,6 +120,7 @@ public class ConfigurationSynchronizer extends 
ConfigurationSupport implements S
             try {
                 
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
 
+                // get configurations on the cluster to update local 
configurations
                 for (String pid : clusterConfigurations.keySet()) {
                     if (isAllowed(group, Constants.CATEGORY, pid, 
EventType.INBOUND)) {
                         Dictionary clusterDictionary = 
clusterConfigurations.get(pid);
@@ -133,6 +142,17 @@ public class ConfigurationSynchronizer extends 
ConfigurationSupport implements S
                         }
                     } else  LOGGER.trace("CELLAR CONFIG: configuration with 
PID {} is marked BLOCKED INBOUND for cluster group {}", pid, groupName);
                 }
+                // cleanup the local configurations not present on the cluster
+                try {
+                    for (Configuration configuration : 
configurationAdmin.listConfigurations(null)) {
+                        String pid = configuration.getPid();
+                        if (!clusterConfigurations.containsKey(pid)) {
+                            configuration.delete();
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.warn("Can't get local configurations", e);
+                }
             } finally {
                 
Thread.currentThread().setContextClassLoader(originalClassLoader);
             }
@@ -162,6 +182,7 @@ public class ConfigurationSynchronizer extends 
ConfigurationSupport implements S
                 Configuration[] localConfigurations;
                 try {
                     localConfigurations = 
configurationAdmin.listConfigurations(null);
+                    // push local configurations to the cluster
                     for (Configuration localConfiguration : 
localConfigurations) {
                         String pid = localConfiguration.getPid();
                         // check if the pid is marked as local.
@@ -195,6 +216,19 @@ public class ConfigurationSynchronizer extends 
ConfigurationSupport implements S
                         } else
                             LOGGER.trace("CELLAR CONFIG: configuration with 
PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, groupName);
                     }
+                    // clean configurations on the cluster not present locally
+                    for (String pid : clusterConfigurations.keySet()) {
+                        boolean found = false;
+                        for (Configuration configuration : 
configurationAdmin.listConfigurations(null)) {
+                            if (configuration.getPid().equals(pid)) {
+                                found = true;
+                                break;
+                            }
+                        }
+                        if (!found) {
+                            clusterConfigurations.remove(pid);
+                        }
+                    }
                 } catch (IOException ex) {
                     LOGGER.error("CELLAR CONFIG: failed to read configuration 
(IO error)", ex);
                 } catch (InvalidSyntaxException ex) {

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/790c22bd/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
----------------------------------------------------------------------
diff --git a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
index be4df54..c25faf8 100644
--- a/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
+++ b/features/src/main/java/org/apache/karaf/cellar/features/FeaturesSynchronizer.java
@@ -77,8 +77,12 @@ public class FeaturesSynchronizer extends FeaturesSupport implements Synchronize
         }
         if (policy.equalsIgnoreCase("cluster")) {
             LOGGER.debug("CELLAR FEATURE: sync policy set as 'cluster' for 
cluster group {}", group.getName());
-            LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull 
first)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR FEATURE: updating node from the cluster 
(pull first)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR FEATURE: node is the first one in the 
cluster group, no pull");
+            }
             LOGGER.debug("CELLAR FEATURE: updating cluster from the local node 
(push after)");
             push(group);
         } else if (policy.equalsIgnoreCase("node")) {
@@ -89,8 +93,12 @@ public class FeaturesSynchronizer extends FeaturesSupport 
implements Synchronize
             pull(group);
         } else if (policy.equalsIgnoreCase("clusterOnly")) {
             LOGGER.debug("CELLAR FEATURE: sync policy set as 'clusterOnly' for 
cluster group " + group.getName());
-            LOGGER.debug("CELLAR FEATURE: updating node from the cluster (pull 
only)");
-            pull(group);
+            if (clusterManager.listNodesByGroup(group).size() > 1) {
+                LOGGER.debug("CELLAR FEATURE: updating node from the cluster 
(pull only)");
+                pull(group);
+            } else {
+                LOGGER.debug("CELLAR FEATURE: node is the first one in the 
cluster group, no pull");
+            }
         } else if (policy.equalsIgnoreCase("nodeOnly")) {
             LOGGER.debug("CELLAR FEATURE: sync policy set as 'nodeOnly' for 
cluster group " + group.getName());
             LOGGER.debug("CELLAR FEATURE: updating cluster from the local node 
(push only)");
@@ -118,8 +126,8 @@ public class FeaturesSynchronizer extends FeaturesSupport 
implements Synchronize
                 Map<String, String> clusterRepositories = 
clusterManager.getMap(Constants.REPOSITORIES_MAP + Configurations.SEPARATOR + 
groupName);
                 Map<String, FeatureState> clusterFeatures = 
clusterManager.getMap(Constants.FEATURES_MAP + Configurations.SEPARATOR + 
groupName);
 
-                // get the features repositories URLs from the cluster group
                 if (clusterRepositories != null && 
!clusterRepositories.isEmpty()) {
+                    // get the features repositories from the cluster to 
update locally
                     for (String url : clusterRepositories.keySet()) {
                         try {
                             if (!isRepositoryRegisteredLocally(url)) {
@@ -132,10 +140,21 @@ public class FeaturesSynchronizer extends FeaturesSupport 
implements Synchronize
                             LOGGER.error("CELLAR FEATURE: failed to add 
repository URL {}", url, e);
                         }
                     }
+                    // cleanup the local features repositories not present on 
the cluster
+                    try {
+                        for (Repository repository : 
featuresService.listRepositories()) {
+                            URI uri = repository.getURI();
+                            if 
(!clusterRepositories.containsKey(uri.toString())) {
+                                featuresService.removeRepository(uri);
+                            }
+                        }
+                    } catch (Exception e) {
+                        LOGGER.warn("Can't get local features repositories", 
e);
+                    }
                 }
 
-                // get the features from the cluster group
                 if (clusterFeatures != null && !clusterFeatures.isEmpty()) {
+                    // get the features from the cluster group and update 
locally
                     for (FeatureState state : clusterFeatures.values()) {
                         String name = state.getName();
                         // check if feature is blocked

Reply via email to