This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f995e47e UNOMI-877: Replace Karaf Cellar and Hazelcast with 
PersistenceService for cluster synchronization (#723)
3f995e47e is described below

commit 3f995e47ea8fbd33ca226e04f9f5cd0dfb719fff
Author: Jérôme Blanchard <[email protected]>
AuthorDate: Fri Aug 22 13:33:20 2025 +0200

    UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for 
cluster synchronization (#723)
    
    * UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for 
cluster synchronization (code isolated from branch unomi-3-dev made by Serge 
Hubert)
    
    * UNOMI-877: Not sending event to cluster anymore.
    
    ---------
    
    Co-authored-by: Serge Huber <[email protected]>
---
 .../java/org/apache/unomi/api/ClusterNode.java     |  65 +-
 .../apache/unomi/api/services/ClusterService.java  |   7 -
 .../router/core/context/RouterCamelContext.java    |  17 -
 .../main/resources/etc/custom.system.properties    |   5 +-
 services/pom.xml                                   |   7 +
 .../services/impl/cluster/ClusterServiceImpl.java  | 690 +++++++++++++++------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |  10 +-
 .../main/resources/org.apache.unomi.cluster.cfg    |  10 +-
 8 files changed, 575 insertions(+), 236 deletions(-)

diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java 
b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
index 6c40ca21b..e096490b9 100644
--- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
+++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
@@ -22,10 +22,12 @@ import java.io.Serializable;
 /**
  * Information about a cluster node.
  */
-public class ClusterNode implements Serializable {
+public class ClusterNode extends Item {
 
     private static final long serialVersionUID = 1281422346318230514L;
 
+    public static final String ITEM_TYPE = "clusterNode";
+
     private double cpuLoad;
     private double[] loadAverage;
     private String publicHostAddress;
@@ -33,11 +35,18 @@ public class ClusterNode implements Serializable {
     private long uptime;
     private boolean master;
     private boolean data;
+    private long startTime;
+    private long lastHeartbeat;
+
+    // Server information
+    private ServerInfo serverInfo;
 
     /**
      * Instantiates a new Cluster node.
      */
     public ClusterNode() {
+        super();
+        setItemType(ITEM_TYPE);
     }
 
     /**
@@ -165,4 +174,58 @@ public class ClusterNode implements Serializable {
     public void setData(boolean data) {
         this.data = data;
     }
+
+    /**
+     * Retrieves the node start time in milliseconds.
+     *
+     * @return the start time
+     */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Sets the node start time in milliseconds.
+     *
+     * @param startTime the start time
+     */
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    /**
+     * Retrieves the last heartbeat time in milliseconds.
+     *
+     * @return the last heartbeat time
+     */
+    public long getLastHeartbeat() {
+        return lastHeartbeat;
+    }
+
+    /**
+     * Sets the last heartbeat time in milliseconds.
+     *
+     * @param lastHeartbeat the last heartbeat time
+     */
+    public void setLastHeartbeat(long lastHeartbeat) {
+        this.lastHeartbeat = lastHeartbeat;
+    }
+
+    /**
+     * Gets the server information.
+     *
+     * @return the server information
+     */
+    public ServerInfo getServerInfo() {
+        return serverInfo;
+    }
+
+    /**
+     * Sets the server information.
+     *
+     * @param serverInfo the server information
+     */
+    public void setServerInfo(ServerInfo serverInfo) {
+        this.serverInfo = serverInfo;
+    }
 }
diff --git 
a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java 
b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index 299ac9098..ad8777503 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -51,11 +51,4 @@ public interface ClusterService {
      */
     void purge(final String scope);
 
-    /**
-     * This function will send an event to the nodes of the cluster
-     * The function takes a Serializable to avoid dependency on any clustering 
framework
-     *
-     * @param event this object will be cast to a 
org.apache.karaf.cellar.core.event.Event object
-     */
-    void sendEvent(Serializable event);
 }
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 67219f9c5..c7e953132 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -31,7 +31,6 @@ import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.services.ProfileExportService;
-import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
 import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
 import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -240,12 +239,6 @@ public class RouterCamelContext implements 
IRouterCamelContext {
                 camelContext.removeRouteDefinition(routeDefinition);
             }
         }
-
-        if (fireEvent) {
-            UpdateCamelRouteEvent event = new 
UpdateCamelRouteEvent(EVENT_ID_REMOVE);
-            event.setRouteId(routeId);
-            clusterService.sendEvent(event);
-        }
     }
 
     public void updateProfileImportReaderRoute(String configId, boolean 
fireEvent) throws Exception {
@@ -266,11 +259,6 @@ public class RouterCamelContext implements 
IRouterCamelContext {
             builder.setJacksonDataFormat(jacksonDataFormat);
             builder.setContext(camelContext);
             camelContext.addRoutes(builder);
-
-            if (fireEvent) {
-                UpdateCamelRouteEvent event = new 
UpdateCamelRouteEvent(EVENT_ID_IMPORT);
-                clusterService.sendEvent(event);
-            }
         }
     }
 
@@ -291,11 +279,6 @@ public class RouterCamelContext implements 
IRouterCamelContext {
             
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
             profileExportCollectRouteBuilder.setContext(camelContext);
             camelContext.addRoutes(profileExportCollectRouteBuilder);
-
-            if (fireEvent) {
-                UpdateCamelRouteEvent event = new 
UpdateCamelRouteEvent(EVENT_ID_EXPORT);
-                clusterService.sendEvent(event);
-            }
         }
     }
 
diff --git a/package/src/main/resources/etc/custom.system.properties 
b/package/src/main/resources/etc/custom.system.properties
index 5e97437c3..46a262816 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -81,11 +81,14 @@ 
org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs}
 
