This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch unomi-3-dev
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/unomi-3-dev by this push:
new c85d6a02f UNOMI-877 Refactor ClusterServiceImpl to remove
ServiceTracker and streamline persistence service handling
c85d6a02f is described below
commit c85d6a02fdf430774928a2fa08e48cc617c764b4
Author: Serge Huber <[email protected]>
AuthorDate: Mon Sep 1 10:00:20 2025 +0200
UNOMI-877 Refactor ClusterServiceImpl to remove ServiceTracker and
streamline persistence service handling
- Eliminated the use of ServiceTracker for PersistenceService, simplifying
service management.
- Updated methods to directly reference the persistence service, enhancing
clarity and reducing complexity.
- Improved logging messages to reflect changes in how the persistence
service is set and utilized.
- Added a new cancelScheduledTasks() method that cancels the scheduled tasks
during shutdown and when the SchedulerService is unbound, for better resource
management.
- Adjusted the blueprint configuration to remove the unnecessary
bundleContext property.
- Updated unit tests to reflect changes in the ClusterServiceImpl
initialization and configuration.
---
.../services/impl/cluster/ClusterServiceImpl.java | 310 ++++++---------------
.../resources/OSGI-INF/blueprint/blueprint.xml | 2 +-
.../java/org/apache/unomi/services/TestHelper.java | 3 -
3 files changed, 84 insertions(+), 231 deletions(-)
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index 420217eaf..de65f871a 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -31,9 +31,6 @@ import org.apache.unomi.persistence.spi.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
@@ -50,25 +47,8 @@ public class ClusterServiceImpl implements ClusterService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
- /**
- * We use ServiceTracker instead of Blueprint dependency injection due to
a known bug in Apache Aries Blueprint
- * where service dependencies are not properly shut down in reverse order
of their initialization.
- *
- * The bug manifests in two ways:
- * 1. Services are not shut down in reverse order of their initialization,
causing potential deadlocks
- * 2. The PersistenceService is often shut down before other services that
depend on it, leading to timeout waits
- *
- * By using ServiceTracker, we have explicit control over:
- * - Service lifecycle management
- * - Shutdown order
- * - Service availability checks
- * - Graceful degradation when services become unavailable
- */
- private ServiceTracker<PersistenceService, PersistenceService>
persistenceServiceTracker;
-
- // Keep direct reference for backward compatibility and unit tests
private PersistenceService persistenceService;
-
+
private String publicAddress;
private String internalAddress;
private SchedulerService schedulerService;
@@ -76,27 +56,14 @@ public class ClusterServiceImpl implements ClusterService {
private long nodeStartTime;
private long nodeStatisticsUpdateFrequency = 10000;
private Map<String, Map<String, Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
- private BundleContext bundleContext;
private volatile boolean shutdownNow = false;
-
+ private volatile List<ClusterNode> cachedClusterNodes =
Collections.emptyList();
+
private BundleWatcher bundleWatcher;
-
- /**
- * Max time to wait for persistence service (in milliseconds)
- */
- private static final long MAX_WAIT_TIME = 60000; // 60 seconds
- /**
- * Sets the bundle context, which is needed to create service trackers
- * @param bundleContext the OSGi bundle context
- */
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
/**
* Sets the bundle watcher used to retrieve server information
- *
+ *
* @param bundleWatcher the bundle watcher
*/
public void setBundleWatcher(BundleWatcher bundleWatcher) {
@@ -105,127 +72,12 @@ public class ClusterServiceImpl implements ClusterService
{
}
/**
- * Waits for the persistence service to become available.
- * This method will retry getting the persistence service with exponential
backoff
- * until it's available or until the maximum wait time is reached.
- *
- * @throws IllegalStateException if the persistence service is not
available after the maximum wait time
- */
- private void waitForPersistenceService() {
- if (shutdownNow) {
- return;
- }
-
- // If persistence service is directly set (e.g., in unit tests), no
need to wait
- if (persistenceService != null) {
- LOGGER.debug("Persistence service is already available, no need to
wait");
- return;
- }
-
- // If no bundle context, we can't get the service via tracker
- if (bundleContext == null) {
- LOGGER.error("No BundleContext available, cannot wait for
persistence service");
- throw new IllegalStateException("No BundleContext available to get
persistence service");
- }
-
- // Initialize service tracker if needed
- if (persistenceServiceTracker == null) {
- initializeServiceTrackers();
- }
-
- // Try to get the service with retries
- long startTime = System.currentTimeMillis();
- long waitTime = 50; // Start with 50ms wait time
-
- while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
- PersistenceService service = getPersistenceService();
- if (service != null) {
- LOGGER.info("Persistence service is now available");
- return;
- }
-
- try {
- LOGGER.debug("Waiting for persistence service... ({}ms
elapsed)", System.currentTimeMillis() - startTime);
- Thread.sleep(waitTime);
- // Exponential backoff with a maximum of 5 seconds
- waitTime = Math.min(waitTime * 2, 5000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("Interrupted while waiting for persistence
service", e);
- break;
- }
- }
-
- throw new IllegalStateException("PersistenceService not available
after waiting " + MAX_WAIT_TIME + "ms");
- }
-
- /**
- * Safely gets the persistence service, either from the direct reference
(for tests)
- * or from the service tracker (for OSGi runtime)
- * @return the persistence service or null if not available
- */
- private PersistenceService getPersistenceService() {
- if (shutdownNow) return null;
-
- // For unit tests or if already directly set
- if (persistenceService != null) {
- return persistenceService;
- }
-
- // Otherwise try to get from service tracker
- return persistenceServiceTracker != null ?
persistenceServiceTracker.getService() : null;
- }
-
- /**
- * Initialize service tracker for PersistenceService
- */
- private void initializeServiceTrackers() {
- if (bundleContext == null) {
- LOGGER.warn("BundleContext is null, cannot initialize service
trackers");
- return;
- }
-
- // Only create service tracker if direct reference isn't set
- if (persistenceService == null) {
- LOGGER.info("Initializing PersistenceService tracker");
- persistenceServiceTracker = new ServiceTracker<>(
- bundleContext,
- PersistenceService.class,
- new ServiceTrackerCustomizer<PersistenceService,
PersistenceService>() {
- @Override
- public PersistenceService
addingService(ServiceReference<PersistenceService> reference) {
- PersistenceService service =
bundleContext.getService(reference);
- if (service != null) {
- persistenceService = service;
- LOGGER.info("PersistenceService acquired through
tracker");
- }
- return service;
- }
-
- @Override
- public void
modifiedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
- // No action needed
- }
-
- @Override
- public void
removedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
- LOGGER.info("PersistenceService removed");
- persistenceService = null;
- bundleContext.ungetService(reference);
- }
- }
- );
- persistenceServiceTracker.open();
- }
- }
-
- /**
- * For unit tests and backward compatibility - directly sets the
persistence service
+ * Sets the persistence service via Blueprint dependency injection
* @param persistenceService the persistence service to set
*/
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
- LOGGER.info("PersistenceService set directly");
+ LOGGER.info("PersistenceService set via Blueprint dependency
injection");
}
public void setPublicAddress(String publicAddress) {
@@ -242,7 +94,7 @@ public class ClusterServiceImpl implements ClusterService {
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
-
+
// If we're already initialized, initialize scheduled tasks now
// This handles the case when ClusterService was initialized before
SchedulerService was set
if (schedulerService != null && System.currentTimeMillis() >
nodeStartTime && nodeStartTime > 0) {
@@ -257,7 +109,8 @@ public class ClusterServiceImpl implements ClusterService {
*/
public void unsetSchedulerService(SchedulerService schedulerService) {
if (this.schedulerService == schedulerService) {
- LOGGER.info("SchedulerService was unset");
+ LOGGER.info("SchedulerService was unbound, cancelling scheduled
tasks");
+ cancelScheduledTasks();
this.schedulerService = null;
}
}
@@ -271,9 +124,6 @@ public class ClusterServiceImpl implements ClusterService {
}
public void init() {
- // Initialize service trackers if not set directly (common in unit
tests)
- initializeServiceTrackers();
-
// Validate that nodeId is provided
if (StringUtils.isBlank(nodeId)) {
String errorMessage = "CRITICAL: nodeId is not set. This is a
required setting for cluster operation.";
@@ -281,12 +131,11 @@ public class ClusterServiceImpl implements ClusterService
{
throw new IllegalStateException(errorMessage);
}
- // Wait for persistence service to be available
- try {
- waitForPersistenceService();
- } catch (IllegalStateException e) {
- LOGGER.error("Failed to initialize cluster service: {}",
e.getMessage());
- return;
+ // Validate that persistence service is available
+ if (persistenceService == null) {
+ String errorMessage = "CRITICAL: PersistenceService is not set.
This is a required dependency for cluster operation.";
+ LOGGER.error(errorMessage);
+ throw new IllegalStateException(errorMessage);
}
nodeStartTime = System.currentTimeMillis();
@@ -313,7 +162,7 @@ public class ClusterServiceImpl implements ClusterService {
LOGGER.error("Cannot initialize scheduled tasks: SchedulerService
is not set");
return;
}
-
+
// Schedule regular updates of the node statistics
TimerTask statisticsTask = new TimerTask() {
@Override
@@ -339,64 +188,73 @@ public class ClusterServiceImpl implements ClusterService
{
}
};
schedulerService.createRecurringTask("clusterStaleNodesCleanup",
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
-
+
LOGGER.info("Cluster service scheduled tasks initialized");
}
public void destroy() {
+ LOGGER.info("Cluster service shutting down...");
shutdownNow = true;
-
- // Remove this node from the persistence service
- PersistenceService service = getPersistenceService();
- if (service != null) {
+
+ cancelScheduledTasks();
+
+ // Remove node from persistence service
+ if (persistenceService != null) {
try {
- service.remove(nodeId, ClusterNode.class);
+ persistenceService.remove(nodeId, ClusterNode.class);
LOGGER.info("Node {} removed from cluster", nodeId);
} catch (Exception e) {
LOGGER.error("Error removing node from cluster", e);
}
}
-
- // Close service trackers
- if (persistenceServiceTracker != null) {
- try {
- persistenceServiceTracker.close();
- LOGGER.debug("Persistence service tracker closed");
- } catch (Exception e) {
- LOGGER.debug("Error closing persistence service tracker: {}",
e.getMessage());
- }
- persistenceServiceTracker = null;
- }
-
+
// Clear references
persistenceService = null;
bundleWatcher = null;
-
+ schedulerService = null;
+
LOGGER.info("Cluster service shutdown.");
}
+ private void cancelScheduledTasks() {
+ // Cancel scheduled tasks
+ if (schedulerService != null) {
+ try {
+ schedulerService.cancelTask("clusterNodeStatisticsUpdate");
+ LOGGER.debug("Cancelled clusterNodeStatisticsUpdate task");
+ } catch (Exception e) {
+ LOGGER.debug("Error cancelling clusterNodeStatisticsUpdate
task: {}", e.getMessage());
+ }
+ try {
+ schedulerService.cancelTask("clusterStaleNodesCleanup");
+ LOGGER.debug("Cancelled clusterStaleNodesCleanup task");
+ } catch (Exception e) {
+ LOGGER.debug("Error cancelling clusterStaleNodesCleanup task:
{}", e.getMessage());
+ }
+ }
+ }
+
/**
* Register this node in the persistence service
*/
private void registerNodeInPersistence() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.error("Cannot register node: PersistenceService not
available");
return;
}
-
+
ClusterNode clusterNode = new ClusterNode();
clusterNode.setItemId(nodeId);
clusterNode.setPublicHostAddress(publicAddress);
clusterNode.setInternalHostAddress(internalAddress);
clusterNode.setStartTime(nodeStartTime);
clusterNode.setLastHeartbeat(System.currentTimeMillis());
-
+
// Set server information if BundleWatcher is available
if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
clusterNode.setServerInfo(serverInfo);
- LOGGER.info("Added server info to node: version={}, build={}",
+ LOGGER.info("Added server info to node: version={}, build={}",
serverInfo.getServerVersion(),
serverInfo.getServerBuildNumber());
} else {
LOGGER.warn("BundleWatcher not available at registration time,
server info will not be available");
@@ -404,7 +262,7 @@ public class ClusterServiceImpl implements ClusterService {
updateSystemStatsForNode(clusterNode);
- boolean success = service.save(clusterNode);
+ boolean success = persistenceService.save(clusterNode);
if (success) {
LOGGER.info("Node {} registered in cluster", nodeId);
} else {
@@ -461,15 +319,14 @@ public class ClusterServiceImpl implements ClusterService
{
if (shutdownNow) {
return;
}
-
- PersistenceService service = getPersistenceService();
- if (service == null) {
+
+ if (persistenceService == null) {
LOGGER.warn("Cannot update system stats: PersistenceService not
available");
return;
}
-
+
// Load node from persistence
- ClusterNode node = service.load(nodeId, ClusterNode.class);
+ ClusterNode node = persistenceService.load(nodeId, ClusterNode.class);
if (node == null) {
LOGGER.warn("Node {} not found in persistence, re-registering",
nodeId);
registerNodeInPersistence();
@@ -479,27 +336,35 @@ public class ClusterServiceImpl implements ClusterService
{
try {
// Update its stats
updateSystemStatsForNode(node);
-
+
// Update server info if needed
if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
// Check if server info needs updating
- if (node.getServerInfo() == null ||
+ if (node.getServerInfo() == null ||
!currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion()))
{
-
+
node.setServerInfo(currentInfo);
- LOGGER.info("Updated server info for node {}: version={},
build={}",
+ LOGGER.info("Updated server info for node {}: version={},
build={}",
nodeId, currentInfo.getServerVersion(),
currentInfo.getServerBuildNumber());
}
}
-
+
node.setLastHeartbeat(System.currentTimeMillis());
// Save back to persistence
- boolean success = service.save(node);
+ boolean success = persistenceService.save(node);
if (!success) {
LOGGER.error("Failed to update node {} statistics", nodeId);
}
+
+ // Always refresh cluster nodes cache after attempting stats update
+ try {
+ List<ClusterNode> nodes =
persistenceService.getAllItems(ClusterNode.class, 0, -1, null).getList();
+ cachedClusterNodes = nodes;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to refresh cluster nodes cache during
stats update", e);
+ }
} catch (Exception e) {
LOGGER.error("Error updating system statistics for node {}: {}",
nodeId, e.getMessage(), e);
}
@@ -512,13 +377,12 @@ public class ClusterServiceImpl implements ClusterService
{
if (shutdownNow) {
return;
}
-
- PersistenceService service = getPersistenceService();
- if (service == null) {
+
+ if (persistenceService == null) {
LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not
available");
return;
}
-
+
long cutoffTime = System.currentTimeMillis() -
(nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x
the update frequency
Condition staleNodesCondition = new Condition();
@@ -533,56 +397,48 @@ public class ClusterServiceImpl implements ClusterService
{
staleNodesCondition.setParameter("comparisonOperator", "lessThan");
staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
- PartialList<ClusterNode> staleNodes =
service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+ PartialList<ClusterNode> staleNodes =
persistenceService.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
for (ClusterNode staleNode : staleNodes.getList()) {
LOGGER.info("Removing stale node: {}", staleNode.getItemId());
- service.remove(staleNode.getItemId(), ClusterNode.class);
+ persistenceService.remove(staleNode.getItemId(),
ClusterNode.class);
nodeSystemStatistics.remove(staleNode.getItemId());
}
}
@Override
public List<ClusterNode> getClusterNodes() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
- LOGGER.warn("Cannot get cluster nodes: PersistenceService not
available");
- return Collections.emptyList();
- }
-
- // Query all nodes from the persistence service
- return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
+ // Return cached cluster nodes, creating a defensive copy
+ return cachedClusterNodes.isEmpty() ? Collections.emptyList() : new
ArrayList<>(cachedClusterNodes);
}
@Override
public void purge(Date date) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot purge by date: PersistenceService not
available");
return;
}
-
- service.purge(date);
+
+ persistenceService.purge(date);
}
@Override
public void purge(String scope) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot purge by scope: PersistenceService not
available");
return;
}
-
- service.purge(scope);
+
+ persistenceService.purge(scope);
}
/**
* Check if a persistence service is available.
* This can be used to quickly check before performing operations.
- *
- * @return true if a persistence service is available (either directly set
or via tracker)
+ *
+ * @return true if a persistence service is available
*/
public boolean isPersistenceServiceAvailable() {
- return getPersistenceService() != null;
+ return persistenceService != null;
}
}
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 1abc4dff4..9e5b32cfe 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -390,12 +390,12 @@
<bean id="clusterServiceImpl"
class="org.apache.unomi.services.impl.cluster.ClusterServiceImpl"
init-method="init" destroy-method="destroy">
+ <property name="persistenceService" ref="persistenceService" />
<property name="publicAddress"
value="${cluster.contextserver.publicAddress}"/>
<property name="internalAddress"
value="${cluster.contextserver.internalAddress}"/>
<property name="persistenceService" ref="persistenceService"/>
<property name="nodeId" value="${cluster.nodeId}"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
<property name="bundleWatcher" ref="bundleWatcher"/>
</bean>
diff --git a/services/src/test/java/org/apache/unomi/services/TestHelper.java
b/services/src/test/java/org/apache/unomi/services/TestHelper.java
index 92f02f3fb..65a17ab35 100644
--- a/services/src/test/java/org/apache/unomi/services/TestHelper.java
+++ b/services/src/test/java/org/apache/unomi/services/TestHelper.java
@@ -486,9 +486,6 @@ public class TestHelper {
clusterService.setInternalAddress(internalAddress);
clusterService.setNodeStatisticsUpdateFrequency(60000);
clusterService.setNodeId(nodeId);
- if (bundleContext != null) {
- clusterService.setBundleContext(bundleContext);
- }
return clusterService;
}