This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch unomi-3-dev
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/unomi-3-dev by this push:
     new c85d6a02f UNOMI-877 Refactor ClusterServiceImpl to remove ServiceTracker and streamline persistence service handling
c85d6a02f is described below

commit c85d6a02fdf430774928a2fa08e48cc617c764b4
Author: Serge Huber <[email protected]>
AuthorDate: Mon Sep 1 10:00:20 2025 +0200

    UNOMI-877 Refactor ClusterServiceImpl to remove ServiceTracker and streamline persistence service handling
    
    - Eliminated the use of ServiceTracker for PersistenceService, simplifying service management.
    - Updated methods to directly reference the persistence service, enhancing clarity and reducing complexity.
    - Improved logging messages to reflect changes in how the persistence service is set and utilized.
    - Added a new method to cancel scheduled tasks during shutdown for better resource management.
    - Adjusted blueprint configuration to remove unnecessary bundle context property.
    - Updated unit tests to reflect changes in the ClusterServiceImpl initialization and configuration.
---
 .../services/impl/cluster/ClusterServiceImpl.java  | 310 ++++++---------------
 .../resources/OSGI-INF/blueprint/blueprint.xml     |   2 +-
 .../java/org/apache/unomi/services/TestHelper.java |   3 -
 3 files changed, 84 insertions(+), 231 deletions(-)

diff --git 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index 420217eaf..de65f871a 100644
--- 
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++ 
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -31,9 +31,6 @@ import org.apache.unomi.persistence.spi.PersistenceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
@@ -50,25 +47,8 @@ public class ClusterServiceImpl implements ClusterService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
 
-    /**
-     * We use ServiceTracker instead of Blueprint dependency injection due to 
a known bug in Apache Aries Blueprint
-     * where service dependencies are not properly shut down in reverse order 
of their initialization.
-     * 
-     * The bug manifests in two ways:
-     * 1. Services are not shut down in reverse order of their initialization, 
causing potential deadlocks
-     * 2. The PersistenceService is often shut down before other services that 
depend on it, leading to timeout waits
-     * 
-     * By using ServiceTracker, we have explicit control over:
-     * - Service lifecycle management
-     * - Shutdown order
-     * - Service availability checks
-     * - Graceful degradation when services become unavailable
-     */
-    private ServiceTracker<PersistenceService, PersistenceService> 
persistenceServiceTracker;
-    
-    // Keep direct reference for backward compatibility and unit tests
     private PersistenceService persistenceService;
-    
+
     private String publicAddress;
     private String internalAddress;
     private SchedulerService schedulerService;