#######################################################################################################################
 ## Cluster Settings                                                            
                                      ##
 
#######################################################################################################################
-org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default}
 # To simplify testing we set the public address to use HTTP, but for 
production environments it is highly recommended
 # to switch to using HTTPS with a proper SSL certificate installed.
 
org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
 
org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
+# The nodeId is a required setting that uniquely identifies this node in the 
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
 # The nodeStatisticsUpdateFrequency controls the frequency of the update of 
system statistics such as CPU load,
 # system load average and uptime. This value is set in milliseconds and is set 
to 10 seconds by default. Each node
 # will retrieve the local values and broadcast them through a cluster event to 
all the other nodes to update
diff --git a/services/pom.xml b/services/pom.xml
index c4cf7e4da..9a0862db8 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -56,6 +56,12 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.unomi</groupId>
+            <artifactId>unomi-lifecycle-watcher</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
         <dependency>
             <groupId>javax.servlet</groupId>
@@ -175,6 +181,7 @@
                         
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
                         <Import-Package>
                             sun.misc;resolution:=optional,
+                            com.sun.management;resolution:=optional,
                             *
                         </Import-Package>
                     </instructions>
diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index ec4cfe523..b4bc2c421 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -18,28 +18,30 @@
 package org.apache.unomi.services.impl.cluster;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.*;
-import org.apache.karaf.cellar.core.control.SwitchStatus;
-import org.apache.karaf.cellar.core.event.Event;
-import org.apache.karaf.cellar.core.event.EventProducer;
-import org.apache.karaf.cellar.core.event.EventType;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.api.ClusterNode;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.ServerInfo;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.conditions.ConditionType;
 import org.apache.unomi.api.services.ClusterService;
