This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 3f995e47e UNOMI-877: Replace Karaf Cellar and Hazelcast with
PersistenceService for cluster synchronization (#723)
3f995e47e is described below
commit 3f995e47ea8fbd33ca226e04f9f5cd0dfb719fff
Author: Jérôme Blanchard <[email protected]>
AuthorDate: Fri Aug 22 13:33:20 2025 +0200
UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for
cluster synchronization (#723)
* UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for
cluster synchronization (code isolated from branch unomi-3-dev made by Serge
Hubert)
* UNOMI-877: Not sending event to cluster anymore.
---------
Co-authored-by: Serge Huber <[email protected]>
---
.../java/org/apache/unomi/api/ClusterNode.java | 65 +-
.../apache/unomi/api/services/ClusterService.java | 7 -
.../router/core/context/RouterCamelContext.java | 17 -
.../main/resources/etc/custom.system.properties | 5 +-
services/pom.xml | 7 +
.../services/impl/cluster/ClusterServiceImpl.java | 690 +++++++++++++++------
.../resources/OSGI-INF/blueprint/blueprint.xml | 10 +-
.../main/resources/org.apache.unomi.cluster.cfg | 10 +-
8 files changed, 575 insertions(+), 236 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
index 6c40ca21b..e096490b9 100644
--- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
+++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
@@ -22,10 +22,12 @@ import java.io.Serializable;
/**
* Information about a cluster node.
*/
-public class ClusterNode implements Serializable {
+public class ClusterNode extends Item {
private static final long serialVersionUID = 1281422346318230514L;
+ public static final String ITEM_TYPE = "clusterNode";
+
private double cpuLoad;
private double[] loadAverage;
private String publicHostAddress;
@@ -33,11 +35,18 @@ public class ClusterNode implements Serializable {
private long uptime;
private boolean master;
private boolean data;
+ private long startTime;
+ private long lastHeartbeat;
+
+ // Server information
+ private ServerInfo serverInfo;
/**
* Instantiates a new Cluster node.
*/
public ClusterNode() {
+ super();
+ setItemType(ITEM_TYPE);
}
/**
@@ -165,4 +174,58 @@ public class ClusterNode implements Serializable {
public void setData(boolean data) {
this.data = data;
}
+
+ /**
+ * Retrieves the node start time in milliseconds.
+ *
+ * @return the start time
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Sets the node start time in milliseconds.
+ *
+ * @param startTime the start time
+ */
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Retrieves the last heartbeat time in milliseconds.
+ *
+ * @return the last heartbeat time
+ */
+ public long getLastHeartbeat() {
+ return lastHeartbeat;
+ }
+
+ /**
+ * Sets the last heartbeat time in milliseconds.
+ *
+ * @param lastHeartbeat the last heartbeat time
+ */
+ public void setLastHeartbeat(long lastHeartbeat) {
+ this.lastHeartbeat = lastHeartbeat;
+ }
+
+ /**
+ * Gets the server information.
+ *
+ * @return the server information
+ */
+ public ServerInfo getServerInfo() {
+ return serverInfo;
+ }
+
+ /**
+ * Sets the server information.
+ *
+ * @param serverInfo the server information
+ */
+ public void setServerInfo(ServerInfo serverInfo) {
+ this.serverInfo = serverInfo;
+ }
}
diff --git
a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index 299ac9098..ad8777503 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -51,11 +51,4 @@ public interface ClusterService {
*/
void purge(final String scope);
- /**
- * This function will send an event to the nodes of the cluster
- * The function takes a Serializable to avoid dependency on any clustering
framework
- *
- * @param event this object will be cast to a
org.apache.karaf.cellar.core.event.Event object
- */
- void sendEvent(Serializable event);
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 67219f9c5..c7e953132 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -31,7 +31,6 @@ import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.api.services.ProfileExportService;
-import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -240,12 +239,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
camelContext.removeRouteDefinition(routeDefinition);
}
}
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_REMOVE);
- event.setRouteId(routeId);
- clusterService.sendEvent(event);
- }
}
public void updateProfileImportReaderRoute(String configId, boolean
fireEvent) throws Exception {
@@ -266,11 +259,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
builder.setJacksonDataFormat(jacksonDataFormat);
builder.setContext(camelContext);
camelContext.addRoutes(builder);
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_IMPORT);
- clusterService.sendEvent(event);
- }
}
}
@@ -291,11 +279,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
profileExportCollectRouteBuilder.setContext(camelContext);
camelContext.addRoutes(profileExportCollectRouteBuilder);
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_EXPORT);
- clusterService.sendEvent(event);
- }
}
}
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index 5e97437c3..46a262816 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -81,11 +81,14 @@
org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs}
#######################################################################################################################
## Cluster Settings
##
#######################################################################################################################
-org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default}
# To simplify testing we set the public address to use HTTP, but for
production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
+# The nodeId is a required setting that uniquely identifies this node in the
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set
to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to
all the other nodes to update
diff --git a/services/pom.xml b/services/pom.xml
index c4cf7e4da..9a0862db8 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -56,6 +56,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.unomi</groupId>
+ <artifactId>unomi-lifecycle-watcher</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
@@ -175,6 +181,7 @@
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
<Import-Package>
sun.misc;resolution:=optional,
+ com.sun.management;resolution:=optional,
*
</Import-Package>
</instructions>
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index ec4cfe523..b4bc2c421 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -18,28 +18,30 @@
package org.apache.unomi.services.impl.cluster;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.*;
-import org.apache.karaf.cellar.core.control.SwitchStatus;
-import org.apache.karaf.cellar.core.event.Event;
-import org.apache.karaf.cellar.core.event.EventProducer;
-import org.apache.karaf.cellar.core.event.EventType;
+import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.ClusterNode;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.ServerInfo;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.ClusterService;
-import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.lifecycle.BundleWatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.*;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -47,46 +49,185 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterServiceImpl implements ClusterService {
- public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION =
"org.apache.unomi.nodes";
- public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS =
"publicEndpoints";
- public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS
= "internalEndpoints";
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
- PersistenceService persistenceService;
- private ClusterManager karafCellarClusterManager;
- private EventProducer karafCellarEventProducer;
- private GroupManager karafCellarGroupManager;
- private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
- private ConfigurationAdmin osgiConfigurationAdmin;
+
+ /**
+ * We use ServiceTracker instead of Blueprint dependency injection due to
a known bug in Apache Aries Blueprint
+ * where service dependencies are not properly shut down in reverse order
of their initialization.
+ *
+ * The bug manifests in two ways:
+ * 1. Services are not shut down in reverse order of their initialization,
causing potential deadlocks
+ * 2. The PersistenceService is often shut down before other services that
depend on it, leading to timeout waits
+ *
+ * By using ServiceTracker, we have explicit control over:
+ * - Service lifecycle management
+ * - Shutdown order
+ * - Service availability checks
+ * - Graceful degradation when services become unavailable
+ */
+ private ServiceTracker<PersistenceService, PersistenceService>
persistenceServiceTracker;
+
+ // Keep direct reference for backward compatibility and unit tests
+ private PersistenceService persistenceService;
+
private String publicAddress;
private String internalAddress;
- private Map<String, Map<String,Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
- private Group group = null;
- private SchedulerService schedulerService;
-
+ //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to
reactivate that code
+ private ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(3);
+ private String nodeId;
+ private long nodeStartTime;
private long nodeStatisticsUpdateFrequency = 10000;
+ private Map<String, Map<String, Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
+ private BundleContext bundleContext;
+ private volatile boolean shutdownNow = false;
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
+ private BundleWatcher bundleWatcher;
+
+ /**
+ * Max time to wait for persistence service (in milliseconds)
+ */
+ private static final long MAX_WAIT_TIME = 60000; // 60 seconds
+
+ /**
+ * Sets the bundle context, which is needed to create service trackers
+ * @param bundleContext the OSGi bundle context
+ */
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
}
- public void setKarafCellarClusterManager(ClusterManager
karafCellarClusterManager) {
- this.karafCellarClusterManager = karafCellarClusterManager;
+ /**
+ * Sets the bundle watcher used to retrieve server information
+ *
+ * @param bundleWatcher the bundle watcher
+ */
+ public void setBundleWatcher(BundleWatcher bundleWatcher) {
+ this.bundleWatcher = bundleWatcher;
+ LOGGER.info("BundleWatcher service set");
}
- public void setKarafCellarEventProducer(EventProducer
karafCellarEventProducer) {
- this.karafCellarEventProducer = karafCellarEventProducer;
+ /**
+ * Waits for the persistence service to become available.
+ * This method will retry getting the persistence service with exponential
backoff
+ * until it's available or until the maximum wait time is reached.
+ *
+ * @throws IllegalStateException if the persistence service is not
available after the maximum wait time
+ */
+ private void waitForPersistenceService() {
+ if (shutdownNow) {
+ return;
+ }
+
+ // If persistence service is directly set (e.g., in unit tests), no
need to wait
+ if (persistenceService != null) {
+ LOGGER.debug("Persistence service is already available, no need to
wait");
+ return;
+ }
+
+ // If no bundle context, we can't get the service via tracker
+ if (bundleContext == null) {
+ LOGGER.error("No BundleContext available, cannot wait for
persistence service");
+ throw new IllegalStateException("No BundleContext available to get
persistence service");
+ }
+
+ // Initialize service tracker if needed
+ if (persistenceServiceTracker == null) {
+ initializeServiceTrackers();
+ }
+
+ // Try to get the service with retries
+ long startTime = System.currentTimeMillis();
+ long waitTime = 50; // Start with 50ms wait time
+
+ while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
+ PersistenceService service = getPersistenceService();
+ if (service != null) {
+ LOGGER.info("Persistence service is now available");
+ return;
+ }
+
+ try {
+ LOGGER.debug("Waiting for persistence service... ({}ms
elapsed)", System.currentTimeMillis() - startTime);
+ Thread.sleep(waitTime);
+ // Exponential backoff with a maximum of 5 seconds
+ waitTime = Math.min(waitTime * 2, 5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted while waiting for persistence
service", e);
+ break;
+ }
+ }
+
+ throw new IllegalStateException("PersistenceService not available
after waiting " + MAX_WAIT_TIME + "ms");
}
- public void setKarafCellarGroupManager(GroupManager
karafCellarGroupManager) {
- this.karafCellarGroupManager = karafCellarGroupManager;
+ /**
+ * Safely gets the persistence service, either from the direct reference
(for tests)
+ * or from the service tracker (for OSGi runtime)
+ * @return the persistence service or null if not available
+ */
+ private PersistenceService getPersistenceService() {
+ if (shutdownNow) return null;
+
+ // For unit tests or if already directly set
+ if (persistenceService != null) {
+ return persistenceService;
+ }
+
+ // Otherwise try to get from service tracker
+ return persistenceServiceTracker != null ?
persistenceServiceTracker.getService() : null;
}
- public void setKarafCellarGroupName(String karafCellarGroupName) {
- this.karafCellarGroupName = karafCellarGroupName;
+ /**
+ * Initialize service tracker for PersistenceService
+ */
+ private void initializeServiceTrackers() {
+ if (bundleContext == null) {
+ LOGGER.warn("BundleContext is null, cannot initialize service
trackers");
+ return;
+ }
+
+ // Only create service tracker if direct reference isn't set
+ if (persistenceService == null) {
+ LOGGER.info("Initializing PersistenceService tracker");
+ persistenceServiceTracker = new ServiceTracker<>(
+ bundleContext,
+ PersistenceService.class,
+ new ServiceTrackerCustomizer<PersistenceService,
PersistenceService>() {
+ @Override
+ public PersistenceService
addingService(ServiceReference<PersistenceService> reference) {
+ PersistenceService service =
bundleContext.getService(reference);
+ if (service != null) {
+ persistenceService = service;
+ LOGGER.info("PersistenceService acquired
through tracker");
+ }
+ return service;
+ }
+
+ @Override
+ public void
modifiedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
+ // No action needed
+ }
+
+ @Override
+ public void
removedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
+ LOGGER.info("PersistenceService removed");
+ persistenceService = null;
+ bundleContext.ungetService(reference);
+ }
+ }
+ );
+ persistenceServiceTracker.open();
+ }
}
- public void setOsgiConfigurationAdmin(ConfigurationAdmin
osgiConfigurationAdmin) {
- this.osgiConfigurationAdmin = osgiConfigurationAdmin;
+ /**
+ * For unit tests and backward compatibility - directly sets the
persistence service
+ * @param persistenceService the persistence service to set
+ */
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ LOGGER.info("PersistenceService set directly");
}
public void setPublicAddress(String publicAddress) {
@@ -101,8 +242,35 @@ public class ClusterServiceImpl implements ClusterService {
this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
}
+ /* Wait for PR UNOMI-878 to reactivate that code
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
+
+ // If we're already initialized, initialize scheduled tasks now
+ // This handles the case when ClusterService was initialized before
SchedulerService was set
+ if (schedulerService != null && System.currentTimeMillis() >
nodeStartTime && nodeStartTime > 0) {
+ LOGGER.info("SchedulerService was set after ClusterService
initialization, initializing scheduled tasks now");
+ initializeScheduledTasks();
+ }
+ }
+ */
+
+ /* Wait for PR UNOMI-878 to reactivate that code
+ /**
+ * Unbind method for the scheduler service, called by the OSGi framework
when the service is unregistered
+ * @param schedulerService The scheduler service being unregistered
+ */
+ /*
+ public void unsetSchedulerService(SchedulerService schedulerService) {
+ if (this.schedulerService == schedulerService) {
+ LOGGER.info("SchedulerService was unset");
+ this.schedulerService = null;
+ }
+ }
+ */
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
}
public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
@@ -110,211 +278,331 @@ public class ClusterServiceImpl implements
ClusterService {
}
public void init() {
- if (karafCellarEventProducer != null && karafCellarClusterManager !=
null) {
-
- boolean setupConfigOk = true;
- group =
karafCellarGroupManager.findGroupByName(karafCellarGroupName);
- if (setupConfigOk && group == null) {
- LOGGER.error("Cluster group {} doesn't exist, creating it...",
karafCellarGroupName);
- group =
karafCellarGroupManager.createGroup(karafCellarGroupName);
- if (group != null) {
- setupConfigOk = true;
- } else {
- setupConfigOk = false;
- }
- }
-
- // check if the producer is ON
- if (setupConfigOk &&
karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
- LOGGER.error("Cluster event producer is OFF");
- setupConfigOk = false;
- }
-
- // check if the config pid is allowed
- if (setupConfigOk && !isClusterConfigPIDAllowed(group,
Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION,
EventType.OUTBOUND)) {
- LOGGER.error("Configuration PID " +
KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster
group {}",
- karafCellarGroupName);
- setupConfigOk = false;
- }
-
- if (setupConfigOk) {
- Map<String, Properties> configurations =
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP +
Configurations.SEPARATOR + karafCellarGroupName);
- org.apache.karaf.cellar.core.Node thisKarafNode =
karafCellarClusterManager.getNode();
- Properties karafCellarClusterNodeConfiguration =
configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- if (karafCellarClusterNodeConfiguration == null) {
- karafCellarClusterNodeConfiguration = new Properties();
- }
- Map<String, String> publicEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" +
publicAddress);
- publicEndpoints.put(thisKarafNode.getId(), publicAddress);
- setMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
+ // Initialize service trackers if not set directly (common in unit
tests)
+ initializeServiceTrackers();
+
+ // Validate that nodeId is provided
+ if (StringUtils.isBlank(nodeId)) {
+ String errorMessage = "CRITICAL: nodeId is not set. This is a
required setting for cluster operation.";
+ LOGGER.error(errorMessage);
+ throw new IllegalStateException(errorMessage);
+ }
- Map<String, String> internalEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" +
internalAddress);
- internalEndpoints.put(thisKarafNode.getId(), internalAddress);
- setMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
+ // Wait for persistence service to be available
+ try {
+ waitForPersistenceService();
+ } catch (IllegalStateException e) {
+ LOGGER.error("Failed to initialize cluster service: {}",
e.getMessage());
+ return;
+ }
- configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION,
karafCellarClusterNodeConfiguration);
- ClusterConfigurationEvent clusterConfigurationEvent = new
ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- sendEvent(clusterConfigurationEvent);
- }
+ nodeStartTime = System.currentTimeMillis();
- TimerTask statisticsTask = new TimerTask() {
- @Override
- public void run() {
- try {
- updateSystemStats();
- } catch (Throwable t) {
- LOGGER.error("Error updating system statistics", t);
- }
- }
- };
-
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask,
0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+ // Register this node in the persistence service
+ registerNodeInPersistence();
+ /* Wait for PR UNOMI-878 to reactivate that code
+ /*
+ // Only initialize scheduled tasks if scheduler service is available
+ if (schedulerService != null) {
+ initializeScheduledTasks();
+ } else {
+ LOGGER.warn("SchedulerService not available during ClusterService
initialization. Scheduled tasks will not be registered. They will be registered
when SchedulerService becomes available.");
}
- LOGGER.info("Cluster service initialized.");
- }
+ */
+ initializeScheduledTasks();
- public void destroy() {
- LOGGER.info("Cluster service shutdown.");
+ LOGGER.info("Cluster service initialized with node ID: {}", nodeId);
}
- @Override
- public List<ClusterNode> getClusterNodes() {
- Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String,
ClusterNode>();
-
- Set<org.apache.karaf.cellar.core.Node> karafCellarNodes =
karafCellarClusterManager.listNodes();
- org.apache.karaf.cellar.core.Node thisKarafNode =
karafCellarClusterManager.getNode();
- Map<String, Properties> clusterConfigurations =
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP +
Configurations.SEPARATOR + karafCellarGroupName);
- Properties karafCellarClusterNodeConfiguration =
clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- Map<String, String> publicNodeEndpoints = new TreeMap<>();
- Map<String, String> internalNodeEndpoints = new TreeMap<>();
- if (karafCellarClusterNodeConfiguration != null) {
- publicNodeEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" +
publicAddress);
- internalNodeEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" +
internalAddress);
+ /**
+ * Initializes scheduled tasks for cluster management.
+ * This method can be called later if schedulerService wasn't available
during init.
+ */
+ public void initializeScheduledTasks() {
+ /* Wait for PR UNOMI-878 to reactivate that code
+ if (schedulerService == null) {
+ LOGGER.error("Cannot initialize scheduled tasks: SchedulerService
is not set");
+ return;
}
- for (org.apache.karaf.cellar.core.Node karafCellarNode :
karafCellarNodes) {
- ClusterNode clusterNode = new ClusterNode();
- String publicEndpoint =
publicNodeEndpoints.get(karafCellarNode.getId());
- if (publicEndpoint != null) {
- clusterNode.setPublicHostAddress(publicEndpoint);
- }
- String internalEndpoint =
internalNodeEndpoints.get(karafCellarNode.getId());
- if (internalEndpoint != null) {
- clusterNode.setInternalHostAddress(internalEndpoint);
- }
- Map<String,Serializable> nodeStatistics =
nodeSystemStatistics.get(karafCellarNode.getId());
- if (nodeStatistics != null) {
- Long uptime = (Long) nodeStatistics.get("uptime");
- if (uptime != null) {
- clusterNode.setUptime(uptime);
- }
- Double systemCpuLoad = (Double)
nodeStatistics.get("systemCpuLoad");
- if (systemCpuLoad != null) {
- clusterNode.setCpuLoad(systemCpuLoad);
+ */
+
+ // Schedule regular updates of the node statistics
+ TimerTask statisticsTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ updateSystemStats();
+ } catch (Throwable t) {
+ LOGGER.error("Error updating system statistics", t);
}
- List<Double> loadAverage = (List<Double>)
nodeStatistics.get("systemLoadAverage");
- if (loadAverage != null) {
- Double[] loadAverageArray = loadAverage.toArray(new
Double[loadAverage.size()]);
- ArrayUtils.toPrimitive(loadAverageArray);
-
clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
+ }
+ };
+ /* Wait for PR UNOMI-878 to reactivate that code
+ schedulerService.createRecurringTask("clusterNodeStatisticsUpdate",
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
+ */
+ scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100,
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+
+ // Schedule cleanup of stale nodes
+ TimerTask cleanupTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ cleanupStaleNodes();
+ } catch (Throwable t) {
+ LOGGER.error("Error cleaning up stale nodes", t);
}
}
- clusterNodes.put(karafCellarNode.getId(), clusterNode);
- }
+ };
+ /* Wait for PR UNOMI-878 to reactivate that code
+ schedulerService.createRecurringTask("clusterStaleNodesCleanup",
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
+ */
+ scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000,
TimeUnit.MILLISECONDS);
- return new ArrayList<ClusterNode>(clusterNodes.values());
+ LOGGER.info("Cluster service scheduled tasks initialized");
}
- @Override
- public void purge(Date date) {
- persistenceService.purge(date);
- }
+ public void destroy() {
+ shutdownNow = true;
+
+ // Remove this node from the persistence service
+ PersistenceService service = getPersistenceService();
+ if (service != null) {
+ try {
+ service.remove(nodeId, ClusterNode.class);
+ LOGGER.info("Node {} removed from cluster", nodeId);
+ } catch (Exception e) {
+ LOGGER.error("Error removing node from cluster", e);
+ }
+ }
- @Override
- public void purge(String scope) {
- persistenceService.purge(scope);
- }
+ // Close service trackers
+ if (persistenceServiceTracker != null) {
+ try {
+ persistenceServiceTracker.close();
+ LOGGER.debug("Persistence service tracker closed");
+ } catch (Exception e) {
+ LOGGER.debug("Error closing persistence service tracker: {}",
e.getMessage());
+ }
+ persistenceServiceTracker = null;
+ }
- @Override
- public void sendEvent(Serializable eventObject) {
- Event event = (Event) eventObject;
- event.setSourceGroup(group);
- event.setSourceNode(karafCellarClusterManager.getNode());
- karafCellarEventProducer.produce(event);
+ // Clear references
+ persistenceService = null;
+ bundleWatcher = null;
+
+ LOGGER.info("Cluster service shutdown.");
}
/**
- * Check if a configuration is allowed.
- *
- * @param group the cluster group.
- * @param category the configuration category constant.
- * @param pid the configuration PID.
- * @param type the cluster event type.
- * @return true if the cluster event type is allowed, false else.
+ * Register this node in the persistence service
*/
- public boolean isClusterConfigPIDAllowed(Group group, String category,
String pid, EventType type) {
- CellarSupport support = new CellarSupport();
- support.setClusterManager(this.karafCellarClusterManager);
- support.setGroupManager(this.karafCellarGroupManager);
- support.setConfigurationAdmin(this.osgiConfigurationAdmin);
- return support.isAllowed(group, category, pid, type);
- }
-
- private Map<String, String> getMapProperty(Properties properties, String
propertyName, String defaultValue) {
- String propertyValue = properties.getProperty(propertyName,
defaultValue);
- return getMapProperty(propertyValue);
- }
-
- private Map<String, String> getMapProperty(String propertyValue) {
- String[] propertyValueArray = propertyValue.split(",");
- Map<String, String> propertyMapValue = new LinkedHashMap<>();
- for (String propertyValueElement : propertyValueArray) {
- String[] propertyValueElementPrats =
propertyValueElement.split("=");
- propertyMapValue.put(propertyValueElementPrats[0],
propertyValueElementPrats[1]);
+ private void registerNodeInPersistence() {
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.error("Cannot register node: PersistenceService not
available");
+ return;
}
- return propertyMapValue;
- }
- private Map<String, String> setMapProperty(Properties properties, String
propertyName, Map<String, String> propertyMapValue) {
- StringBuilder propertyValueBuilder = new StringBuilder();
- int entryCount = 0;
- for (Map.Entry<String, String> propertyMapValueEntry :
propertyMapValue.entrySet()) {
- propertyValueBuilder.append(propertyMapValueEntry.getKey());
- propertyValueBuilder.append("=");
- propertyValueBuilder.append(propertyMapValueEntry.getValue());
- if (entryCount < propertyMapValue.size() - 1) {
- propertyValueBuilder.append(",");
- }
+ ClusterNode clusterNode = new ClusterNode();
+ clusterNode.setItemId(nodeId);
+ clusterNode.setPublicHostAddress(publicAddress);
+ clusterNode.setInternalHostAddress(internalAddress);
+ clusterNode.setStartTime(nodeStartTime);
+ clusterNode.setLastHeartbeat(System.currentTimeMillis());
+
+ // Set server information if BundleWatcher is available
+ if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
+ ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
+ clusterNode.setServerInfo(serverInfo);
+ LOGGER.info("Added server info to node: version={}, build={}",
+ serverInfo.getServerVersion(),
serverInfo.getServerBuildNumber());
+ } else {
+ LOGGER.warn("BundleWatcher not available at registration time,
server info will not be available");
}
- String oldPropertyValue = (String)
properties.setProperty(propertyName, propertyValueBuilder.toString());
- if (oldPropertyValue == null) {
- return null;
+
+ updateSystemStatsForNode(clusterNode);
+
+ boolean success = service.save(clusterNode);
+ if (success) {
+ LOGGER.info("Node {} registered in cluster", nodeId);
+ } else {
+ LOGGER.error("Failed to register node {} in cluster", nodeId);
}
- return getMapProperty(oldPropertyValue);
}
- private void updateSystemStats() {
+ /**
+ * Updates system stats for the given cluster node
+ */
+ private void updateSystemStatsForNode(ClusterNode node) {
final RuntimeMXBean remoteRuntime =
ManagementFactory.getRuntimeMXBean();
long uptime = remoteRuntime.getUptime();
- ObjectName operatingSystemMXBeanName =
ManagementFactory.getOperatingSystemMXBean().getObjectName();
- Double systemCpuLoad = null;
+
+ double systemCpuLoad = 0.0;
try {
- systemCpuLoad = (Double)
ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName,
"SystemCpuLoad");
- } catch (MBeanException | AttributeNotFoundException |
InstanceNotFoundException | ReflectionException e) {
- LOGGER.error("Error retrieving system CPU load", e);
+ systemCpuLoad = ((com.sun.management.OperatingSystemMXBean)
ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad();
+ // Check for NaN value which Elasticsearch and OpenSearch don't
support for float fields
+ if (Double.isNaN(systemCpuLoad)) {
+ LOGGER.debug("System CPU load is NaN, setting to 0.0");
+ systemCpuLoad = 0.0;
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Error retrieving system CPU load", e);
}
+
final OperatingSystemMXBean operatingSystemMXBean =
ManagementFactory.getOperatingSystemMXBean();
double systemLoadAverage =
operatingSystemMXBean.getSystemLoadAverage();
+ // Check for NaN value which Elasticsearch/OpenSearch don't support
for float fields
+ if (Double.isNaN(systemLoadAverage)) {
+ LOGGER.debug("System load average is NaN, setting to 0.0");
+ systemLoadAverage = 0.0;
+ }
+
+ node.setCpuLoad(systemCpuLoad);
+ node.setUptime(uptime);
- ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new
ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
- Map<String,Serializable> systemStatistics = new TreeMap<>();
ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
systemLoadAverageArray.add(systemLoadAverage);
+
node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new
Double[0])));
+
+ // Store system statistics in memory as well
+ Map<String, Serializable> systemStatistics = new TreeMap<>();
systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
systemStatistics.put("systemCpuLoad", systemCpuLoad);
systemStatistics.put("uptime", uptime);
- clusterSystemStatisticsEvent.setStatistics(systemStatistics);
- nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(),
systemStatistics);
- sendEvent(clusterSystemStatisticsEvent);
+ nodeSystemStatistics.put(nodeId, systemStatistics);
+ }
+
+ /**
+ * Updates the system statistics for this node and stores them in the
persistence service
+ */
+ private void updateSystemStats() {
+ if (shutdownNow) {
+ return;
+ }
+
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.warn("Cannot update system stats: PersistenceService not
available");
+ return;
+ }
+
+ // Load node from persistence
+ ClusterNode node = service.load(nodeId, ClusterNode.class);
+ if (node == null) {
+ LOGGER.warn("Node {} not found in persistence, re-registering",
nodeId);
+ registerNodeInPersistence();
+ return;
+ }
+
+ try {
+ // Update its stats
+ updateSystemStatsForNode(node);
+
+ // Update server info if needed
+ if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
+ ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
+ // Check if server info needs updating
+ if (node.getServerInfo() == null ||
+
!currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion()))
{
+
+ node.setServerInfo(currentInfo);
+ LOGGER.info("Updated server info for node {}: version={},
build={}",
+ nodeId, currentInfo.getServerVersion(),
currentInfo.getServerBuildNumber());
+ }
+ }
+
+ node.setLastHeartbeat(System.currentTimeMillis());
+
+ // Save back to persistence
+ boolean success = service.save(node);
+ if (!success) {
+ LOGGER.error("Failed to update node {} statistics", nodeId);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error updating system statistics for node {}: {}",
nodeId, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Removes stale nodes from the cluster
+ */
+ private void cleanupStaleNodes() {
+ if (shutdownNow) {
+ return;
+ }
+
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not
available");
+ return;
+ }
+
+ long cutoffTime = System.currentTimeMillis() -
(nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x
the update frequency
+
+ Condition staleNodesCondition = new Condition();
+ ConditionType propertyConditionType = new ConditionType();
+ propertyConditionType.setItemId("propertyCondition");
+ propertyConditionType.setItemType(ConditionType.ITEM_TYPE);
+
propertyConditionType.setConditionEvaluator("propertyConditionEvaluator");
+
propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder");
+ staleNodesCondition.setConditionType(propertyConditionType);
+ staleNodesCondition.setConditionTypeId("propertyCondition");
+ staleNodesCondition.setParameter("propertyName", "lastHeartbeat");
+ staleNodesCondition.setParameter("comparisonOperator", "lessThan");
+ staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
+
+ PartialList<ClusterNode> staleNodes =
service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+
+ for (ClusterNode staleNode : staleNodes.getList()) {
+ LOGGER.info("Removing stale node: {}", staleNode.getItemId());
+ service.remove(staleNode.getItemId(), ClusterNode.class);
+ nodeSystemStatistics.remove(staleNode.getItemId());
+ }
+ }
+
+ @Override
+ public List<ClusterNode> getClusterNodes() {
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.warn("Cannot get cluster nodes: PersistenceService not
available");
+ return Collections.emptyList();
+ }
+
+ // Query all nodes from the persistence service
+ return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
}
+ @Override
+ public void purge(Date date) {
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.warn("Cannot purge by date: PersistenceService not
available");
+ return;
+ }
+
+ service.purge(date);
+ }
+
+ @Override
+ public void purge(String scope) {
+ PersistenceService service = getPersistenceService();
+ if (service == null) {
+ LOGGER.warn("Cannot purge by scope: PersistenceService not
available");
+ return;
+ }
+
+ service.purge(scope);
+ }
+
+ /**
+ * Check if a persistence service is available.
+ * This can be used to quickly check before performing operations.
+ *
+ * @return true if a persistence service is available (either directly set
or via tracker)
+ */
+ public boolean isPersistenceServiceAvailable() {
+ return getPersistenceService() != null;
+ }
}
+
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f49f3c849..994c5598d 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -53,7 +53,7 @@
<cm:property-placeholder persistent-id="org.apache.unomi.cluster"
update-strategy="reload"
placeholder-prefix="${cluster.">
<cm:default-properties>
- <cm:property name="group" value="default"/>
+ <cm:property name="nodeId" value="unomi-node-1"/>
<cm:property name="contextserver.publicAddress"
value="https://localhost:9443"/>
<cm:property name="contextserver.internalAddress"
value="http://127.0.0.1:8181"/>
<cm:property name="nodeStatisticsUpdateFrequency" value="10000"/>
@@ -262,13 +262,11 @@
<property name="publicAddress"
value="${cluster.contextserver.publicAddress}"/>
<property name="internalAddress"
value="${cluster.contextserver.internalAddress}"/>
<property name="persistenceService" ref="persistenceService"/>
- <property name="karafCellarClusterManager"
ref="karafCellarClusterManager"/>
- <property name="karafCellarEventProducer"
ref="karafCellarEventProducer"/>
- <property name="karafCellarGroupManager"
ref="karafCellarGroupManager"/>
- <property name="karafCellarGroupName" value="${cluster.group}"/>
- <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/>
+ <property name="nodeId" value="${cluster.nodeId}"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
+ <!-- Waiting for UNOMI-878 to be available before activating this:
<property name="schedulerService" ref="schedulerServiceImpl"/>
+ -->
</bean>
<service id="clusterService" ref="clusterServiceImpl"
interface="org.apache.unomi.api.services.ClusterService"/>
diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg
b/services/src/main/resources/org.apache.unomi.cluster.cfg
index bfbb189d9..eecb7e1de 100644
--- a/services/src/main/resources/org.apache.unomi.cluster.cfg
+++ b/services/src/main/resources/org.apache.unomi.cluster.cfg
@@ -14,14 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-group=${org.apache.unomi.cluster.group:-default}
# To simplify testing we set the public address to use HTTP, but for
production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181}
contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443}
#
-# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
+# The nodeId is a required setting that uniquely identifies this node in the
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1}
+#
+# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set
to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to
all the other nodes to update
# the global cluster statistics.
-nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
\ No newline at end of file
+nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}