@@ -76,27 +56,14 @@ public class ClusterServiceImpl implements ClusterService {
     private long nodeStartTime;
     private long nodeStatisticsUpdateFrequency = 10000;
     private Map<String, Map<String, Serializable>> nodeSystemStatistics = new 
ConcurrentHashMap<>();
-    private BundleContext bundleContext;
     private volatile boolean shutdownNow = false;
-    
+    private volatile List<ClusterNode> cachedClusterNodes = 
Collections.emptyList();
+
     private BundleWatcher bundleWatcher;
-    
-    /**
-     * Max time to wait for persistence service (in milliseconds)
-     */
-    private static final long MAX_WAIT_TIME = 60000; // 60 seconds
 
-    /**
-     * Sets the bundle context, which is needed to create service trackers
-     * @param bundleContext the OSGi bundle context
-     */
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
-    
     /**
      * Sets the bundle watcher used to retrieve server information
-     * 
+     *
      * @param bundleWatcher the bundle watcher
      */
     public void setBundleWatcher(BundleWatcher bundleWatcher) {
@@ -105,127 +72,12 @@ public class ClusterServiceImpl implements ClusterService 
{
     }
 
     /**
-     * Waits for the persistence service to become available.
-     * This method will retry getting the persistence service with exponential 
backoff
-     * until it's available or until the maximum wait time is reached.
-     * 
-     * @throws IllegalStateException if the persistence service is not 
available after the maximum wait time
-     */
-    private void waitForPersistenceService() {
-        if (shutdownNow) {
-            return;
-        }
-
-        // If persistence service is directly set (e.g., in unit tests), no 
need to wait
-        if (persistenceService != null) {
-            LOGGER.debug("Persistence service is already available, no need to 
wait");
-            return;
-        }
-
-        // If no bundle context, we can't get the service via tracker
-        if (bundleContext == null) {
-            LOGGER.error("No BundleContext available, cannot wait for 
persistence service");
-            throw new IllegalStateException("No BundleContext available to get 
persistence service");
-        }
-
-        // Initialize service tracker if needed
-        if (persistenceServiceTracker == null) {
-            initializeServiceTrackers();
-        }
-
-        // Try to get the service with retries
-        long startTime = System.currentTimeMillis();
-        long waitTime = 50; // Start with 50ms wait time
-        
-        while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
-            PersistenceService service = getPersistenceService();
-            if (service != null) {
-                LOGGER.info("Persistence service is now available");
-                return;
-            }
-            
-            try {
-                LOGGER.debug("Waiting for persistence service... ({}ms 
elapsed)", System.currentTimeMillis() - startTime);
-                Thread.sleep(waitTime);
-                // Exponential backoff with a maximum of 5 seconds
-                waitTime = Math.min(waitTime * 2, 5000);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.error("Interrupted while waiting for persistence 
service", e);
-                break;
-            }
-        }
-        
-        throw new IllegalStateException("PersistenceService not available 
after waiting " + MAX_WAIT_TIME + "ms");
-    }
-    
-    /**
-     * Safely gets the persistence service, either from the direct reference 
(for tests)
-     * or from the service tracker (for OSGi runtime)
-     * @return the persistence service or null if not available
-     */
-    private PersistenceService getPersistenceService() {
-        if (shutdownNow) return null;
-        
-        // For unit tests or if already directly set
-        if (persistenceService != null) {
-            return persistenceService;
-        }
-        
-        // Otherwise try to get from service tracker
-        return persistenceServiceTracker != null ? 
persistenceServiceTracker.getService() : null;
-    }
-    
-    /**
-     * Initialize service tracker for PersistenceService
-     */
-    private void initializeServiceTrackers() {
-        if (bundleContext == null) {
-            LOGGER.warn("BundleContext is null, cannot initialize service 
trackers");
-            return;
-        }
-        
-        // Only create service tracker if direct reference isn't set
-        if (persistenceService == null) {
-            LOGGER.info("Initializing PersistenceService tracker");
-            persistenceServiceTracker = new ServiceTracker<>(
-                bundleContext,
-                PersistenceService.class,
-                new ServiceTrackerCustomizer<PersistenceService, 
PersistenceService>() {
-                    @Override
-                    public PersistenceService 
addingService(ServiceReference<PersistenceService> reference) {
-                        PersistenceService service = 
bundleContext.getService(reference);
-                        if (service != null) {
-                            persistenceService = service;
-                            LOGGER.info("PersistenceService acquired through 
tracker");
-                        }
-                        return service;
-                    }
-                    
-                    @Override
-                    public void 
modifiedService(ServiceReference<PersistenceService> reference, 
PersistenceService service) {
-                        // No action needed
-                    }
-                    
-                    @Override
-                    public void 
removedService(ServiceReference<PersistenceService> reference, 
PersistenceService service) {
-                        LOGGER.info("PersistenceService removed");
-                        persistenceService = null;
-                        bundleContext.ungetService(reference);
-                    }
-                }
-            );
-            persistenceServiceTracker.open();
-        }
-    }
-
-    /**
-     * For unit tests and backward compatibility - directly sets the 
persistence service
+     * Sets the persistence service via Blueprint dependency injection
      * @param persistenceService the persistence service to set
      */
     public void setPersistenceService(PersistenceService persistenceService) {
         this.persistenceService = persistenceService;
-        LOGGER.info("PersistenceService set directly");
+        LOGGER.info("PersistenceService set via Blueprint dependency 
injection");
     }
 
     public void setPublicAddress(String publicAddress) {
@@ -242,7 +94,7 @@ public class ClusterServiceImpl implements ClusterService {
 
     public void setSchedulerService(SchedulerService schedulerService) {
         this.schedulerService = schedulerService;
-        
+
         // If we're already initialized, initialize scheduled tasks now
         // This handles the case when ClusterService was initialized before 
SchedulerService was set
         if (schedulerService != null && System.currentTimeMillis() > 
nodeStartTime && nodeStartTime > 0) {
@@ -257,7 +109,8 @@ public class ClusterServiceImpl implements ClusterService {
      */
     public void unsetSchedulerService(SchedulerService schedulerService) {
         if (this.schedulerService == schedulerService) {
-            LOGGER.info("SchedulerService was unset");
+            LOGGER.info("SchedulerService was unbound, cancelling scheduled 
tasks");
+            cancelScheduledTasks();
             this.schedulerService = null;
         }
     }
@@ -271,9 +124,6 @@ public class ClusterServiceImpl implements ClusterService {
     }
 
     public void init() {
-        // Initialize service trackers if not set directly (common in unit 
tests)
-        initializeServiceTrackers();
-        
         // Validate that nodeId is provided
         if (StringUtils.isBlank(nodeId)) {
             String errorMessage = "CRITICAL: nodeId is not set. This is a 
required setting for cluster operation.";
@@ -281,12 +131,11 @@ public class ClusterServiceImpl implements ClusterService 
{
             throw new IllegalStateException(errorMessage);
         }
 
-        // Wait for persistence service to be available
-        try {
-            waitForPersistenceService();
-        } catch (IllegalStateException e) {
-            LOGGER.error("Failed to initialize cluster service: {}", 
e.getMessage());
-            return;
+        // Validate that persistence service is available
+        if (persistenceService == null) {
+            String errorMessage = "CRITICAL: PersistenceService is not set. 
This is a required dependency for cluster operation.";
+            LOGGER.error(errorMessage);
+            throw new IllegalStateException(errorMessage);
         }
 
         nodeStartTime = System.currentTimeMillis();
@@ -313,7 +162,7 @@ public class ClusterServiceImpl implements ClusterService {
             LOGGER.error("Cannot initialize scheduled tasks: SchedulerService 
is not set");
             return;
         }
-        
+
         // Schedule regular updates of the node statistics
         TimerTask statisticsTask = new TimerTask() {
             @Override
@@ -339,64 +188,73 @@ public class ClusterServiceImpl implements ClusterService 
{
             }
         };
         schedulerService.createRecurringTask("clusterStaleNodesCleanup", 
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
-        
+
         LOGGER.info("Cluster service scheduled tasks initialized");
     }
 
     public void destroy() {
+        LOGGER.info("Cluster service shutting down...");
         shutdownNow = true;
-        
-        // Remove this node from the persistence service
-        PersistenceService service = getPersistenceService();
-        if (service != null) {
+
+        cancelScheduledTasks();
+
+        // Remove node from persistence service
+        if (persistenceService != null) {
             try {
-                service.remove(nodeId, ClusterNode.class);
+                persistenceService.remove(nodeId, ClusterNode.class);
                 LOGGER.info("Node {} removed from cluster", nodeId);
             } catch (Exception e) {
                 LOGGER.error("Error removing node from cluster", e);
             }
         }
-        
-        // Close service trackers
-        if (persistenceServiceTracker != null) {
-            try {
-                persistenceServiceTracker.close();
-                LOGGER.debug("Persistence service tracker closed");
-            } catch (Exception e) {
-                LOGGER.debug("Error closing persistence service tracker: {}", 
e.getMessage());
-            }
-            persistenceServiceTracker = null;
-        }
-        
+
         // Clear references
         persistenceService = null;
         bundleWatcher = null;
-        
+        schedulerService = null;
+
         LOGGER.info("Cluster service shutdown.");
     }
 
+    private void cancelScheduledTasks() {
+        // Cancel scheduled tasks
+        if (schedulerService != null) {
+            try {
+                schedulerService.cancelTask("clusterNodeStatisticsUpdate");
+                LOGGER.debug("Cancelled clusterNodeStatisticsUpdate task");
+            } catch (Exception e) {
+                LOGGER.debug("Error cancelling clusterNodeStatisticsUpdate 
task: {}", e.getMessage());
+            }
+            try {
+                schedulerService.cancelTask("clusterStaleNodesCleanup");
+                LOGGER.debug("Cancelled clusterStaleNodesCleanup task");
+            } catch (Exception e) {
+                LOGGER.debug("Error cancelling clusterStaleNodesCleanup task: 
{}", e.getMessage());
+            }
+        }
+    }
+
     /**
      * Register this node in the persistence service
      */
     private void registerNodeInPersistence() {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
+        if (persistenceService == null) {
             LOGGER.error("Cannot register node: PersistenceService not 
available");
             return;
         }
-        
+
         ClusterNode clusterNode = new ClusterNode();
         clusterNode.setItemId(nodeId);
         clusterNode.setPublicHostAddress(publicAddress);
         clusterNode.setInternalHostAddress(internalAddress);
         clusterNode.setStartTime(nodeStartTime);
         clusterNode.setLastHeartbeat(System.currentTimeMillis());
-        
+
         // Set server information if BundleWatcher is available
         if (bundleWatcher != null && 
!bundleWatcher.getServerInfos().isEmpty()) {
             ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
             clusterNode.setServerInfo(serverInfo);
-            LOGGER.info("Added server info to node: version={}, build={}", 
+            LOGGER.info("Added server info to node: version={}, build={}",
                 serverInfo.getServerVersion(), 
serverInfo.getServerBuildNumber());
         } else {
             LOGGER.warn("BundleWatcher not available at registration time, 
server info will not be available");
@@ -404,7 +262,7 @@ public class ClusterServiceImpl implements ClusterService {
 
         updateSystemStatsForNode(clusterNode);
 
-        boolean success = service.save(clusterNode);
+        boolean success = persistenceService.save(clusterNode);
         if (success) {
             LOGGER.info("Node {} registered in cluster", nodeId);
         } else {
@@ -461,15 +319,14 @@ public class ClusterServiceImpl implements ClusterService 
{
         if (shutdownNow) {
             return;
         }
-        
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
+
+        if (persistenceService == null) {
             LOGGER.warn("Cannot update system stats: PersistenceService not 
available");
             return;
         }
-        
+
         // Load node from persistence
-        ClusterNode node = service.load(nodeId, ClusterNode.class);
+        ClusterNode node = persistenceService.load(nodeId, ClusterNode.class);
         if (node == null) {
             LOGGER.warn("Node {} not found in persistence, re-registering", 
nodeId);
             registerNodeInPersistence();
@@ -479,27 +336,35 @@ public class ClusterServiceImpl implements ClusterService 
{
         try {
             // Update its stats
             updateSystemStatsForNode(node);
-            
+
             // Update server info if needed
             if (bundleWatcher != null && 
!bundleWatcher.getServerInfos().isEmpty()) {
                 ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
                 // Check if server info needs updating
-                if (node.getServerInfo() == null || 
+                if (node.getServerInfo() == null ||
                     
!currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion()))
 {
-                    
+
                     node.setServerInfo(currentInfo);
-                    LOGGER.info("Updated server info for node {}: version={}, 
build={}", 
+                    LOGGER.info("Updated server info for node {}: version={}, 
build={}",
                         nodeId, currentInfo.getServerVersion(), 
currentInfo.getServerBuildNumber());
                 }
             }
-            
+
             node.setLastHeartbeat(System.currentTimeMillis());
 
             // Save back to persistence
-            boolean success = service.save(node);
+            boolean success = persistenceService.save(node);
             if (!success) {
                 LOGGER.error("Failed to update node {} statistics", nodeId);
             }
+
+            // Always refresh cluster nodes cache after attempting stats update
+            try {
+                List<ClusterNode> nodes = 
persistenceService.getAllItems(ClusterNode.class, 0, -1, null).getList();
+                cachedClusterNodes = nodes;
+            } catch (Exception e) {
+                LOGGER.warn("Failed to refresh cluster nodes cache during 
stats update", e);
+            }
         } catch (Exception e) {
             LOGGER.error("Error updating system statistics for node {}: {}", 
nodeId, e.getMessage(), e);
         }
@@ -512,13 +377,12 @@ public class ClusterServiceImpl implements ClusterService 
{
         if (shutdownNow) {
             return;
         }
-        
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
+
+        if (persistenceService == null) {
             LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not 
available");
             return;
         }
-        
+
         long cutoffTime = System.currentTimeMillis() - 
(nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x 
the update frequency
 
         Condition staleNodesCondition = new Condition();
@@ -533,56 +397,48 @@ public class ClusterServiceImpl implements ClusterService 
{
         staleNodesCondition.setParameter("comparisonOperator", "lessThan");
         staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
 
-        PartialList<ClusterNode> staleNodes = 
service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+        PartialList<ClusterNode> staleNodes = 
persistenceService.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
 
         for (ClusterNode staleNode : staleNodes.getList()) {
             LOGGER.info("Removing stale node: {}", staleNode.getItemId());
-            service.remove(staleNode.getItemId(), ClusterNode.class);
+            persistenceService.remove(staleNode.getItemId(), 
ClusterNode.class);
             nodeSystemStatistics.remove(staleNode.getItemId());
         }
     }
 
     @Override
     public List<ClusterNode> getClusterNodes() {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
-            LOGGER.warn("Cannot get cluster nodes: PersistenceService not 
available");
-            return Collections.emptyList();
-        }
-        
-        // Query all nodes from the persistence service
-        return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
+        // Return cached cluster nodes, creating a defensive copy
+        return cachedClusterNodes.isEmpty() ? Collections.emptyList() : new 
ArrayList<>(cachedClusterNodes);
     }
 
     @Override
     public void purge(Date date) {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
+        if (persistenceService == null) {
             LOGGER.warn("Cannot purge by date: PersistenceService not 
available");
             return;
         }
-        
-        service.purge(date);
+
+        persistenceService.purge(date);
     }
 
     @Override
     public void purge(String scope) {
-        PersistenceService service = getPersistenceService();
-        if (service == null) {
+        if (persistenceService == null) {
             LOGGER.warn("Cannot purge by scope: PersistenceService not 
available");
             return;
         }
-        
-        service.purge(scope);
+
+        persistenceService.purge(scope);
     }
 
     /**
      * Check if a persistence service is available.
      * This can be used to quickly check before performing operations.
-     * 
-     * @return true if a persistence service is available (either directly set 
or via tracker)
+     *
+     * @return true if a persistence service is available
      */
     public boolean isPersistenceServiceAvailable() {
-        return getPersistenceService() != null;
+        return persistenceService != null;
     }
 }
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 1abc4dff4..9e5b32cfe 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -390,12 +390,12 @@
 
     <bean id="clusterServiceImpl" 
class="org.apache.unomi.services.impl.cluster.ClusterServiceImpl"
           init-method="init" destroy-method="destroy">
+        <property name="persistenceService" ref="persistenceService" />
         <property name="publicAddress" 
value="${cluster.contextserver.publicAddress}"/>
         <property name="internalAddress" 
value="${cluster.contextserver.internalAddress}"/>
         <property name="persistenceService" ref="persistenceService"/>
         <property name="nodeId" value="${cluster.nodeId}"/>
         <property name="nodeStatisticsUpdateFrequency" 
value="${cluster.nodeStatisticsUpdateFrequency}"/>
-        <property name="bundleContext" ref="blueprintBundleContext"/>
         <property name="bundleWatcher" ref="bundleWatcher"/>
     </bean>
 
diff --git a/services/src/test/java/org/apache/unomi/services/TestHelper.java 
b/services/src/test/java/org/apache/unomi/services/TestHelper.java
index 92f02f3fb..65a17ab35 100644
--- a/services/src/test/java/org/apache/unomi/services/TestHelper.java
+++ b/services/src/test/java/org/apache/unomi/services/TestHelper.java
@@ -486,9 +486,6 @@ public class TestHelper {
         clusterService.setInternalAddress(internalAddress);
         clusterService.setNodeStatisticsUpdateFrequency(60000);
         clusterService.setNodeId(nodeId);
-        if (bundleContext != null) {
-            clusterService.setBundleContext(bundleContext);
-        }
 
         return clusterService;
     }

Reply via email to