-import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.lifecycle.BundleWatcher;
 import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.*;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -47,46 +49,185 @@ import java.util.concurrent.TimeUnit;
  */
 public class ClusterServiceImpl implements ClusterService {
 
-    public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = 
"org.apache.unomi.nodes";
-    public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = 
"publicEndpoints";
-    public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS 
= "internalEndpoints";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
-    PersistenceService persistenceService;
-    private ClusterManager karafCellarClusterManager;
-    private EventProducer karafCellarEventProducer;
-    private GroupManager karafCellarGroupManager;
-    private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
-    private ConfigurationAdmin osgiConfigurationAdmin;
+
+    /**
+     * We use ServiceTracker instead of Blueprint dependency injection due to 
a known bug in Apache Aries Blueprint
+     * where service dependencies are not properly shut down in reverse order 
of their initialization.
+     *
+     * The bug manifests in two ways:
+     * 1. Services are not shut down in reverse order of their initialization, 
causing potential deadlocks
+     * 2. The PersistenceService is often shut down before other services that 
depend on it, leading to timeout waits
+     *
+     * By using ServiceTracker, we have explicit control over:
+     * - Service lifecycle management
+     * - Shutdown order
+     * - Service availability checks
+     * - Graceful degradation when services become unavailable
+     */
+    private ServiceTracker<PersistenceService, PersistenceService> 
persistenceServiceTracker;
+
+    // Keep direct reference for backward compatibility and unit tests
+    private PersistenceService persistenceService;
+
     private String publicAddress;
     private String internalAddress;
-    private Map<String, Map<String,Serializable>> nodeSystemStatistics = new 
ConcurrentHashMap<>();
-    private Group group = null;
-    private SchedulerService schedulerService;
-
+    //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to 
reactivate that code
+    private ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(3);
+    private String nodeId;
+    private long nodeStartTime;
     private long nodeStatisticsUpdateFrequency = 10000;
+    private Map<String, Map<String, Serializable>> nodeSystemStatistics = new 
ConcurrentHashMap<>();
+    private BundleContext bundleContext;
+    private volatile boolean shutdownNow = false;
 
-    public void setPersistenceService(PersistenceService persistenceService) {
-        this.persistenceService = persistenceService;
+    private BundleWatcher bundleWatcher;
+
+    /**
+     * Max time to wait for persistence service (in milliseconds)
+     */
+    private static final long MAX_WAIT_TIME = 60000; // 60 seconds
+
+    /**
+     * Sets the bundle context, which is needed to create service trackers
+     * @param bundleContext the OSGi bundle context
+     */
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
     }
 
-    public void setKarafCellarClusterManager(ClusterManager 
karafCellarClusterManager) {
-        this.karafCellarClusterManager = karafCellarClusterManager;
+    /**
+     * Sets the bundle watcher used to retrieve server information
+     *
+     * @param bundleWatcher the bundle watcher
+     */
+    public void setBundleWatcher(BundleWatcher bundleWatcher) {
+        this.bundleWatcher = bundleWatcher;
+        LOGGER.info("BundleWatcher service set");
     }
 
-    public void setKarafCellarEventProducer(EventProducer 
karafCellarEventProducer) {
-        this.karafCellarEventProducer = karafCellarEventProducer;
+    /**
+     * Waits for the persistence service to become available.
+     * This method will retry getting the persistence service with exponential 
backoff
+     * until it's available or until the maximum wait time is reached.
+     *
+     * @throws IllegalStateException if the persistence service is not 
available after the maximum wait time
+     */
+    private void waitForPersistenceService() {
+        if (shutdownNow) {
+            return;
+        }
+
+        // If persistence service is directly set (e.g., in unit tests), no 
need to wait
+        if (persistenceService != null) {
+            LOGGER.debug("Persistence service is already available, no need to 
wait");
+            return;
+        }
+
+        // If no bundle context, we can't get the service via tracker
+        if (bundleContext == null) {
+            LOGGER.error("No BundleContext available, cannot wait for 
persistence service");
+            throw new IllegalStateException("No BundleContext available to get 
persistence service");
+        }
+
+        // Initialize service tracker if needed
+        if (persistenceServiceTracker == null) {
+            initializeServiceTrackers();
+        }
+
+        // Try to get the service with retries
+        long startTime = System.currentTimeMillis();
+        long waitTime = 50; // Start with 50ms wait time
+
+        while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
+            PersistenceService service = getPersistenceService();
+            if (service != null) {
+                LOGGER.info("Persistence service is now available");
+                return;
+            }
+
+            try {
+                LOGGER.debug("Waiting for persistence service... ({}ms 
elapsed)", System.currentTimeMillis() - startTime);
+                Thread.sleep(waitTime);
+                // Exponential backoff with a maximum of 5 seconds
+                waitTime = Math.min(waitTime * 2, 5000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.error("Interrupted while waiting for persistence 
service", e);
+                break;
+            }
+        }
+
+        throw new IllegalStateException("PersistenceService not available 
after waiting " + MAX_WAIT_TIME + "ms");
     }
 
-    public void setKarafCellarGroupManager(GroupManager 
karafCellarGroupManager) {
-        this.karafCellarGroupManager = karafCellarGroupManager;
+    /**
+     * Safely gets the persistence service, either from the direct reference 
(for tests)
+     * or from the service tracker (for OSGi runtime)
+     * @return the persistence service or null if not available
+     */
+    private PersistenceService getPersistenceService() {
+        if (shutdownNow) return null;
+
+        // For unit tests or if already directly set
+        if (persistenceService != null) {
+            return persistenceService;
+        }
+
+        // Otherwise try to get from service tracker
+        return persistenceServiceTracker != null ? 
persistenceServiceTracker.getService() : null;
     }
 
-    public void setKarafCellarGroupName(String karafCellarGroupName) {
-        this.karafCellarGroupName = karafCellarGroupName;
+    /**
+     * Initialize service tracker for PersistenceService
+     */
+    private void initializeServiceTrackers() {
+        if (bundleContext == null) {
+            LOGGER.warn("BundleContext is null, cannot initialize service 
trackers");
+            return;
+        }
+
+        // Only create service tracker if direct reference isn't set
+        if (persistenceService == null) {
+            LOGGER.info("Initializing PersistenceService tracker");
+            persistenceServiceTracker = new ServiceTracker<>(
+                    bundleContext,
+                    PersistenceService.class,
+                    new ServiceTrackerCustomizer<PersistenceService, 
PersistenceService>() {
+                        @Override
+                        public PersistenceService 
addingService(ServiceReference<PersistenceService> reference) {
+                            PersistenceService service = 
bundleContext.getService(reference);
+                            if (service != null) {
+                                persistenceService = service;
+                                LOGGER.info("PersistenceService acquired 
through tracker");
+                            }
+                            return service;
+                        }
+
+                        @Override
+                        public void 
modifiedService(ServiceReference<PersistenceService> reference, 
PersistenceService service) {
+                            // No action needed
+                        }
+
+                        @Override
+                        public void 
removedService(ServiceReference<PersistenceService> reference, 
PersistenceService service) {
+                            LOGGER.info("PersistenceService removed");
+                            persistenceService = null;
+                            bundleContext.ungetService(reference);
+                        }
+                    }
+            );
+            persistenceServiceTracker.open();
+        }
     }
 
-    public void setOsgiConfigurationAdmin(ConfigurationAdmin 
osgiConfigurationAdmin) {
-        this.osgiConfigurationAdmin = osgiConfigurationAdmin;
+    /**
+     * For unit tests and backward compatibility - directly sets the 
persistence service
+     * @param persistenceService the persistence service to set
+     */
+    public void setPersistenceService(PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
+        LOGGER.info("PersistenceService set directly");
     }
 
     public void setPublicAddress(String publicAddress) {
@@ -101,8 +242,35 @@ public class ClusterServiceImpl implements ClusterService {
         this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
     }
 
+    /* Wait for PR UNOMI-878 to reactivate that code
     public void setSchedulerService(SchedulerService schedulerService) {
         this.schedulerService = schedulerService;
+
+        // If we're already initialized, initialize scheduled tasks now
+        // This handles the case when ClusterService was initialized before 
SchedulerService was set
+        if (schedulerService != null && System.currentTimeMillis() > 
nodeStartTime && nodeStartTime > 0) {
+            LOGGER.info("SchedulerService was set after ClusterService 
initialization, initializing scheduled tasks now");
+            initializeScheduledTasks();
+        }
+    }
+    */
+
+    /* Wait for PR UNOMI-878 to reactivate that code
+    /**
+     * Unbind method for the scheduler service, called by the OSGi framework 
when the service is unregistered
+     * @param schedulerService The scheduler service being unregistered
+     */
+    /*
+    public void unsetSchedulerService(SchedulerService schedulerService) {
+        if (this.schedulerService == schedulerService) {
+            LOGGER.info("SchedulerService was unset");
+            this.schedulerService = null;
+        }
+    }
+    */
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
     }
 
     public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
@@ -110,211 +278,331 @@ public class ClusterServiceImpl implements 
ClusterService {
     }
 
     public void init() {
-        if (karafCellarEventProducer != null && karafCellarClusterManager != 
null) {
-
-            boolean setupConfigOk = true;
-            group = 
karafCellarGroupManager.findGroupByName(karafCellarGroupName);
-            if (setupConfigOk && group == null) {
-                LOGGER.error("Cluster group {} doesn't exist, creating it...", 
karafCellarGroupName);
-                group = 
karafCellarGroupManager.createGroup(karafCellarGroupName);
-                if (group != null) {
-                    setupConfigOk = true;
-                } else {
-                    setupConfigOk = false;
-                }
-            }
-
-            // check if the producer is ON
-            if (setupConfigOk && 
karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
-                LOGGER.error("Cluster event producer is OFF");
-                setupConfigOk = false;
-            }
-
-            // check if the config pid is allowed
-            if (setupConfigOk && !isClusterConfigPIDAllowed(group, 
Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, 
EventType.OUTBOUND)) {
-                LOGGER.error("Configuration PID " + 
KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster 
group {}",
-                        karafCellarGroupName);
-                setupConfigOk = false;
-            }
-
-            if (setupConfigOk) {
-                Map<String, Properties> configurations = 
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + 
Configurations.SEPARATOR + karafCellarGroupName);
-                org.apache.karaf.cellar.core.Node thisKarafNode = 
karafCellarClusterManager.getNode();
-                Properties karafCellarClusterNodeConfiguration = 
configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
-                if (karafCellarClusterNodeConfiguration == null) {
-                    karafCellarClusterNodeConfiguration = new Properties();
-                }
-                Map<String, String> publicEndpoints = 
getMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + 
publicAddress);
-                publicEndpoints.put(thisKarafNode.getId(), publicAddress);
-                setMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
+        // Initialize service trackers if not set directly (common in unit 
tests)
+        initializeServiceTrackers();
+
+        // Validate that nodeId is provided
+        if (StringUtils.isBlank(nodeId)) {
+            String errorMessage = "CRITICAL: nodeId is not set. This is a 
required setting for cluster operation.";
+            LOGGER.error(errorMessage);
+            throw new IllegalStateException(errorMessage);
+        }
 
-                Map<String, String> internalEndpoints = 
getMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + 
internalAddress);
-                internalEndpoints.put(thisKarafNode.getId(), internalAddress);
-                setMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
+        // Wait for persistence service to be available
+        try {
+            waitForPersistenceService();
+        } catch (IllegalStateException e) {
+            LOGGER.error("Failed to initialize cluster service: {}", 
e.getMessage());
+            return;
+        }
 
-                configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, 
karafCellarClusterNodeConfiguration);
-                ClusterConfigurationEvent clusterConfigurationEvent = new 
ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
-                sendEvent(clusterConfigurationEvent);
-            }
+        nodeStartTime = System.currentTimeMillis();
 
-            TimerTask statisticsTask = new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        updateSystemStats();
-                    } catch (Throwable t) {
-                        LOGGER.error("Error updating system statistics", t);
-                    }
-                }
-            };
-            
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask,
 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+        // Register this node in the persistence service
+        registerNodeInPersistence();
 
+        /* Wait for PR UNOMI-878 to reactivate that code
+        /*
+        // Only initialize scheduled tasks if scheduler service is available
+        if (schedulerService != null) {
+            initializeScheduledTasks();
+        } else {
+            LOGGER.warn("SchedulerService not available during ClusterService 
initialization. Scheduled tasks will not be registered. They will be registered 
when SchedulerService becomes available.");
         }
-        LOGGER.info("Cluster service initialized.");
-    }
+        */
+        initializeScheduledTasks();
 
-    public void destroy() {
-        LOGGER.info("Cluster service shutdown.");
+        LOGGER.info("Cluster service initialized with node ID: {}", nodeId);
     }
 
-    @Override
-    public List<ClusterNode> getClusterNodes() {
-        Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, 
ClusterNode>();
-
-        Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = 
karafCellarClusterManager.listNodes();
-        org.apache.karaf.cellar.core.Node thisKarafNode = 
karafCellarClusterManager.getNode();
-        Map<String, Properties> clusterConfigurations = 
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + 
Configurations.SEPARATOR + karafCellarGroupName);
-        Properties karafCellarClusterNodeConfiguration = 
clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
-        Map<String, String> publicNodeEndpoints = new TreeMap<>();
-        Map<String, String> internalNodeEndpoints = new TreeMap<>();
-        if (karafCellarClusterNodeConfiguration != null) {
-            publicNodeEndpoints = 
getMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + 
publicAddress);
-            internalNodeEndpoints = 
getMapProperty(karafCellarClusterNodeConfiguration, 
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + 
internalAddress);
+    /**
+     * Initializes scheduled tasks for cluster management.
+     * This method can be called later if schedulerService wasn't available 
during init.
+     */
+    public void initializeScheduledTasks() {
+        /* Wait for PR UNOMI-878 to reactivate that code
+        if (schedulerService == null) {
+            LOGGER.error("Cannot initialize scheduled tasks: SchedulerService 
is not set");
+            return;
         }
-        for (org.apache.karaf.cellar.core.Node karafCellarNode : 
karafCellarNodes) {
-            ClusterNode clusterNode = new ClusterNode();
-            String publicEndpoint = 
publicNodeEndpoints.get(karafCellarNode.getId());
-            if (publicEndpoint != null) {
-                clusterNode.setPublicHostAddress(publicEndpoint);
-            }
-            String internalEndpoint = 
internalNodeEndpoints.get(karafCellarNode.getId());
-            if (internalEndpoint != null) {
-                clusterNode.setInternalHostAddress(internalEndpoint);
-            }
-            Map<String,Serializable> nodeStatistics = 
nodeSystemStatistics.get(karafCellarNode.getId());
-            if (nodeStatistics != null) {
-                Long uptime = (Long) nodeStatistics.get("uptime");
-                if (uptime != null) {
-                    clusterNode.setUptime(uptime);
-                }
-                Double systemCpuLoad = (Double) 
nodeStatistics.get("systemCpuLoad");
-                if (systemCpuLoad != null) {
-                    clusterNode.setCpuLoad(systemCpuLoad);
+        */
+
+        // Schedule regular updates of the node statistics
+        TimerTask statisticsTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    updateSystemStats();
+                } catch (Throwable t) {
+                    LOGGER.error("Error updating system statistics", t);
                 }
-                List<Double> loadAverage = (List<Double>) 
nodeStatistics.get("systemLoadAverage");
-                if (loadAverage != null) {
-                    Double[] loadAverageArray = loadAverage.toArray(new 
Double[loadAverage.size()]);
-                    ArrayUtils.toPrimitive(loadAverageArray);
-                    
clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
+            }
+        };
+        /* Wait for PR UNOMI-878 to reactivate this code
+        schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", 
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
+        */
+        scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100, 
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+
+        // Schedule cleanup of stale nodes
+        TimerTask cleanupTask = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    cleanupStaleNodes();
+                } catch (Throwable t) {
+                    LOGGER.error("Error cleaning up stale nodes", t);
                 }
             }
-            clusterNodes.put(karafCellarNode.getId(), clusterNode);
-        }
+        };
+        /* Wait for PR UNOMI-878 to reactivate this code
+        schedulerService.createRecurringTask("clusterStaleNodesCleanup", 
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
+        */
+        scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000, 
TimeUnit.MILLISECONDS);
 
-        return new ArrayList<ClusterNode>(clusterNodes.values());
+        LOGGER.info("Cluster service scheduled tasks initialized");
     }
 
