This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new c22ff4722 Revert "UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService…" (#724)
c22ff4722 is described below
commit c22ff47221112ae6aa02a7a0c492e93840062d4d
Author: Jérôme Blanchard <[email protected]>
AuthorDate: Fri Aug 22 15:47:24 2025 +0200
Revert "UNOMI-877: Replace Karaf Cellar and Hazelcast with
PersistenceService…" (#724)
This reverts commit 3f995e47ea8fbd33ca226e04f9f5cd0dfb719fff.
---
.../java/org/apache/unomi/api/ClusterNode.java | 65 +-
.../apache/unomi/api/services/ClusterService.java | 7 +
.../router/core/context/RouterCamelContext.java | 17 +
.../main/resources/etc/custom.system.properties | 5 +-
services/pom.xml | 7 -
.../services/impl/cluster/ClusterServiceImpl.java | 690 ++++++---------------
.../resources/OSGI-INF/blueprint/blueprint.xml | 10 +-
.../main/resources/org.apache.unomi.cluster.cfg | 10 +-
8 files changed, 236 insertions(+), 575 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
index e096490b9..6c40ca21b 100644
--- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
+++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
@@ -22,12 +22,10 @@ import java.io.Serializable;
/**
* Information about a cluster node.
*/
-public class ClusterNode extends Item {
+public class ClusterNode implements Serializable {
private static final long serialVersionUID = 1281422346318230514L;
- public static final String ITEM_TYPE = "clusterNode";
-
private double cpuLoad;
private double[] loadAverage;
private String publicHostAddress;
@@ -35,18 +33,11 @@ public class ClusterNode extends Item {
private long uptime;
private boolean master;
private boolean data;
- private long startTime;
- private long lastHeartbeat;
-
- // Server information
- private ServerInfo serverInfo;
/**
* Instantiates a new Cluster node.
*/
public ClusterNode() {
- super();
- setItemType(ITEM_TYPE);
}
/**
@@ -174,58 +165,4 @@ public class ClusterNode extends Item {
public void setData(boolean data) {
this.data = data;
}
-
- /**
- * Retrieves the node start time in milliseconds.
- *
- * @return the start time
- */
- public long getStartTime() {
- return startTime;
- }
-
- /**
- * Sets the node start time in milliseconds.
- *
- * @param startTime the start time
- */
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- /**
- * Retrieves the last heartbeat time in milliseconds.
- *
- * @return the last heartbeat time
- */
- public long getLastHeartbeat() {
- return lastHeartbeat;
- }
-
- /**
- * Sets the last heartbeat time in milliseconds.
- *
- * @param lastHeartbeat the last heartbeat time
- */
- public void setLastHeartbeat(long lastHeartbeat) {
- this.lastHeartbeat = lastHeartbeat;
- }
-
- /**
- * Gets the server information.
- *
- * @return the server information
- */
- public ServerInfo getServerInfo() {
- return serverInfo;
- }
-
- /**
- * Sets the server information.
- *
- * @param serverInfo the server information
- */
- public void setServerInfo(ServerInfo serverInfo) {
- this.serverInfo = serverInfo;
- }
}
diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index ad8777503..299ac9098 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -51,4 +51,11 @@ public interface ClusterService {
*/
void purge(final String scope);
+    /**
+     * This function will send an event to the nodes of the cluster
+     * The function takes a Serializable to avoid dependency on any clustering framework
+     *
+     * @param event this object will be cast to a org.apache.karaf.cellar.core.event.Event object
+     */
+    void sendEvent(Serializable event);
}
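For review context (illustrative only, not part of the revert diff): the restored sendEvent(Serializable) contract expects callers to pass an object that extends org.apache.karaf.cellar.core.event.Event, since ClusterServiceImpl casts it to that type before producing it on the Cellar event bus. A minimal caller-side sketch, mirroring the RouterCamelContext change further below (UpdateCamelRouteEvent is assumed to extend the Cellar Event class):

    // UpdateCamelRouteEvent extends org.apache.karaf.cellar.core.event.Event
    UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_REMOVE);
    event.setRouteId(routeId);
    // ClusterServiceImpl fills in the source group and node, then hands the
    // event to the Karaf Cellar EventProducer for distribution to other nodes.
    clusterService.sendEvent(event);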
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index c7e953132..67219f9c5 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -31,6 +31,7 @@ import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.api.services.ProfileExportService;
+import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -239,6 +240,12 @@ public class RouterCamelContext implements IRouterCamelContext {
camelContext.removeRouteDefinition(routeDefinition);
}
}
+
+ if (fireEvent) {
+            UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_REMOVE);
+ event.setRouteId(routeId);
+ clusterService.sendEvent(event);
+ }
}
public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception {
@@ -259,6 +266,11 @@ public class RouterCamelContext implements IRouterCamelContext {
builder.setJacksonDataFormat(jacksonDataFormat);
builder.setContext(camelContext);
camelContext.addRoutes(builder);
+
+ if (fireEvent) {
+            UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_IMPORT);
+ clusterService.sendEvent(event);
+ }
}
}
@@ -279,6 +291,11 @@ public class RouterCamelContext implements IRouterCamelContext {
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
profileExportCollectRouteBuilder.setContext(camelContext);
camelContext.addRoutes(profileExportCollectRouteBuilder);
+
+ if (fireEvent) {
+            UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_EXPORT);
+ clusterService.sendEvent(event);
+ }
}
}
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 46a262816..5e97437c3 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -81,14 +81,11 @@ org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs}
#######################################################################################################################
## Cluster Settings
##
#######################################################################################################################
+org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default}
# To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
-# The nodeId is a required setting that uniquely identifies this node in the cluster.
-# It must be set to a unique value for each node in the cluster.
-# Example: nodeId=node1
-org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to all the other nodes to update
diff --git a/services/pom.xml b/services/pom.xml
index 9a0862db8..c4cf7e4da 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -56,12 +56,6 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.unomi</groupId>
- <artifactId>unomi-lifecycle-watcher</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>javax.servlet</groupId>
@@ -181,7 +175,6 @@
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
<Import-Package>
sun.misc;resolution:=optional,
- com.sun.management;resolution:=optional,
*
</Import-Package>
</instructions>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index b4bc2c421..ec4cfe523 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -18,30 +18,28 @@
package org.apache.unomi.services.impl.cluster;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
+import org.apache.karaf.cellar.config.Constants;
+import org.apache.karaf.cellar.core.*;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.apache.karaf.cellar.core.event.EventType;
import org.apache.unomi.api.ClusterNode;
-import org.apache.unomi.api.PartialList;
-import org.apache.unomi.api.ServerInfo;
-import org.apache.unomi.api.conditions.Condition;
-import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.ClusterService;
-import org.apache.unomi.lifecycle.BundleWatcher;
+import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.*;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -49,185 +47,46 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterServiceImpl implements ClusterService {
+    public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unomi.nodes";
+    public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints";
+    public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS = "internalEndpoints";
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
-
- /**
-     * We use ServiceTracker instead of Blueprint dependency injection due to a known bug in Apache Aries Blueprint
-     * where service dependencies are not properly shut down in reverse order of their initialization.
-     *
-     * The bug manifests in two ways:
-     * 1. Services are not shut down in reverse order of their initialization, causing potential deadlocks
-     * 2. The PersistenceService is often shut down before other services that depend on it, leading to timeout waits
- *
- * By using ServiceTracker, we have explicit control over:
- * - Service lifecycle management
- * - Shutdown order
- * - Service availability checks
- * - Graceful degradation when services become unavailable
- */
-    private ServiceTracker<PersistenceService, PersistenceService> persistenceServiceTracker;
-
- // Keep direct reference for backward compatibility and unit tests
- private PersistenceService persistenceService;
-
+ PersistenceService persistenceService;
+ private ClusterManager karafCellarClusterManager;
+ private EventProducer karafCellarEventProducer;
+ private GroupManager karafCellarGroupManager;
+ private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
+ private ConfigurationAdmin osgiConfigurationAdmin;
private String publicAddress;
private String internalAddress;
-    //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to reactivate that code
-    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
- private String nodeId;
- private long nodeStartTime;
- private long nodeStatisticsUpdateFrequency = 10000;
-    private Map<String, Map<String, Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>();
- private BundleContext bundleContext;
- private volatile boolean shutdownNow = false;
-
- private BundleWatcher bundleWatcher;
+    private Map<String, Map<String,Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>();
+ private Group group = null;
+ private SchedulerService schedulerService;
- /**
- * Max time to wait for persistence service (in milliseconds)
- */
- private static final long MAX_WAIT_TIME = 60000; // 60 seconds
+ private long nodeStatisticsUpdateFrequency = 10000;
- /**
- * Sets the bundle context, which is needed to create service trackers
- * @param bundleContext the OSGi bundle context
- */
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
}
- /**
- * Sets the bundle watcher used to retrieve server information
- *
- * @param bundleWatcher the bundle watcher
- */
- public void setBundleWatcher(BundleWatcher bundleWatcher) {
- this.bundleWatcher = bundleWatcher;
- LOGGER.info("BundleWatcher service set");
+    public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) {
+ this.karafCellarClusterManager = karafCellarClusterManager;
}
- /**
- * Waits for the persistence service to become available.
-     * This method will retry getting the persistence service with exponential backoff
-     * until it's available or until the maximum wait time is reached.
-     *
-     * @throws IllegalStateException if the persistence service is not available after the maximum wait time
- */
- private void waitForPersistenceService() {
- if (shutdownNow) {
- return;
- }
-
-        // If persistence service is directly set (e.g., in unit tests), no need to wait
-        if (persistenceService != null) {
-            LOGGER.debug("Persistence service is already available, no need to wait");
- return;
- }
-
- // If no bundle context, we can't get the service via tracker
- if (bundleContext == null) {
-            LOGGER.error("No BundleContext available, cannot wait for persistence service");
-            throw new IllegalStateException("No BundleContext available to get persistence service");
- }
-
- // Initialize service tracker if needed
- if (persistenceServiceTracker == null) {
- initializeServiceTrackers();
- }
-
- // Try to get the service with retries
- long startTime = System.currentTimeMillis();
- long waitTime = 50; // Start with 50ms wait time
-
- while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
- PersistenceService service = getPersistenceService();
- if (service != null) {
- LOGGER.info("Persistence service is now available");
- return;
- }
-
- try {
-                LOGGER.debug("Waiting for persistence service... ({}ms elapsed)", System.currentTimeMillis() - startTime);
- Thread.sleep(waitTime);
- // Exponential backoff with a maximum of 5 seconds
- waitTime = Math.min(waitTime * 2, 5000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-                LOGGER.error("Interrupted while waiting for persistence service", e);
- break;
- }
- }
-
-        throw new IllegalStateException("PersistenceService not available after waiting " + MAX_WAIT_TIME + "ms");
+    public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) {
+ this.karafCellarEventProducer = karafCellarEventProducer;
}
- /**
-     * Safely gets the persistence service, either from the direct reference (for tests)
- * or from the service tracker (for OSGi runtime)
- * @return the persistence service or null if not available
- */
- private PersistenceService getPersistenceService() {
- if (shutdownNow) return null;
-
- // For unit tests or if already directly set
- if (persistenceService != null) {
- return persistenceService;
- }
-
- // Otherwise try to get from service tracker
-        return persistenceServiceTracker != null ? persistenceServiceTracker.getService() : null;
+    public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) {
+ this.karafCellarGroupManager = karafCellarGroupManager;
}
- /**
- * Initialize service tracker for PersistenceService
- */
- private void initializeServiceTrackers() {
- if (bundleContext == null) {
-            LOGGER.warn("BundleContext is null, cannot initialize service trackers");
- return;
- }
-
- // Only create service tracker if direct reference isn't set
- if (persistenceService == null) {
- LOGGER.info("Initializing PersistenceService tracker");
- persistenceServiceTracker = new ServiceTracker<>(
- bundleContext,
- PersistenceService.class,
-                new ServiceTrackerCustomizer<PersistenceService, PersistenceService>() {
- @Override
-                    public PersistenceService addingService(ServiceReference<PersistenceService> reference) {
-                        PersistenceService service = bundleContext.getService(reference);
-                        if (service != null) {
-                            persistenceService = service;
-                            LOGGER.info("PersistenceService acquired through tracker");
- }
- return service;
- }
-
- @Override
-                    public void modifiedService(ServiceReference<PersistenceService> reference, PersistenceService service) {
- // No action needed
- }
-
- @Override
-                    public void removedService(ServiceReference<PersistenceService> reference, PersistenceService service) {
- LOGGER.info("PersistenceService removed");
- persistenceService = null;
- bundleContext.ungetService(reference);
- }
- }
- );
- persistenceServiceTracker.open();
- }
+ public void setKarafCellarGroupName(String karafCellarGroupName) {
+ this.karafCellarGroupName = karafCellarGroupName;
}
- /**
-     * For unit tests and backward compatibility - directly sets the persistence service
- * @param persistenceService the persistence service to set
- */
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
- LOGGER.info("PersistenceService set directly");
+    public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) {
+ this.osgiConfigurationAdmin = osgiConfigurationAdmin;
}
public void setPublicAddress(String publicAddress) {
@@ -242,35 +101,8 @@ public class ClusterServiceImpl implements ClusterService {
this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
}
- /* Wait for PR UNOMI-878 to reactivate that code
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
-
- // If we're already initialized, initialize scheduled tasks now
-        // This handles the case when ClusterService was initialized before SchedulerService was set
-        if (schedulerService != null && System.currentTimeMillis() > nodeStartTime && nodeStartTime > 0) {
-            LOGGER.info("SchedulerService was set after ClusterService initialization, initializing scheduled tasks now");
- initializeScheduledTasks();
- }
- }
- */
-
- /* Wait for PR UNOMI-878 to reactivate that code
- /**
-     * Unbind method for the scheduler service, called by the OSGi framework when the service is unregistered
- * @param schedulerService The scheduler service being unregistered
- */
- /*
- public void unsetSchedulerService(SchedulerService schedulerService) {
- if (this.schedulerService == schedulerService) {
- LOGGER.info("SchedulerService was unset");
- this.schedulerService = null;
- }
- }
- */
-
- public void setNodeId(String nodeId) {
- this.nodeId = nodeId;
}
public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
@@ -278,331 +110,211 @@ public class ClusterServiceImpl implements ClusterService {
}
public void init() {
-        // Initialize service trackers if not set directly (common in unit tests)
- initializeServiceTrackers();
-
- // Validate that nodeId is provided
- if (StringUtils.isBlank(nodeId)) {
-            String errorMessage = "CRITICAL: nodeId is not set. This is a required setting for cluster operation.";
- LOGGER.error(errorMessage);
- throw new IllegalStateException(errorMessage);
- }
+        if (karafCellarEventProducer != null && karafCellarClusterManager != null) {
+
+ boolean setupConfigOk = true;
+            group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
+            if (setupConfigOk && group == null) {
+                LOGGER.error("Cluster group {} doesn't exist, creating it...", karafCellarGroupName);
+                group = karafCellarGroupManager.createGroup(karafCellarGroupName);
+ if (group != null) {
+ setupConfigOk = true;
+ } else {
+ setupConfigOk = false;
+ }
+ }
- // Wait for persistence service to be available
- try {
- waitForPersistenceService();
- } catch (IllegalStateException e) {
-            LOGGER.error("Failed to initialize cluster service: {}", e.getMessage());
- return;
- }
+ // check if the producer is ON
+            if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
+ LOGGER.error("Cluster event producer is OFF");
+ setupConfigOk = false;
+ }
- nodeStartTime = System.currentTimeMillis();
+ // check if the config pid is allowed
+            if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
+                LOGGER.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group {}",
+                        karafCellarGroupName);
+ setupConfigOk = false;
+ }
+
+ if (setupConfigOk) {
+                Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
+                org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
+                Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+ if (karafCellarClusterNodeConfiguration == null) {
+ karafCellarClusterNodeConfiguration = new Properties();
+ }
+                Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
+                publicEndpoints.put(thisKarafNode.getId(), publicAddress);
+                setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
- // Register this node in the persistence service
- registerNodeInPersistence();
+                Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
+                internalEndpoints.put(thisKarafNode.getId(), internalAddress);
+                setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
+
+                configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
+                ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+ sendEvent(clusterConfigurationEvent);
+ }
+
+ TimerTask statisticsTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ updateSystemStats();
+ } catch (Throwable t) {
+ LOGGER.error("Error updating system statistics", t);
+ }
+ }
+ };
+            schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask, 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
- /* Wait for PR UNOMI-878 to reactivate that code
- /*
- // Only initialize scheduled tasks if scheduler service is available
- if (schedulerService != null) {
- initializeScheduledTasks();
- } else {
-            LOGGER.warn("SchedulerService not available during ClusterService initialization. Scheduled tasks will not be registered. They will be registered when SchedulerService becomes available.");
}
- */
- initializeScheduledTasks();
+ LOGGER.info("Cluster service initialized.");
+ }
- LOGGER.info("Cluster service initialized with node ID: {}", nodeId);
+ public void destroy() {
+ LOGGER.info("Cluster service shutdown.");
}
- /**
- * Initializes scheduled tasks for cluster management.
-     * This method can be called later if schedulerService wasn't available during init.
-     */
-    public void initializeScheduledTasks() {
-        /* Wait for PR UNOMI-878 to reactivate that code
-        if (schedulerService == null) {
-            LOGGER.error("Cannot initialize scheduled tasks: SchedulerService is not set");
- return;
+ @Override
+ public List<ClusterNode> getClusterNodes() {
+        Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>();
+
+        Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes();
+        org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
+        Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
+        Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
+        Map<String, String> publicNodeEndpoints = new TreeMap<>();
+        Map<String, String> internalNodeEndpoints = new TreeMap<>();
+        if (karafCellarClusterNodeConfiguration != null) {
+            publicNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
+            internalNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
}
- */
-
- // Schedule regular updates of the node statistics
- TimerTask statisticsTask = new TimerTask() {
- @Override
- public void run() {
- try {
- updateSystemStats();
- } catch (Throwable t) {
- LOGGER.error("Error updating system statistics", t);
- }
+        for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) {
+            ClusterNode clusterNode = new ClusterNode();
+            String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId());
+ if (publicEndpoint != null) {
+ clusterNode.setPublicHostAddress(publicEndpoint);
}
- };
- /* Wait for PR UNOMI-878 to reactivate that code
-        schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
-        */
-        scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
-
- // Schedule cleanup of stale nodes
- TimerTask cleanupTask = new TimerTask() {
- @Override
- public void run() {
- try {
- cleanupStaleNodes();
- } catch (Throwable t) {
- LOGGER.error("Error cleaning up stale nodes", t);
+            String internalEndpoint = internalNodeEndpoints.get(karafCellarNode.getId());
+ if (internalEndpoint != null) {
+ clusterNode.setInternalHostAddress(internalEndpoint);
+ }
+            Map<String,Serializable> nodeStatistics = nodeSystemStatistics.get(karafCellarNode.getId());
+ if (nodeStatistics != null) {
+ Long uptime = (Long) nodeStatistics.get("uptime");
+ if (uptime != null) {
+ clusterNode.setUptime(uptime);
+ }
+                Double systemCpuLoad = (Double) nodeStatistics.get("systemCpuLoad");
+                if (systemCpuLoad != null) {
+                    clusterNode.setCpuLoad(systemCpuLoad);
+                }
+                List<Double> loadAverage = (List<Double>) nodeStatistics.get("systemLoadAverage");
+                if (loadAverage != null) {
+                    Double[] loadAverageArray = loadAverage.toArray(new Double[loadAverage.size()]);
+                    ArrayUtils.toPrimitive(loadAverageArray);
+                    clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
}
}
- };
- /* Wait for PR UNOMI-878 to reactivate that code
-        schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000, TimeUnit.MILLISECONDS, cleanupTask, false);
-        */
-        scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000, TimeUnit.MILLISECONDS);
+ clusterNodes.put(karafCellarNode.getId(), clusterNode);
+ }
- LOGGER.info("Cluster service scheduled tasks initialized");
+ return new ArrayList<ClusterNode>(clusterNodes.values());
}
- public void destroy() {
- shutdownNow = true;
-
- // Remove this node from the persistence service
- PersistenceService service = getPersistenceService();
- if (service != null) {
- try {
- service.remove(nodeId, ClusterNode.class);
- LOGGER.info("Node {} removed from cluster", nodeId);
- } catch (Exception e) {
- LOGGER.error("Error removing node from cluster", e);
- }
- }
-
- // Close service trackers
- if (persistenceServiceTracker != null) {
- try {
- persistenceServiceTracker.close();
- LOGGER.debug("Persistence service tracker closed");
- } catch (Exception e) {
-                LOGGER.debug("Error closing persistence service tracker: {}", e.getMessage());
- }
- persistenceServiceTracker = null;
- }
+ @Override
+ public void purge(Date date) {
+ persistenceService.purge(date);
+ }
- // Clear references
- persistenceService = null;
- bundleWatcher = null;
+ @Override
+ public void purge(String scope) {
+ persistenceService.purge(scope);
+ }
- LOGGER.info("Cluster service shutdown.");
+ @Override
+ public void sendEvent(Serializable eventObject) {
+ Event event = (Event) eventObject;
+ event.setSourceGroup(group);
+ event.setSourceNode(karafCellarClusterManager.getNode());
+ karafCellarEventProducer.produce(event);
}
/**
- * Register this node in the persistence service
+ * Check if a configuration is allowed.
+ *
+ * @param group the cluster group.
+ * @param category the configuration category constant.
+ * @param pid the configuration PID.
+ * @param type the cluster event type.
+ * @return true if the cluster event type is allowed, false else.
*/
- private void registerNodeInPersistence() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.error("Cannot register node: PersistenceService not available");
-            return;
-        }
+    public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
+ CellarSupport support = new CellarSupport();
+ support.setClusterManager(this.karafCellarClusterManager);
+ support.setGroupManager(this.karafCellarGroupManager);
+ support.setConfigurationAdmin(this.osgiConfigurationAdmin);
+ return support.isAllowed(group, category, pid, type);
+ }
- ClusterNode clusterNode = new ClusterNode();
- clusterNode.setItemId(nodeId);
- clusterNode.setPublicHostAddress(publicAddress);
- clusterNode.setInternalHostAddress(internalAddress);
- clusterNode.setStartTime(nodeStartTime);
- clusterNode.setLastHeartbeat(System.currentTimeMillis());
-
- // Set server information if BundleWatcher is available
-        if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) {
-            ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
-            clusterNode.setServerInfo(serverInfo);
-            LOGGER.info("Added server info to node: version={}, build={}",
-                    serverInfo.getServerVersion(), serverInfo.getServerBuildNumber());
-        } else {
-            LOGGER.warn("BundleWatcher not available at registration time, server info will not be available");
- }
+    private Map<String, String> getMapProperty(Properties properties, String propertyName, String defaultValue) {
+        String propertyValue = properties.getProperty(propertyName, defaultValue);
+ return getMapProperty(propertyValue);
+ }
- updateSystemStatsForNode(clusterNode);
+ private Map<String, String> getMapProperty(String propertyValue) {
+ String[] propertyValueArray = propertyValue.split(",");
+ Map<String, String> propertyMapValue = new LinkedHashMap<>();
+ for (String propertyValueElement : propertyValueArray) {
+            String[] propertyValueElementPrats = propertyValueElement.split("=");
+            propertyMapValue.put(propertyValueElementPrats[0], propertyValueElementPrats[1]);
+ }
+ return propertyMapValue;
+ }
- boolean success = service.save(clusterNode);
- if (success) {
- LOGGER.info("Node {} registered in cluster", nodeId);
- } else {
- LOGGER.error("Failed to register node {} in cluster", nodeId);
+    private Map<String, String> setMapProperty(Properties properties, String propertyName, Map<String, String> propertyMapValue) {
+ StringBuilder propertyValueBuilder = new StringBuilder();
+ int entryCount = 0;
+        for (Map.Entry<String, String> propertyMapValueEntry : propertyMapValue.entrySet()) {
+ propertyValueBuilder.append(propertyMapValueEntry.getKey());
+ propertyValueBuilder.append("=");
+ propertyValueBuilder.append(propertyMapValueEntry.getValue());
+ if (entryCount < propertyMapValue.size() - 1) {
+ propertyValueBuilder.append(",");
+ }
}
+        String oldPropertyValue = (String) properties.setProperty(propertyName, propertyValueBuilder.toString());
+ if (oldPropertyValue == null) {
+ return null;
+ }
+ return getMapProperty(oldPropertyValue);
}
- /**
- * Updates system stats for the given cluster node
- */
- private void updateSystemStatsForNode(ClusterNode node) {
+ private void updateSystemStats() {
final RuntimeMXBean remoteRuntime = ManagementFactory.getRuntimeMXBean();
long uptime = remoteRuntime.getUptime();
-
-        double systemCpuLoad = 0.0;
+        ObjectName operatingSystemMXBeanName = ManagementFactory.getOperatingSystemMXBean().getObjectName();
+        Double systemCpuLoad = null;
try {
-            systemCpuLoad = ((com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad();
-            // Check for NaN value which Elasticsearch and OpenSearch don't support for float fields
- if (Double.isNaN(systemCpuLoad)) {
- LOGGER.debug("System CPU load is NaN, setting to 0.0");
- systemCpuLoad = 0.0;
- }
- } catch (Exception e) {
- LOGGER.debug("Error retrieving system CPU load", e);
+            systemCpuLoad = (Double) ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName, "SystemCpuLoad");
+        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+ LOGGER.error("Error retrieving system CPU load", e);
}
-
final OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
double systemLoadAverage = operatingSystemMXBean.getSystemLoadAverage();
-        // Check for NaN value which Elasticsearch/OpenSearch doesn't support for float fields
- if (Double.isNaN(systemLoadAverage)) {
- LOGGER.debug("System load average is NaN, setting to 0.0");
- systemLoadAverage = 0.0;
- }
-
- node.setCpuLoad(systemCpuLoad);
- node.setUptime(uptime);
+        ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
+ Map<String,Serializable> systemStatistics = new TreeMap<>();
ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
systemLoadAverageArray.add(systemLoadAverage);
-        node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new Double[0])));
-
- // Store system statistics in memory as well
- Map<String, Serializable> systemStatistics = new TreeMap<>();
systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
systemStatistics.put("systemCpuLoad", systemCpuLoad);
systemStatistics.put("uptime", uptime);
- nodeSystemStatistics.put(nodeId, systemStatistics);
- }
-
- /**
-     * Updates the system statistics for this node and stores them in the persistence service
- */
- private void updateSystemStats() {
- if (shutdownNow) {
- return;
- }
-
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.warn("Cannot update system stats: PersistenceService not available");
- return;
- }
-
- // Load node from persistence
- ClusterNode node = service.load(nodeId, ClusterNode.class);
- if (node == null) {
-            LOGGER.warn("Node {} not found in persistence, re-registering", nodeId);
- registerNodeInPersistence();
- return;
- }
-
- try {
- // Update its stats
- updateSystemStatsForNode(node);
-
- // Update server info if needed
-            if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) {
-                ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
-                // Check if server info needs updating
-                if (node.getServerInfo() == null ||
-                        !currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion())) {
-
-                    node.setServerInfo(currentInfo);
-                    LOGGER.info("Updated server info for node {}: version={}, build={}",
-                            nodeId, currentInfo.getServerVersion(), currentInfo.getServerBuildNumber());
- }
- }
-
- node.setLastHeartbeat(System.currentTimeMillis());
-
- // Save back to persistence
- boolean success = service.save(node);
- if (!success) {
- LOGGER.error("Failed to update node {} statistics", nodeId);
- }
- } catch (Exception e) {
-            LOGGER.error("Error updating system statistics for node {}: {}", nodeId, e.getMessage(), e);
- }
- }
-
- /**
- * Removes stale nodes from the cluster
- */
- private void cleanupStaleNodes() {
- if (shutdownNow) {
- return;
- }
-
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not available");
- return;
- }
-
-        long cutoffTime = System.currentTimeMillis() - (nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x the update frequency
-
- Condition staleNodesCondition = new Condition();
- ConditionType propertyConditionType = new ConditionType();
- propertyConditionType.setItemId("propertyCondition");
- propertyConditionType.setItemType(ConditionType.ITEM_TYPE);
-        propertyConditionType.setConditionEvaluator("propertyConditionEvaluator");
-        propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder");
- staleNodesCondition.setConditionType(propertyConditionType);
- staleNodesCondition.setConditionTypeId("propertyCondition");
- staleNodesCondition.setParameter("propertyName", "lastHeartbeat");
- staleNodesCondition.setParameter("comparisonOperator", "lessThan");
- staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
-
-        PartialList<ClusterNode> staleNodes = service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
-
- for (ClusterNode staleNode : staleNodes.getList()) {
- LOGGER.info("Removing stale node: {}", staleNode.getItemId());
- service.remove(staleNode.getItemId(), ClusterNode.class);
- nodeSystemStatistics.remove(staleNode.getItemId());
- }
- }
-
- @Override
- public List<ClusterNode> getClusterNodes() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.warn("Cannot get cluster nodes: PersistenceService not available");
- return Collections.emptyList();
- }
-
- // Query all nodes from the persistence service
- return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
+ clusterSystemStatisticsEvent.setStatistics(systemStatistics);
+        nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(), systemStatistics);
+ sendEvent(clusterSystemStatisticsEvent);
}
- @Override
- public void purge(Date date) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.warn("Cannot purge by date: PersistenceService not available");
- return;
- }
-
- service.purge(date);
- }
-
- @Override
- public void purge(String scope) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
-            LOGGER.warn("Cannot purge by scope: PersistenceService not available");
- return;
- }
-
- service.purge(scope);
- }
-
- /**
- * Check if a persistence service is available.
- * This can be used to quickly check before performing operations.
- *
-     * @return true if a persistence service is available (either directly set or via tracker)
- */
- public boolean isPersistenceServiceAvailable() {
- return getPersistenceService() != null;
- }
}
-
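For review context (illustrative only, not part of the revert diff): the restored getMapProperty/setMapProperty helpers store each endpoint map as a single comma-separated property value in the Cellar cluster configuration, splitting on "," and then "=" when reading it back. With hypothetical node IDs and addresses, the stored value would look like:

    publicEndpoints=node-a=http://host-a:8181,node-b=http://host-b:8181

Each entry maps a Karaf Cellar node ID to the public (or internal) address configured on that node.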
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 994c5598d..f49f3c849 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -53,7 +53,7 @@
<cm:property-placeholder persistent-id="org.apache.unomi.cluster"
update-strategy="reload"
placeholder-prefix="${cluster.">
<cm:default-properties>
- <cm:property name="nodeId" value="unomi-node-1"/>
+ <cm:property name="group" value="default"/>
<cm:property name="contextserver.publicAddress"
value="https://localhost:9443"/>
<cm:property name="contextserver.internalAddress"
value="http://127.0.0.1:8181"/>
<cm:property name="nodeStatisticsUpdateFrequency" value="10000"/>
@@ -262,11 +262,13 @@
<property name="publicAddress"
value="${cluster.contextserver.publicAddress}"/>
<property name="internalAddress"
value="${cluster.contextserver.internalAddress}"/>
<property name="persistenceService" ref="persistenceService"/>
- <property name="nodeId" value="${cluster.nodeId}"/>
+    <property name="karafCellarClusterManager" ref="karafCellarClusterManager"/>
+    <property name="karafCellarEventProducer" ref="karafCellarEventProducer"/>
+    <property name="karafCellarGroupManager" ref="karafCellarGroupManager"/>
+    <property name="karafCellarGroupName" value="${cluster.group}"/>
+    <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
- <!-- Wait for UNOMI-878 to be available to activate that
<property name="schedulerService" ref="schedulerServiceImpl"/>
- -->
</bean>
<service id="clusterService" ref="clusterServiceImpl"
interface="org.apache.unomi.api.services.ClusterService"/>
diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg b/services/src/main/resources/org.apache.unomi.cluster.cfg
index eecb7e1de..bfbb189d9 100644
--- a/services/src/main/resources/org.apache.unomi.cluster.cfg
+++ b/services/src/main/resources/org.apache.unomi.cluster.cfg
@@ -14,18 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+group=${org.apache.unomi.cluster.group:-default}
# To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181}
contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443}
#
-# The nodeId is a required setting that uniquely identifies this node in the cluster.
-# It must be set to a unique value for each node in the cluster.
-# Example: nodeId=node1
-nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1}
-#
-## The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
+# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to all the other nodes to update
# the global cluster statistics.
-nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
+nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
\ No newline at end of file