This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new c22ff4722 Revert "UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService…" (#724)
c22ff4722 is described below

commit c22ff47221112ae6aa02a7a0c492e93840062d4d
Author: Jérôme Blanchard <[email protected]>
AuthorDate: Fri Aug 22 15:47:24 2025 +0200

    Revert "UNOMI-877: Replace Karaf Cellar and Hazelcast with 
PersistenceService…" (#724)
    
    This reverts commit 3f995e47ea8fbd33ca226e04f9f5cd0dfb719fff.
---
 .../java/org/apache/unomi/api/ClusterNode.java     |  65 +-
 .../apache/unomi/api/services/ClusterService.java  |   7 +
 .../router/core/context/RouterCamelContext.java    |  17 +
 .../main/resources/etc/custom.system.properties    |   5 +-
 services/pom.xml                                   |   7 -
 .../services/impl/cluster/ClusterServiceImpl.java  | 690 ++++++---------------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  10 +-
 .../main/resources/org.apache.unomi.cluster.cfg    |  10 +-
 8 files changed, 236 insertions(+), 575 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
index e096490b9..6c40ca21b 100644
--- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
+++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
@@ -22,12 +22,10 @@ import java.io.Serializable;
 /**
  * Information about a cluster node.
  */
-public class ClusterNode extends Item {
+public class ClusterNode implements Serializable {
 
     private static final long serialVersionUID = 1281422346318230514L;
 
-    public static final String ITEM_TYPE = "clusterNode";
-
     private double cpuLoad;
     private double[] loadAverage;
     private String publicHostAddress;
@@ -35,18 +33,11 @@ public class ClusterNode extends Item {
     private long uptime;
     private boolean master;
     private boolean data;
-    private long startTime;
-    private long lastHeartbeat;
-
-    // Server information
-    private ServerInfo serverInfo;
 
     /**
      * Instantiates a new Cluster node.
      */
     public ClusterNode() {
-        super();
-        setItemType(ITEM_TYPE);
     }
 
     /**
@@ -174,58 +165,4 @@ public class ClusterNode extends Item {
     public void setData(boolean data) {
         this.data = data;
     }
-
-    /**
-     * Retrieves the node start time in milliseconds.
-     *
-     * @return the start time
-     */
-    public long getStartTime() {
-        return startTime;
-    }
-
-    /**
-     * Sets the node start time in milliseconds.
-     *
-     * @param startTime the start time
-     */
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    /**
-     * Retrieves the last heartbeat time in milliseconds.
-     *
-     * @return the last heartbeat time
-     */
-    public long getLastHeartbeat() {
-        return lastHeartbeat;
-    }
-
-    /**
-     * Sets the last heartbeat time in milliseconds.
-     *
-     * @param lastHeartbeat the last heartbeat time
-     */
-    public void setLastHeartbeat(long lastHeartbeat) {
-        this.lastHeartbeat = lastHeartbeat;
-    }
-
-    /**
-     * Gets the server information.
-     *
-     * @return the server information
-     */
-    public ServerInfo getServerInfo() {
-        return serverInfo;
-    }
-
-    /**
-     * Sets the server information.
-     *
-     * @param serverInfo the server information
-     */
-    public void setServerInfo(ServerInfo serverInfo) {
-        this.serverInfo = serverInfo;
-    }
 }
diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index ad8777503..299ac9098 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -51,4 +51,11 @@ public interface ClusterService {
      */
     void purge(final String scope);
 
+    /**
+     * This function will send an event to the nodes of the cluster
+     * The function takes a Serializable to avoid dependency on any clustering framework
+     *
+     * @param event this object will be cast to a org.apache.karaf.cellar.core.event.Event object
+     */
+    void sendEvent(Serializable event);
 }
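
For context on the restored sendEvent(Serializable) contract above: the sketch below is illustrative only and not part of this commit. It shows the usual calling pattern in the style of the reverted UpdateCamelRouteEvent, assuming Cellar's Event(String id) constructor; the class name, event id and route id are made up for the example.

    import org.apache.karaf.cellar.core.event.Event;
    import org.apache.unomi.api.services.ClusterService;

    // Hypothetical event class, patterned on the reverted UpdateCamelRouteEvent.
    public class ExampleRouteChangedEvent extends Event {

        private String routeId;

        public ExampleRouteChangedEvent(String eventId) {
            super(eventId); // assumes Cellar's Event(String id) constructor
        }

        public void setRouteId(String routeId) {
            this.routeId = routeId;
        }

        public String getRouteId() {
            return routeId;
        }

        // Caller side: ClusterServiceImpl casts the Serializable back to a Cellar Event,
        // sets the source group and node, and hands it to the Cellar EventProducer.
        public static void broadcast(ClusterService clusterService) {
            ExampleRouteChangedEvent event = new ExampleRouteChangedEvent("exampleRouteChanged");
            event.setRouteId("exampleRouteId");
            clusterService.sendEvent(event);
        }
    }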
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index c7e953132..67219f9c5 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -31,6 +31,7 @@ import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.services.ProfileExportService;
+import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
 import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
 import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -239,6 +240,12 @@ public class RouterCamelContext implements IRouterCamelContext {
                 camelContext.removeRouteDefinition(routeDefinition);
             }
         }
+
+        if (fireEvent) {
+            UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_REMOVE);
+            event.setRouteId(routeId);
+            clusterService.sendEvent(event);
+        }
     }
 
     public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception {
@@ -259,6 +266,11 @@ public class RouterCamelContext implements IRouterCamelContext {
             builder.setJacksonDataFormat(jacksonDataFormat);
             builder.setContext(camelContext);
             camelContext.addRoutes(builder);
+
+            if (fireEvent) {
+                UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_IMPORT);
+                clusterService.sendEvent(event);
+            }
         }
     }
 
@@ -279,6 +291,11 @@ public class RouterCamelContext implements IRouterCamelContext {
             profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
             profileExportCollectRouteBuilder.setContext(camelContext);
             camelContext.addRoutes(profileExportCollectRouteBuilder);
+
+            if (fireEvent) {
+                UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_EXPORT);
+                clusterService.sendEvent(event);
+            }
         }
     }
 
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 46a262816..5e97437c3 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -81,14 +81,11 @@ org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs}
 #######################################################################################################################
 ## Cluster Settings                                                                                                  ##
 #######################################################################################################################
+org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default}
# To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
 org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
 org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
-# The nodeId is a required setting that uniquely identifies this node in the cluster.
-# It must be set to a unique value for each node in the cluster.
-# Example: nodeId=node1
-org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to all the other nodes to update
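
For context on the restored cluster settings above: a minimal sketch, not part of this commit, of per-node environment overrides for the variables referenced in this file. The host names are placeholders.

    UNOMI_CLUSTER_GROUP=default
    UNOMI_CLUSTER_PUBLIC_ADDRESS=https://unomi.example.com:9443
    UNOMI_CLUSTER_INTERNAL_ADDRESS=https://unomi-node-1.internal:9443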
diff --git a/services/pom.xml b/services/pom.xml
index 9a0862db8..c4cf7e4da 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -56,12 +56,6 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.unomi</groupId>
-            <artifactId>unomi-lifecycle-watcher</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
 
         <dependency>
             <groupId>javax.servlet</groupId>
@@ -181,7 +175,6 @@
                         <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
                         <Import-Package>
                             sun.misc;resolution:=optional,
-                            com.sun.management;resolution:=optional,
                             *
                         </Import-Package>
                     </instructions>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index b4bc2c421..ec4cfe523 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -18,30 +18,28 @@
 package org.apache.unomi.services.impl.cluster;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
+import org.apache.karaf.cellar.config.Constants;
+import org.apache.karaf.cellar.core.*;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.apache.karaf.cellar.core.event.EventType;
 import org.apache.unomi.api.ClusterNode;
-import org.apache.unomi.api.PartialList;
-import org.apache.unomi.api.ServerInfo;
-import org.apache.unomi.api.conditions.Condition;
-import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.services.ClusterService;
-import org.apache.unomi.lifecycle.BundleWatcher;
+import org.apache.unomi.api.services.SchedulerService;
 import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.osgi.service.cm.ConfigurationAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.*;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -49,185 +47,46 @@ import java.util.concurrent.TimeUnit;
  */
 public class ClusterServiceImpl implements ClusterService {
 
+    public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unomi.nodes";
+    public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints";
+    public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS = "internalEndpoints";
     private static final Logger LOGGER = LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
-
-    /**
-     * We use ServiceTracker instead of Blueprint dependency injection due to a known bug in Apache Aries Blueprint
-     * where service dependencies are not properly shut down in reverse order of their initialization.
-     *
-     * The bug manifests in two ways:
-     * 1. Services are not shut down in reverse order of their initialization, causing potential deadlocks
-     * 2. The PersistenceService is often shut down before other services that depend on it, leading to timeout waits
-     *
-     * By using ServiceTracker, we have explicit control over:
-     * - Service lifecycle management
-     * - Shutdown order
-     * - Service availability checks
-     * - Graceful degradation when services become unavailable
-     */
-    private ServiceTracker<PersistenceService, PersistenceService> persistenceServiceTracker;
-
-    // Keep direct reference for backward compatibility and unit tests
-    private PersistenceService persistenceService;
-
+    PersistenceService persistenceService;
+    private ClusterManager karafCellarClusterManager;
+    private EventProducer karafCellarEventProducer;
+    private GroupManager karafCellarGroupManager;
+    private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
+    private ConfigurationAdmin osgiConfigurationAdmin;
     private String publicAddress;
     private String internalAddress;
-    //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to reactivate that code
-    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
-    private String nodeId;
-    private long nodeStartTime;
-    private long nodeStatisticsUpdateFrequency = 10000;
-    private Map<String, Map<String, Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>();
-    private BundleContext bundleContext;
-    private volatile boolean shutdownNow = false;
-
-    private BundleWatcher bundleWatcher;
+    private Map<String, Map<String,Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>();
+    private Group group = null;
+    private SchedulerService schedulerService;
 
-    /**
-     * Max time to wait for persistence service (in milliseconds)
-     */
-    private static final long MAX_WAIT_TIME = 60000; // 60 seconds
+    private long nodeStatisticsUpdateFrequency = 10000;
 
-    /**
-     * Sets the bundle context, which is needed to create service trackers
-     * @param bundleContext the OSGi bundle context
-     */
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
+    public void setPersistenceService(PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
     }
 
-    /**
-     * Sets the bundle watcher used to retrieve server information
-     *
-     * @param bundleWatcher the bundle watcher
-     */
-    public void setBundleWatcher(BundleWatcher bundleWatcher) {
-        this.bundleWatcher = bundleWatcher;
-        LOGGER.info("BundleWatcher service set");
+    public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) {
+        this.karafCellarClusterManager = karafCellarClusterManager;
     }
 
-    /**
-     * Waits for the persistence service to become available.
-     * This method will retry getting the persistence service with exponential backoff
-     * until it's available or until the maximum wait time is reached.
-     *
-     * @throws IllegalStateException if the persistence service is not available after the maximum wait time
-     */
-    private void waitForPersistenceService() {
-        if (shutdownNow) {
-            return;
-        }
-
-        // If persistence service is directly set (e.g., in unit tests), no need to wait
-        if (persistenceService != null) {
-            LOGGER.debug("Persistence service is already available, no need to wait");
-            return;
-        }
-
-        // If no bundle context, we can't get the service via tracker
-        if (bundleContext == null) {
-            LOGGER.error("No BundleContext available, cannot wait for persistence service");
-            throw new IllegalStateException("No BundleContext available to get persistence service");
-        }
-
-        // Initialize service tracker if needed
-        if (persistenceServiceTracker == null) {
-            initializeServiceTrackers();
-        }
-
-        // Try to get the service with retries
-        long startTime = System.currentTimeMillis();
-        long waitTime = 50; // Start with 50ms wait time
-
-        while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
-            PersistenceService service = getPersistenceService();
-            if (service != null) {
-                LOGGER.info("Persistence service is now available");
-                return;
-            }
-
-            try {
-                LOGGER.debug("Waiting for persistence service... ({}ms 
elapsed)", System.currentTimeMillis() - startTime);
-                Thread.sleep(waitTime);
-                // Exponential backoff with a maximum of 5 seconds
-                waitTime = Math.min(waitTime * 2, 5000);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.error("Interrupted while waiting for persistence 
service", e);
-                break;
-            }
-        }
-
-        throw new IllegalStateException("PersistenceService not available 
after waiting " + MAX_WAIT_TIME + "ms");
+    public void setKarafCellarEventProducer(EventProducer 
karafCellarEventProducer) {
+        this.karafCellarEventProducer = karafCellarEventProducer;
     }
 
-    /**
-     * Safely gets the persistence service, either from the direct reference (for tests)
-     * or from the service tracker (for OSGi runtime)
-     * @return the persistence service or null if not available
-     */
-    private PersistenceService getPersistenceService() {
-        if (shutdownNow) return null;
-
-        // For unit tests or if already directly set
-        if (persistenceService != null) {
-            return persistenceService;
-        }
-
-        // Otherwise try to get from service tracker
-        return persistenceServiceTracker != null ? persistenceServiceTracker.getService() : null;
+    public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) {
+        this.karafCellarGroupManager = karafCellarGroupManager;
     }
 
-    /**
-     * Initialize service tracker for PersistenceService
-     */
-    private void initializeServiceTrackers() {
-        if (bundleContext == null) {
-            LOGGER.warn("BundleContext is null, cannot initialize service 
trackers");
-            return;
-        }
-
-        // Only create service tracker if direct reference isn't set
-        if (persistenceService == null) {
-            LOGGER.info("Initializing PersistenceService tracker");
-            persistenceServiceTracker = new ServiceTracker<>(
-                    bundleContext,
-                    PersistenceService.class,
-                    new ServiceTrackerCustomizer<PersistenceService, PersistenceService>() {
-                        @Override
-                        public PersistenceService addingService(ServiceReference<PersistenceService> reference) {
-                            PersistenceService service = bundleContext.getService(reference);
-                            if (service != null) {
-                                persistenceService = service;
-                                LOGGER.info("PersistenceService acquired through tracker");
-                            }
-                            return service;
-                        }
-
-                        @Override
-                        public void modifiedService(ServiceReference<PersistenceService> reference, PersistenceService service) {
-                            // No action needed
-                        }
-
-                        @Override
-                        public void removedService(ServiceReference<PersistenceService> reference, PersistenceService service) {
-                            LOGGER.info("PersistenceService removed");
-                            persistenceService = null;
-                            bundleContext.ungetService(reference);
-                        }
-                    }
-            );
-            persistenceServiceTracker.open();
-        }
+    public void setKarafCellarGroupName(String karafCellarGroupName) {
+        this.karafCellarGroupName = karafCellarGroupName;
     }
 
-    /**
-     * For unit tests and backward compatibility - directly sets the persistence service
-     * @param persistenceService the persistence service to set
-     */
-    public void setPersistenceService(PersistenceService persistenceService) {
-        this.persistenceService = persistenceService;
-        LOGGER.info("PersistenceService set directly");
+    public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) {
+        this.osgiConfigurationAdmin = osgiConfigurationAdmin;
     }
 
     public void setPublicAddress(String publicAddress) {
@@ -242,35 +101,8 @@ public class ClusterServiceImpl implements ClusterService {
         this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
     }
 
-    /* Wait for PR UNOMI-878 to reactivate that code
     public void setSchedulerService(SchedulerService schedulerService) {
         this.schedulerService = schedulerService;
-
-        // If we're already initialized, initialize scheduled tasks now
-        // This handles the case when ClusterService was initialized before SchedulerService was set
-        if (schedulerService != null && System.currentTimeMillis() > nodeStartTime && nodeStartTime > 0) {
-            LOGGER.info("SchedulerService was set after ClusterService initialization, initializing scheduled tasks now");
-            initializeScheduledTasks();
-        }
-    }
-    */
-
-    /* Wait for PR UNOMI-878 to reactivate that code
-    /**
-     * Unbind method for the scheduler service, called by the OSGi framework when the service is unregistered
-     * @param schedulerService The scheduler service being unregistered
-     */
-    /*
-    public void unsetSchedulerService(SchedulerService schedulerService) {
-        if (this.schedulerService == schedulerService) {
-            LOGGER.info("SchedulerService was unset");
-            this.schedulerService = null;
-        }
-    }
-    */
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
     }
 
     public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
@@ -278,331 +110,211 @@ public class ClusterServiceImpl implements ClusterService {
     }
 
     public void init() {
-        // Initialize service trackers if not set directly (common in unit tests)
-        initializeServiceTrackers();
-
-        // Validate that nodeId is provided
-        if (StringUtils.isBlank(nodeId)) {
-            String errorMessage = "CRITICAL: nodeId is not set. This is a 
required setting for cluster operation.";
-            LOGGER.error(errorMessage);
-            throw new IllegalStateException(errorMessage);
-        }
+        if (karafCellarEventProducer != null && karafCellarClusterManager != null) {
+
+            boolean setupConfigOk = true;
+            group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
+            if (setupConfigOk && group == null) {
+                LOGGER.error("Cluster group {} doesn't exist, creating it...", 
karafCellarGroupName);
+                group = karafCellarGroupManager.createGroup(karafCellarGroupName);
+                if (group != null) {
+                    setupConfigOk = true;
+                } else {
+                    setupConfigOk = false;
+                }
+            }
 
-        // Wait for persistence service to be available
-        try {
-            waitForPersistenceService();
-        } catch (IllegalStateException e) {
-            LOGGER.error("Failed to initialize cluster service: {}", 
e.getMessage());
-            return;
-        }
+            // check if the producer is ON
+            if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
+                LOGGER.error("Cluster event producer is OFF");
+                setupConfigOk = false;
+            }
 
-        nodeStartTime = System.currentTimeMillis();
+            // check if the config pid is allowed
+            if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
+                LOGGER.error("Configuration PID " + 
KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster 
group {}",
+                        karafCellarGroupName);
+                setupConfigOk = false;
+            }
+
+            if (setupConfigOk) {
+                Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
+                org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
+                Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+                if (karafCellarClusterNodeConfiguration == null) {
+                    karafCellarClusterNodeConfiguration = new Properties();
+                }
+                Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
+                publicEndpoints.put(thisKarafNode.getId(), publicAddress);
+                setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
 
-        // Register this node in the persistence service
-        registerNodeInPersistence();
+                Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
+                internalEndpoints.put(thisKarafNode.getId(), internalAddress);
+                setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
+
+                configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
+                ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+                sendEvent(clusterConfigurationEvent);
+            }
+
+            TimerTask statisticsTask = new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        updateSystemStats();
+                    } catch (Throwable t) {
+                        LOGGER.error("Error updating system statistics", t);
+                    }
+                }
+            };
+            schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask, 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
 
-        /* Wait for PR UNOMI-878 to reactivate that code
-        /*
-        // Only initialize scheduled tasks if scheduler service is available
-        if (schedulerService != null) {
-            initializeScheduledTasks();
-        } else {
-            LOGGER.warn("SchedulerService not available during ClusterService 
initialization. Scheduled tasks will not be registered. They will be registered 
when SchedulerService becomes available.");
         }
-        */
-        initializeScheduledTasks();
+        LOGGER.info("Cluster service initialized.");
+    }
 
-        LOGGER.info("Cluster service initialized with node ID: {}", nodeId);
+    public void destroy() {
+        LOGGER.info("Cluster service shutdown.");
     }
 
-    /**
-     * Initializes scheduled tasks for cluster management.
-     * This method can be called later if schedulerService wasn't available during init.
-     */
-    public void initializeScheduledTasks() {
-        /* Wait for PR UNOMI-878 to reactivate that code
-        if (schedulerService == null) {
-            LOGGER.error("Cannot initialize scheduled tasks: SchedulerService 
is not set");
-            return;
+    @Override
+    public List<ClusterNode> getClusterNodes() {
+        Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>();
+
+        Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes();
+        org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
+        Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
+        Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+        Map<String, String> publicNodeEndpoints = new TreeMap<>();
+        Map<String, String> internalNodeEndpoints = new TreeMap<>();
+        if (karafCellarClusterNodeConfiguration != null) {
+            publicNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
+            internalNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
         }
-        */
-
-        // Schedule regular updates of the node statistics
-        TimerTask statisticsTask = new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    updateSystemStats();
-                } catch (Throwable t) {
-                    LOGGER.error("Error updating system statistics", t);
-                }
+        for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) {
+            ClusterNode clusterNode = new ClusterNode();
+            String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId());
+            if (publicEndpoint != null) {
+                clusterNode.setPublicHostAddress(publicEndpoint);
             }
-        };
-        /* Wait for PR UNOMI-878 to reactivate that code
-        schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", 
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
-        */
-        scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
-
-        // Schedule cleanup of stale nodes
-        TimerTask cleanupTask = new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    cleanupStaleNodes();
-                } catch (Throwable t) {
-                    LOGGER.error("Error cleaning up stale nodes", t);
+            String internalEndpoint = internalNodeEndpoints.get(karafCellarNode.getId());
+            if (internalEndpoint != null) {
+                clusterNode.setInternalHostAddress(internalEndpoint);
+            }
+            Map<String,Serializable> nodeStatistics = nodeSystemStatistics.get(karafCellarNode.getId());
+            if (nodeStatistics != null) {
+                Long uptime = (Long) nodeStatistics.get("uptime");
+                if (uptime != null) {
+                    clusterNode.setUptime(uptime);
+                }
+                Double systemCpuLoad = (Double) nodeStatistics.get("systemCpuLoad");
+                if (systemCpuLoad != null) {
+                    clusterNode.setCpuLoad(systemCpuLoad);
+                }
+                List<Double> loadAverage = (List<Double>) nodeStatistics.get("systemLoadAverage");
+                if (loadAverage != null) {
+                    Double[] loadAverageArray = loadAverage.toArray(new Double[loadAverage.size()]);
+                    ArrayUtils.toPrimitive(loadAverageArray);
+                    clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
                 }
             }
-        };
-        /* Wait for PR UNOMI-878 to reactivate that code
-        schedulerService.createRecurringTask("clusterStaleNodesCleanup", 
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
-        */
-        scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000, TimeUnit.MILLISECONDS);
+            clusterNodes.put(karafCellarNode.getId(), clusterNode);
+        }
 
-        LOGGER.info("Cluster service scheduled tasks initialized");
+        return new ArrayList<ClusterNode>(clusterNodes.values());
     }
 
-    public void destroy() {
-        shutdownNow = true;
-
-        // Remove this node from the persistence service
-        PersistenceService service = getPersistenceService();
-        if (service != null) {
-            try {
-                service.remove(nodeId, ClusterNode.class);
-                LOGGER.info("Node {} removed from cluster", nodeId);
-            } catch (Exception e) {
-                LOGGER.error("Error removing node from cluster", e);
-            }
-        }
-
-        // Close service trackers
-        if (persistenceServiceTracker != null) {
-            try {
-                persistenceServiceTracker.close();
-                LOGGER.debug("Persistence service tracker closed");
-            } catch (Exception e) {
-                LOGGER.debug("Error closing persistence service tracker: {}", 
e.getMessage());
-            }
-            persistenceServiceTracker = null;
-        }
+    @Override
+    public void purge(Date date) {
+        persistenceService.purge(date);
+    }
 
-        // Clear references
-        persistenceService = null;
-        bundleWatcher = null;
+    @Override
+    public void purge(String scope) {
+        persistenceService.purge(scope);
+    }
 
-        LOGGER.info("Cluster service shutdown.");
+    @Override
+    public void sendEvent(Serializable eventObject) {
+        Event event = (Event) eventObject;
+        event.setSourceGroup(group);
+        event.setSourceNode(karafCellarClusterManager.getNode());
+        karafCellarEventProducer.produce(event);
     }
 
     /**
-     * Register this node in the persistence service
+     * Check if a configuration is allowed.
+     *
+     * @param group    the cluster group.
+     * @param category the configuration category constant.
+     * @param pid      the configuration PID.
+     * @param type     the cluster event type.
+     * @return true if the cluster event type is allowed, false else.
      */
-    private void registerNodeInPersistence() {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.error("Cannot register node: PersistenceService not 
available");
-            return;
-        }
+    public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
+        CellarSupport support = new CellarSupport();
+        support.setClusterManager(this.karafCellarClusterManager);
+        support.setGroupManager(this.karafCellarGroupManager);
+        support.setConfigurationAdmin(this.osgiConfigurationAdmin);
+        return support.isAllowed(group, category, pid, type);
+    }
 
-        ClusterNode clusterNode = new ClusterNode();
-        clusterNode.setItemId(nodeId);
-        clusterNode.setPublicHostAddress(publicAddress);
-        clusterNode.setInternalHostAddress(internalAddress);
-        clusterNode.setStartTime(nodeStartTime);
-        clusterNode.setLastHeartbeat(System.currentTimeMillis());
-
-        // Set server information if BundleWatcher is available
-        if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) {
-            ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
-            clusterNode.setServerInfo(serverInfo);
-            LOGGER.info("Added server info to node: version={}, build={}",
-                    serverInfo.getServerVersion(), serverInfo.getServerBuildNumber());
-        } else {
-            LOGGER.warn("BundleWatcher not available at registration time, 
server info will not be available");
-        }
+    private Map<String, String> getMapProperty(Properties properties, String propertyName, String defaultValue) {
+        String propertyValue = properties.getProperty(propertyName, defaultValue);
+        return getMapProperty(propertyValue);
+    }
 
-        updateSystemStatsForNode(clusterNode);
+    private Map<String, String> getMapProperty(String propertyValue) {
+        String[] propertyValueArray = propertyValue.split(",");
+        Map<String, String> propertyMapValue = new LinkedHashMap<>();
+        for (String propertyValueElement : propertyValueArray) {
+            String[] propertyValueElementPrats = propertyValueElement.split("=");
+            propertyMapValue.put(propertyValueElementPrats[0], propertyValueElementPrats[1]);
+        }
+        return propertyMapValue;
+    }
 
-        boolean success = service.save(clusterNode);
-        if (success) {
-            LOGGER.info("Node {} registered in cluster", nodeId);
-        } else {
-            LOGGER.error("Failed to register node {} in cluster", nodeId);
+    private Map<String, String> setMapProperty(Properties properties, String propertyName, Map<String, String> propertyMapValue) {
+        StringBuilder propertyValueBuilder = new StringBuilder();
+        int entryCount = 0;
+        for (Map.Entry<String, String> propertyMapValueEntry : propertyMapValue.entrySet()) {
+            propertyValueBuilder.append(propertyMapValueEntry.getKey());
+            propertyValueBuilder.append("=");
+            propertyValueBuilder.append(propertyMapValueEntry.getValue());
+            if (entryCount < propertyMapValue.size() - 1) {
+                propertyValueBuilder.append(",");
+            }
         }
+        String oldPropertyValue = (String) properties.setProperty(propertyName, propertyValueBuilder.toString());
+        if (oldPropertyValue == null) {
+            return null;
+        }
+        return getMapProperty(oldPropertyValue);
     }
 
-    /**
-     * Updates system stats for the given cluster node
-     */
-    private void updateSystemStatsForNode(ClusterNode node) {
+    private void updateSystemStats() {
         final RuntimeMXBean remoteRuntime = ManagementFactory.getRuntimeMXBean();
         long uptime = remoteRuntime.getUptime();
-
-        double systemCpuLoad = 0.0;
+        ObjectName operatingSystemMXBeanName = ManagementFactory.getOperatingSystemMXBean().getObjectName();
+        Double systemCpuLoad = null;
         try {
-            systemCpuLoad = ((com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad();
-            // Check for NaN value which Elasticsearch and OpenSearch don't support for float fields
-            if (Double.isNaN(systemCpuLoad)) {
-                LOGGER.debug("System CPU load is NaN, setting to 0.0");
-                systemCpuLoad = 0.0;
-            }
-        } catch (Exception e) {
-            LOGGER.debug("Error retrieving system CPU load", e);
+            systemCpuLoad = (Double) ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName, "SystemCpuLoad");
+        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+            LOGGER.error("Error retrieving system CPU load", e);
         }
-
         final OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
         double systemLoadAverage = operatingSystemMXBean.getSystemLoadAverage();
-        // Check for NaN value which Elasticsearch/OpenSearch doesn't support for float fields
-        if (Double.isNaN(systemLoadAverage)) {
-            LOGGER.debug("System load average is NaN, setting to 0.0");
-            systemLoadAverage = 0.0;
-        }
-
-        node.setCpuLoad(systemCpuLoad);
-        node.setUptime(uptime);
 
+        ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
+        Map<String,Serializable> systemStatistics = new TreeMap<>();
         ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
         systemLoadAverageArray.add(systemLoadAverage);
-        node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new Double[0])));
-
-        // Store system statistics in memory as well
-        Map<String, Serializable> systemStatistics = new TreeMap<>();
         systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
         systemStatistics.put("systemCpuLoad", systemCpuLoad);
         systemStatistics.put("uptime", uptime);
-        nodeSystemStatistics.put(nodeId, systemStatistics);
-    }
-
-    /**
-     * Updates the system statistics for this node and stores them in the persistence service
-     */
-    private void updateSystemStats() {
-        if (shutdownNow) {
-            return;
-        }
-
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot update system stats: PersistenceService not 
available");
-            return;
-        }
-
-        // Load node from persistence
-        ClusterNode node = service.load(nodeId, ClusterNode.class);
-        if (node == null) {
-            LOGGER.warn("Node {} not found in persistence, re-registering", 
nodeId);
-            registerNodeInPersistence();
-            return;
-        }
-
-        try {
-            // Update its stats
-            updateSystemStatsForNode(node);
-
-            // Update server info if needed
-            if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) {
-                ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
-                // Check if server info needs updating
-                if (node.getServerInfo() == null ||
-                        !currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion())) {
-
-                    node.setServerInfo(currentInfo);
-                    LOGGER.info("Updated server info for node {}: version={}, 
build={}",
-                            nodeId, currentInfo.getServerVersion(), 
currentInfo.getServerBuildNumber());
-                }
-            }
-
-            node.setLastHeartbeat(System.currentTimeMillis());
-
-            // Save back to persistence
-            boolean success = service.save(node);
-            if (!success) {
-                LOGGER.error("Failed to update node {} statistics", nodeId);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Error updating system statistics for node {}: {}", 
nodeId, e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Removes stale nodes from the cluster
-     */
-    private void cleanupStaleNodes() {
-        if (shutdownNow) {
-            return;
-        }
-
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not 
available");
-            return;
-        }
-
-        long cutoffTime = System.currentTimeMillis() - (nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x the update frequency
-
-        Condition staleNodesCondition = new Condition();
-        ConditionType propertyConditionType = new ConditionType();
-        propertyConditionType.setItemId("propertyCondition");
-        propertyConditionType.setItemType(ConditionType.ITEM_TYPE);
-        propertyConditionType.setConditionEvaluator("propertyConditionEvaluator");
-        propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder");
-        staleNodesCondition.setConditionType(propertyConditionType);
-        staleNodesCondition.setConditionTypeId("propertyCondition");
-        staleNodesCondition.setParameter("propertyName", "lastHeartbeat");
-        staleNodesCondition.setParameter("comparisonOperator", "lessThan");
-        staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
-
-        PartialList<ClusterNode> staleNodes = service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
-
-        for (ClusterNode staleNode : staleNodes.getList()) {
-            LOGGER.info("Removing stale node: {}", staleNode.getItemId());
-            service.remove(staleNode.getItemId(), ClusterNode.class);
-            nodeSystemStatistics.remove(staleNode.getItemId());
-        }
-    }
-
-    @Override
-    public List<ClusterNode> getClusterNodes() {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot get cluster nodes: PersistenceService not 
available");
-            return Collections.emptyList();
-        }
-
-        // Query all nodes from the persistence service
-        return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
+        clusterSystemStatisticsEvent.setStatistics(systemStatistics);
+        nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(), systemStatistics);
+        sendEvent(clusterSystemStatisticsEvent);
     }
 
-    @Override
-    public void purge(Date date) {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot purge by date: PersistenceService not 
available");
-            return;
-        }
-
-        service.purge(date);
-    }
-
-    @Override
-    public void purge(String scope) {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot purge by scope: PersistenceService not 
available");
-            return;
-        }
-
-        service.purge(scope);
-    }
-
-    /**
-     * Check if a persistence service is available.
-     * This can be used to quickly check before performing operations.
-     *
-     * @return true if a persistence service is available (either directly set or via tracker)
-     */
-    public boolean isPersistenceServiceAvailable() {
-        return getPersistenceService() != null;
-    }
 }
-
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 994c5598d..f49f3c849 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -53,7 +53,7 @@
     <cm:property-placeholder persistent-id="org.apache.unomi.cluster"
                              update-strategy="reload" 
placeholder-prefix="${cluster.">
         <cm:default-properties>
-            <cm:property name="nodeId" value="unomi-node-1"/>
+            <cm:property name="group" value="default"/>
             <cm:property name="contextserver.publicAddress" 
value="https://localhost:9443"/>
             <cm:property name="contextserver.internalAddress" 
value="http://127.0.0.1:8181"/>
             <cm:property name="nodeStatisticsUpdateFrequency" value="10000"/>
@@ -262,11 +262,13 @@
         <property name="publicAddress" 
value="${cluster.contextserver.publicAddress}"/>
         <property name="internalAddress" 
value="${cluster.contextserver.internalAddress}"/>
         <property name="persistenceService" ref="persistenceService"/>
-        <property name="nodeId" value="${cluster.nodeId}"/>
+        <property name="karafCellarClusterManager" 
ref="karafCellarClusterManager"/>
+        <property name="karafCellarEventProducer" 
ref="karafCellarEventProducer"/>
+        <property name="karafCellarGroupManager" 
ref="karafCellarGroupManager"/>
+        <property name="karafCellarGroupName" value="${cluster.group}"/>
+        <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/>
         <property name="nodeStatisticsUpdateFrequency" 
value="${cluster.nodeStatisticsUpdateFrequency}"/>
-        <!-- Wait for UNOMI-878 to be available to activate that
         <property name="schedulerService" ref="schedulerServiceImpl"/>
-        -->
     </bean>
     <service id="clusterService" ref="clusterServiceImpl" 
interface="org.apache.unomi.api.services.ClusterService"/>
 
diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg b/services/src/main/resources/org.apache.unomi.cluster.cfg
index eecb7e1de..bfbb189d9 100644
--- a/services/src/main/resources/org.apache.unomi.cluster.cfg
+++ b/services/src/main/resources/org.apache.unomi.cluster.cfg
@@ -14,18 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+group=${org.apache.unomi.cluster.group:-default}
 # To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended
 # to switch to using HTTPS with a proper SSL certificate installed.
 contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181}
 contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443}
 #
-# The nodeId is a required setting that uniquely identifies this node in the cluster.
-# It must be set to a unique value for each node in the cluster.
-# Example: nodeId=node1
-nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1}
-#
-## The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
+# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
 # system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node
 # will retrieve the local values and broadcast them through a cluster event to all the other nodes to update
 # the global cluster statistics.
-nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
+nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
\ No newline at end of file