-    @Override
-    public void purge(Date date) {
-        persistenceService.purge(date);
-    }
+    public void destroy() {
+        shutdownNow = true;
+
+        // Remove this node from the persistence service
+        PersistenceService service = getPersistenceService();
+        if (service != null) {
+            try {
+                service.remove(nodeId, ClusterNode.class);
+                LOGGER.info("Node {} removed from cluster", nodeId);
+            } catch (Exception e) {
+                LOGGER.error("Error removing node from cluster", e);
+            }
+        }
 
-    @Override
-    public void purge(String scope) {
-        persistenceService.purge(scope);
-    }
+        // Close service trackers
+        if (persistenceServiceTracker != null) {
+            try {
+                persistenceServiceTracker.close();
+                LOGGER.debug("Persistence service tracker closed");
+            } catch (Exception e) {
+                LOGGER.debug("Error closing persistence service tracker: {}", 
e.getMessage());
+            }
+            persistenceServiceTracker = null;
+        }
 
-    @Override
-    public void sendEvent(Serializable eventObject) {
-        Event event = (Event) eventObject;
-        event.setSourceGroup(group);
-        event.setSourceNode(karafCellarClusterManager.getNode());
-        karafCellarEventProducer.produce(event);
+        // Clear references
+        persistenceService = null;
+        bundleWatcher = null;
+
+        LOGGER.info("Cluster service shutdown.");
     }
 
     /**
-     * Check if a configuration is allowed.
-     *
-     * @param group    the cluster group.
-     * @param category the configuration category constant.
-     * @param pid      the configuration PID.
-     * @param type     the cluster event type.
-     * @return true if the cluster event type is allowed, false else.
+     * Register this node in the persistence service
      */
-    public boolean isClusterConfigPIDAllowed(Group group, String category, 
String pid, EventType type) {
-        CellarSupport support = new CellarSupport();
-        support.setClusterManager(this.karafCellarClusterManager);
-        support.setGroupManager(this.karafCellarGroupManager);
-        support.setConfigurationAdmin(this.osgiConfigurationAdmin);
-        return support.isAllowed(group, category, pid, type);
-    }
-
-    private Map<String, String> getMapProperty(Properties properties, String 
propertyName, String defaultValue) {
-        String propertyValue = properties.getProperty(propertyName, 
defaultValue);
-        return getMapProperty(propertyValue);
-    }
-
-    private Map<String, String> getMapProperty(String propertyValue) {
-        String[] propertyValueArray = propertyValue.split(",");
-        Map<String, String> propertyMapValue = new LinkedHashMap<>();
-        for (String propertyValueElement : propertyValueArray) {
-            String[] propertyValueElementPrats = 
propertyValueElement.split("=");
-            propertyMapValue.put(propertyValueElementPrats[0], 
propertyValueElementPrats[1]);
+    private void registerNodeInPersistence() {
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.error("Cannot register node: PersistenceService not 
available");
+            return;
         }
-        return propertyMapValue;
-    }
 
-    private Map<String, String> setMapProperty(Properties properties, String 
propertyName, Map<String, String> propertyMapValue) {
-        StringBuilder propertyValueBuilder = new StringBuilder();
-        int entryCount = 0;
-        for (Map.Entry<String, String> propertyMapValueEntry : 
propertyMapValue.entrySet()) {
-            propertyValueBuilder.append(propertyMapValueEntry.getKey());
-            propertyValueBuilder.append("=");
-            propertyValueBuilder.append(propertyMapValueEntry.getValue());
-            if (entryCount < propertyMapValue.size() - 1) {
-                propertyValueBuilder.append(",");
-            }
+        ClusterNode clusterNode = new ClusterNode();
+        clusterNode.setItemId(nodeId);
+        clusterNode.setPublicHostAddress(publicAddress);
+        clusterNode.setInternalHostAddress(internalAddress);
+        clusterNode.setStartTime(nodeStartTime);
+        clusterNode.setLastHeartbeat(System.currentTimeMillis());
+
+        // Set server information if BundleWatcher is available
+        if (bundleWatcher != null && 
!bundleWatcher.getServerInfos().isEmpty()) {
+            ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
+            clusterNode.setServerInfo(serverInfo);
+            LOGGER.info("Added server info to node: version={}, build={}",
+                    serverInfo.getServerVersion(), 
serverInfo.getServerBuildNumber());
+        } else {
+            LOGGER.warn("BundleWatcher not available at registration time, 
server info will not be available");
         }
-        String oldPropertyValue = (String) 
properties.setProperty(propertyName, propertyValueBuilder.toString());
-        if (oldPropertyValue == null) {
-            return null;
+
+        updateSystemStatsForNode(clusterNode);
+
+        boolean success = service.save(clusterNode);
+        if (success) {
+            LOGGER.info("Node {} registered in cluster", nodeId);
+        } else {
+            LOGGER.error("Failed to register node {} in cluster", nodeId);
         }
-        return getMapProperty(oldPropertyValue);
     }
 
-    private void updateSystemStats() {
+    /**
+     * Updates system stats for the given cluster node
+     */
+    private void updateSystemStatsForNode(ClusterNode node) {
         final RuntimeMXBean remoteRuntime = 
ManagementFactory.getRuntimeMXBean();
         long uptime = remoteRuntime.getUptime();
-        ObjectName operatingSystemMXBeanName = 
ManagementFactory.getOperatingSystemMXBean().getObjectName();
-        Double systemCpuLoad = null;
+
+        double systemCpuLoad = 0.0;
         try {
-            systemCpuLoad = (Double) 
ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName,
 "SystemCpuLoad");
-        } catch (MBeanException | AttributeNotFoundException | 
InstanceNotFoundException | ReflectionException e) {
-            LOGGER.error("Error retrieving system CPU load", e);
+            systemCpuLoad = ((com.sun.management.OperatingSystemMXBean) 
ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad();
+            // Check for NaN value which Elasticsearch and OpenSearch don't 
support for float fields
+            if (Double.isNaN(systemCpuLoad)) {
+                LOGGER.debug("System CPU load is NaN, setting to 0.0");
+                systemCpuLoad = 0.0;
+            }
+        } catch (Exception e) {
+            LOGGER.debug("Error retrieving system CPU load", e);
         }
+
         final OperatingSystemMXBean operatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean();
         double systemLoadAverage = 
operatingSystemMXBean.getSystemLoadAverage();
+        // Check for NaN value which Elasticsearch/OpenSearch doesn't support 
for float fields
+        if (Double.isNaN(systemLoadAverage)) {
+            LOGGER.debug("System load average is NaN, setting to 0.0");
+            systemLoadAverage = 0.0;
+        }
+
+        node.setCpuLoad(systemCpuLoad);
+        node.setUptime(uptime);
 
-        ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new 
ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
-        Map<String,Serializable> systemStatistics = new TreeMap<>();
         ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
         systemLoadAverageArray.add(systemLoadAverage);
+        
node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new 
Double[0])));
+
+        // Store system statistics in memory as well
+        Map<String, Serializable> systemStatistics = new TreeMap<>();
         systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
         systemStatistics.put("systemCpuLoad", systemCpuLoad);
         systemStatistics.put("uptime", uptime);
-        clusterSystemStatisticsEvent.setStatistics(systemStatistics);
-        nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(), 
systemStatistics);
-        sendEvent(clusterSystemStatisticsEvent);
+        nodeSystemStatistics.put(nodeId, systemStatistics);
+    }
+
+    /**
+     * Updates the system statistics for this node and stores them in the 
persistence service
+     */
+    private void updateSystemStats() {
+        if (shutdownNow) {
+            return;
+        }
+
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.warn("Cannot update system stats: PersistenceService not 
available");
+            return;
+        }
+
+        // Load node from persistence
+        ClusterNode node = service.load(nodeId, ClusterNode.class);
+        if (node == null) {
+            LOGGER.warn("Node {} not found in persistence, re-registering", 
nodeId);
+            registerNodeInPersistence();
+            return;
+        }
+
+        try {
+            // Update its stats
+            updateSystemStatsForNode(node);
+
+            // Update server info if needed
+            if (bundleWatcher != null && 
!bundleWatcher.getServerInfos().isEmpty()) {
+                ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
+                // Check if server info needs updating
+                if (node.getServerInfo() == null ||
+                        
!currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion()))
 {
+
+                    node.setServerInfo(currentInfo);
+                    LOGGER.info("Updated server info for node {}: version={}, 
build={}",
+                            nodeId, currentInfo.getServerVersion(), 
currentInfo.getServerBuildNumber());
+                }
+            }
+
+            node.setLastHeartbeat(System.currentTimeMillis());
+
+            // Save back to persistence
+            boolean success = service.save(node);
+            if (!success) {
+                LOGGER.error("Failed to update node {} statistics", nodeId);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error updating system statistics for node {}: {}", 
nodeId, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Removes stale nodes from the cluster
+     */
+    private void cleanupStaleNodes() {
+        if (shutdownNow) {
+            return;
+        }
+
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not 
available");
+            return;
+        }
+
+        long cutoffTime = System.currentTimeMillis() - 
(nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x 
the update frequency
+
+        Condition staleNodesCondition = new Condition();
+        ConditionType propertyConditionType = new ConditionType();
+        propertyConditionType.setItemId("propertyCondition");
+        propertyConditionType.setItemType(ConditionType.ITEM_TYPE);
+        
propertyConditionType.setConditionEvaluator("propertyConditionEvaluator");
+        
propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder");
+        staleNodesCondition.setConditionType(propertyConditionType);
+        staleNodesCondition.setConditionTypeId("propertyCondition");
+        staleNodesCondition.setParameter("propertyName", "lastHeartbeat");
+        staleNodesCondition.setParameter("comparisonOperator", "lessThan");
+        staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
+
+        PartialList<ClusterNode> staleNodes = 
service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+
+        for (ClusterNode staleNode : staleNodes.getList()) {
+            LOGGER.info("Removing stale node: {}", staleNode.getItemId());
+            service.remove(staleNode.getItemId(), ClusterNode.class);
+            nodeSystemStatistics.remove(staleNode.getItemId());
+        }
+    }
+
+    @Override
+    public List<ClusterNode> getClusterNodes() {
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.warn("Cannot get cluster nodes: PersistenceService not 
available");
+            return Collections.emptyList();
+        }
+
+        // Query all nodes from the persistence service
+        return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
     }
 
+    @Override
+    public void purge(Date date) {
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.warn("Cannot purge by date: PersistenceService not 
available");
+            return;
+        }
+
+        service.purge(date);
+    }
+
+    @Override
+    public void purge(String scope) {
+        PersistenceService service = getPersistenceService();
+        if (service == null) {
+            LOGGER.warn("Cannot purge by scope: PersistenceService not 
available");
+            return;
+        }
+
+        service.purge(scope);
+    }
+
+    /**
+     * Check if a persistence service is available.
+     * This can be used to quickly check before performing operations.
+     *
+     * @return true if a persistence service is available (either directly set 
or via tracker)
+     */
+    public boolean isPersistenceServiceAvailable() {
+        return getPersistenceService() != null;
+    }
 }
+
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f49f3c849..994c5598d 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -53,7 +53,7 @@
     <cm:property-placeholder persistent-id="org.apache.unomi.cluster"
                              update-strategy="reload" 
placeholder-prefix="${cluster.">
         <cm:default-properties>
-            <cm:property name="group" value="default"/>
+            <cm:property name="nodeId" value="unomi-node-1"/>
             <cm:property name="contextserver.publicAddress" 
value="https://localhost:9443"/>
             <cm:property name="contextserver.internalAddress" 
value="http://127.0.0.1:8181"/>
             <cm:property name="nodeStatisticsUpdateFrequency" value="10000"/>
@@ -262,13 +262,11 @@
         <property name="publicAddress" 
value="${cluster.contextserver.publicAddress}"/>
         <property name="internalAddress" 
value="${cluster.contextserver.internalAddress}"/>
         <property name="persistenceService" ref="persistenceService"/>
-        <property name="karafCellarClusterManager" 
ref="karafCellarClusterManager"/>
-        <property name="karafCellarEventProducer" 
ref="karafCellarEventProducer"/>
-        <property name="karafCellarGroupManager" 
ref="karafCellarGroupManager"/>
-        <property name="karafCellarGroupName" value="${cluster.group}"/>
-        <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/>
+        <property name="nodeId" value="${cluster.nodeId}"/>
         <property name="nodeStatisticsUpdateFrequency" 
value="${cluster.nodeStatisticsUpdateFrequency}"/>
+        <!-- Wait for UNOMI-878 to be available before activating this:
         <property name="schedulerService" ref="schedulerServiceImpl"/>
+        -->
     </bean>
     <service id="clusterService" ref="clusterServiceImpl" 
interface="org.apache.unomi.api.services.ClusterService"/>
 
diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg 
b/services/src/main/resources/org.apache.unomi.cluster.cfg
index bfbb189d9..eecb7e1de 100644
--- a/services/src/main/resources/org.apache.unomi.cluster.cfg
+++ b/services/src/main/resources/org.apache.unomi.cluster.cfg
@@ -14,14 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-group=${org.apache.unomi.cluster.group:-default}
 # To simplify testing we set the public address to use HTTP, but for 
production environments it is highly recommended
 # to switch to using HTTPS with a proper SSL certificate installed.
 
contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181}
 
contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443}
 #
-# The nodeStatisticsUpdateFrequency controls the frequency of the update of 
system statistics such as CPU load,
+# The nodeId is a required setting that uniquely identifies this node in the 
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1}
+#
+# The nodeStatisticsUpdateFrequency controls the frequency of the update of 
system statistics such as CPU load,
 # system load average and uptime. This value is set in milliseconds and is set 
to 10 seconds by default. Each node
 # will retrieve the local values and broadcast them through a cluster event to 
all the other nodes to update
 # the global cluster statistics.
-nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
\ No newline at end of file
+nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}

Reply via email to