This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch UNOMI-877
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/UNOMI-877 by this push:
new 324349538 Refactor configuration handling and remove unused OSGi
ServiceTracker references.
324349538 is described below
commit 324349538ac345601f62ad8cfedf7811ca6fa1b0
Author: Serge Huber <[email protected]>
AuthorDate: Tue Aug 26 17:22:46 2025 +0200
Refactor configuration handling and remove unused OSGi ServiceTracker
references.
---
.../test/java/org/apache/unomi/itests/BaseIT.java | 48 ++++-
.../org/apache/unomi/itests/ProfileServiceIT.java | 4 +-
.../org/apache/unomi/itests/RuleServiceIT.java | 4 +-
.../ElasticSearchPersistenceServiceImpl.java | 37 +++-
.../resources/OSGI-INF/blueprint/blueprint.xml | 6 +-
.../spi/config/ConfigurationUpdateHelper.java | 160 ++++++++++++++++
.../services/impl/cluster/ClusterServiceImpl.java | 204 ++++++---------------
.../services/impl/rules/RulesServiceImpl.java | 18 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 10 +-
9 files changed, 326 insertions(+), 165 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 483bacd8a..a98d90d7d 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -378,6 +378,17 @@ public abstract class BaseIT extends KarafTestSupport {
segmentService = getService(SegmentService.class);
}
+ /**
+ * Updates an OSGi configuration with a single property value and
optionally waits for the service to be reregistered.
+ * If serviceName is null, the method will not wait for service
re-registration.
+ *
+ * @param serviceName The fully qualified name of the service to wait for,
or null to skip waiting
+ * @param configPid The persistent identifier of the configuration to
update
+ * @param propName The name of the property to update
+ * @param propValue The new value for the property
+ * @throws InterruptedException If the thread is interrupted while waiting
for service reregistration
+ * @throws IOException If an error occurs while updating the
configuration
+ */
public void updateConfiguration(String serviceName, String configPid,
String propName, Object propValue)
throws InterruptedException, IOException {
Map<String, Object> props = new HashMap<>();
@@ -385,20 +396,43 @@ public abstract class BaseIT extends KarafTestSupport {
updateConfiguration(serviceName, configPid, props);
}
+ /**
+ * Updates an OSGi configuration with multiple property values and
optionally waits for the service to be reregistered.
+ * If serviceName is null, the method will not wait for service
re-registration.
+ *
+ * @param serviceName The fully qualified name of the service to wait for,
or null to skip waiting
+ * @param configPid The persistent identifier of the configuration to
update
+ * @param propsToSet A map of property names to their new values
+ * @throws InterruptedException If the thread is interrupted while waiting
for service reregistration
+ * @throws IOException If an error occurs while updating the
configuration
+ */
public void updateConfiguration(String serviceName, String configPid,
Map<String, Object> propsToSet)
throws InterruptedException, IOException {
org.osgi.service.cm.Configuration cfg =
configurationAdmin.getConfiguration(configPid);
Dictionary<String, Object> props = cfg.getProperties();
+
+ // Handle case where properties haven't been initialized yet
+ final Dictionary<String, Object> finalProps = (props != null) ? props
: new Hashtable<>();
+
+ // Add new properties to the dictionary
for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) {
- props.put(propToSet.getKey(), propToSet.getValue());
+ finalProps.put(propToSet.getKey(), propToSet.getValue());
}
- waitForReRegistration(serviceName, () -> {
- try {
- cfg.update(props);
- } catch (IOException ignored) {
- }
- });
+ // If serviceName is null, don't wait for service re-registration
+ if (serviceName == null) {
+ LOGGER.info("Updating configuration {} without waiting for service
restart", configPid);
+ cfg.update(finalProps);
+ // Give the configuration change handler time to process
+ Thread.sleep(1000);
+ } else {
+ waitForReRegistration(serviceName, () -> {
+ try {
+ cfg.update(finalProps);
+ } catch (IOException ignored) {
+ }
+ });
+ }
waitForStartup();
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 623904938..479ba5657 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -168,7 +168,7 @@ public class ProfileServiceIT extends BaseIT {
}
}
- updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
+ updateConfiguration(null,
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
Query query = new Query();
query.setLimit(2);
@@ -181,7 +181,7 @@ public class ProfileServiceIT extends BaseIT {
} catch (RuntimeException ex) {
// Should get here since this scenario should throw exception
} finally {
- updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions",
+ updateConfiguration(null,
"org.apache.unomi.persistence.elasticsearch", "throwExceptions",
throwExceptionCurrent);
}
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
index 1d5a23d9d..04ef4904a 100644
--- a/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
@@ -151,14 +151,14 @@ public class RuleServiceIT extends BaseIT {
Profile profile = new Profile(UUID.randomUUID().toString());
Session session = new Session(UUID.randomUUID().toString(), profile,
new Date(), TEST_SCOPE);
- updateConfiguration(RulesService.class.getName(),
"org.apache.unomi.services", "rules.optimizationActivated", "false");
+ updateConfiguration(null, "org.apache.unomi.services",
"rules.optimizationActivated", "false");
rulesService = getService(RulesService.class);
eventService = getService(EventService.class);
LOGGER.info("Running unoptimized rules performance test...");
long unoptimizedRunTime = runEventTest(profile, session);
- updateConfiguration(RulesService.class.getName(),
"org.apache.unomi.services", "rules.optimizationActivated", "true");
+ updateConfiguration(null, "org.apache.unomi.services",
"rules.optimizationActivated", "true");
rulesService = getService(RulesService.class);
eventService = getService(EventService.class);
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27e874e52..372fb6950 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
+import org.apache.unomi.persistence.spi.config.ConfigurationUpdateHelper;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
@@ -112,6 +113,7 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.osgi.framework.*;
+import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +136,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@SuppressWarnings("rawtypes")
-public class ElasticSearchPersistenceServiceImpl implements
PersistenceService, SynchronousBundleListener {
+public class ElasticSearchPersistenceServiceImpl implements
PersistenceService, SynchronousBundleListener, ManagedService {
public static final String BULK_PROCESSOR_BULK_SIZE =
"bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL =
"bulkProcessor.flushInterval";
@@ -717,6 +719,39 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ @Override
+ public void updated(Dictionary<String, ?> properties) {
+ Map<String, ConfigurationUpdateHelper.PropertyMapping>
propertyMappings = new HashMap<>();
+
+ // Boolean properties
+ propertyMappings.put("throwExceptions",
ConfigurationUpdateHelper.booleanProperty(this::setThrowExceptions));
+ propertyMappings.put("alwaysOverwrite",
ConfigurationUpdateHelper.booleanProperty(this::setAlwaysOverwrite));
+ propertyMappings.put("useBatchingForSave",
ConfigurationUpdateHelper.booleanProperty(this::setUseBatchingForSave));
+ propertyMappings.put("useBatchingForUpdate",
ConfigurationUpdateHelper.booleanProperty(this::setUseBatchingForUpdate));
+ propertyMappings.put("aggQueryThrowOnMissingDocs",
ConfigurationUpdateHelper.booleanProperty(this::setAggQueryThrowOnMissingDocs));
+
+ // String properties
+ propertyMappings.put("logLevelRestClient",
ConfigurationUpdateHelper.stringProperty(this::setLogLevelRestClient));
+ propertyMappings.put("clientSocketTimeout",
ConfigurationUpdateHelper.stringProperty(this::setClientSocketTimeout));
+ propertyMappings.put("taskWaitingTimeout",
ConfigurationUpdateHelper.stringProperty(this::setTaskWaitingTimeout));
+ propertyMappings.put("taskWaitingPollingInterval",
ConfigurationUpdateHelper.stringProperty(this::setTaskWaitingPollingInterval));
+ propertyMappings.put("aggQueryMaxResponseSizeHttp",
ConfigurationUpdateHelper.stringProperty(this::setAggQueryMaxResponseSizeHttp));
+
+ // Integer properties
+ propertyMappings.put("aggregateQueryBucketSize",
ConfigurationUpdateHelper.integerProperty(this::setAggregateQueryBucketSize));
+
+ // Custom property for itemTypeToRefreshPolicy with IOException
handling
+ propertyMappings.put("itemTypeToRefreshPolicy",
ConfigurationUpdateHelper.customProperty((value, logger) -> {
+ try {
+ setItemTypeToRefreshPolicy(value.toString());
+ } catch (IOException e) {
+ logger.warn("Error setting itemTypeToRefreshPolicy: {}",
e.getMessage());
+ }
+ }));
+
+ ConfigurationUpdateHelper.processConfigurationUpdates(properties,
LOGGER, "ElasticSearch persistence", propertyMappings);
+ }
+
private void loadPredefinedMappings(BundleContext bundleContext, boolean
forceUpdateMapping) {
Enumeration<URL> predefinedMappings =
bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index d14235171..e376be29f 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -23,7 +23,7 @@
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
<cm:property-placeholder
persistent-id="org.apache.unomi.persistence.elasticsearch"
- update-strategy="reload"
placeholder-prefix="${es.">
+ update-strategy="none" placeholder-prefix="${es.">
<cm:default-properties>
<cm:property name="cluster.name" value="contextElasticSearch"/>
<cm:property name="elasticSearchAddresses" value="localhost:9200"/>
@@ -85,7 +85,11 @@
<interfaces>
<value>org.apache.unomi.persistence.spi.PersistenceService</value>
<value>org.osgi.framework.SynchronousBundleListener</value>
+ <value>org.osgi.service.cm.ManagedService</value>
</interfaces>
+ <service-properties>
+ <entry key="service.pid"
value="org.apache.unomi.persistence.elasticsearch"/>
+ </service-properties>
</service>
<bean id="conditionESQueryBuilderDispatcher"
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
new file mode 100644
index 000000000..7b5154d45
--- /dev/null
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.persistence.spi.config;
+
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Utility class to handle configuration updates in ManagedService
implementations.
+ * This class provides generic methods to process configuration property
changes
+ * without hardcoding specific property names.
+ */
+public class ConfigurationUpdateHelper {
+
+ /**
+ * Processes configuration updates using a property mapping.
+ *
+ * @param properties The configuration properties dictionary
+ * @param logger The logger used to report applied updates and failures
+ * @param serviceName The name of the service for logging purposes
+ * @param propertyMappings Map of property names to their setters and types
+ */
+ public static void processConfigurationUpdates(Dictionary<String, ?>
properties, Logger logger,
+ String serviceName,
+ Map<String, PropertyMapping>
propertyMappings) {
+ if (properties == null) {
+ return;
+ }
+
+ logger.info("{} configuration updated, applying changes without
restart", serviceName);
+
+ try {
+ for (Map.Entry<String, PropertyMapping> entry :
propertyMappings.entrySet()) {
+ String propertyName = entry.getKey();
+ PropertyMapping mapping = entry.getValue();
+
+ Object value = properties.get(propertyName);
+ if (value != null) {
+ try {
+ mapping.apply(value, logger);
+ logger.debug("Updated {} to: {}", propertyName, value);
+ } catch (Exception e) {
+ logger.warn("Error setting property {}: {}",
propertyName, e.getMessage());
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error applying configuration updates", e);
+ }
+ }
+
+ /**
+ * Creates a boolean property mapping.
+ *
+ * @param setter The setter function to call with the boolean value
+ * @return PropertyMapping for boolean properties
+ */
+ public static PropertyMapping booleanProperty(Consumer<Boolean> setter) {
+ return (value, logger) -> {
+ boolean boolValue = Boolean.parseBoolean(value.toString());
+ setter.accept(boolValue);
+ };
+ }
+
+ /**
+ * Creates a string property mapping.
+ *
+ * @param setter The setter function to call with the string value
+ * @return PropertyMapping for string properties
+ */
+ public static PropertyMapping stringProperty(Consumer<String> setter) {
+ return (value, logger) -> {
+ String stringValue = value.toString();
+ setter.accept(stringValue);
+ };
+ }
+
+ /**
+ * Creates an integer property mapping.
+ *
+ * @param setter The setter function to call with the integer value
+ * @return PropertyMapping for integer properties
+ */
+ public static PropertyMapping integerProperty(Consumer<Integer> setter) {
+ return (value, logger) -> {
+ int intValue = Integer.parseInt(value.toString());
+ setter.accept(intValue);
+ };
+ }
+
+ /**
+ * Creates a long property mapping.
+ *
+ * @param setter The setter function to call with the long value
+ * @return PropertyMapping for long properties
+ */
+ public static PropertyMapping longProperty(Consumer<Long> setter) {
+ return (value, logger) -> {
+ long longValue = Long.parseLong(value.toString());
+ setter.accept(longValue);
+ };
+ }
+
+ /**
+ * Creates a custom property mapping for special cases.
+ *
+ * @param processor The custom processor function
+ * @return PropertyMapping for custom properties
+ */
+ public static PropertyMapping customProperty(PropertyProcessor processor) {
+ return processor::process;
+ }
+
+ /**
+ * Functional interface for property processing.
+ */
+ @FunctionalInterface
+ public interface PropertyMapping {
+ /**
+ * Applies the property value using the appropriate setter.
+ *
+ * @param value The property value
+ * @param logger The logger to use
+ * @throws Exception if there's an error processing the property
+ */
+ void apply(Object value, Logger logger) throws Exception;
+ }
+
+ /**
+ * Functional interface for custom property processing.
+ */
+ @FunctionalInterface
+ public interface PropertyProcessor {
+ /**
+ * Processes the property value.
+ *
+ * @param value The property value
+ * @param logger The logger to use
+ * @throws Exception if there's an error processing the property
+ */
+ void process(Object value, Logger logger) throws Exception;
+ }
+}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index 80fcc6397..c61096f4e 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -27,10 +27,6 @@ import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.lifecycle.BundleWatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +35,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Implementation of the persistence service interface
@@ -51,23 +44,6 @@ public class ClusterServiceImpl implements ClusterService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
- /**
- * We use ServiceTracker instead of Blueprint dependency injection due to
a known bug in Apache Aries Blueprint
- * where service dependencies are not properly shut down in reverse order
of their initialization.
- *
- * The bug manifests in two ways:
- * 1. Services are not shut down in reverse order of their initialization,
causing potential deadlocks
- * 2. The PersistenceService is often shut down before other services that
depend on it, leading to timeout waits
- *
- * By using ServiceTracker, we have explicit control over:
- * - Service lifecycle management
- * - Shutdown order
- * - Service availability checks
- * - Graceful degradation when services become unavailable
- */
- private ServiceTracker<PersistenceService, PersistenceService>
persistenceServiceTracker;
-
- // Keep direct reference for backward compatibility and unit tests
private PersistenceService persistenceService;
private String publicAddress;
@@ -78,24 +54,17 @@ public class ClusterServiceImpl implements ClusterService {
private long nodeStartTime;
private long nodeStatisticsUpdateFrequency = 10000;
private Map<String, Map<String, Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
- private BundleContext bundleContext;
private volatile boolean shutdownNow = false;
private BundleWatcher bundleWatcher;
+ private ScheduledFuture<?> updateSystemStatsFuture;
+ private ScheduledFuture<?> cleanupStaleNodesFuture;
/**
* Max time to wait for persistence service (in milliseconds)
*/
private static final long MAX_WAIT_TIME = 60000; // 60 seconds
- /**
- * Sets the bundle context, which is needed to create service trackers
- * @param bundleContext the OSGi bundle context
- */
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
/**
* Sets the bundle watcher used to retrieve server information
*
@@ -124,24 +93,12 @@ public class ClusterServiceImpl implements ClusterService {
return;
}
- // If no bundle context, we can't get the service via tracker
- if (bundleContext == null) {
- LOGGER.error("No BundleContext available, cannot wait for
persistence service");
- throw new IllegalStateException("No BundleContext available to get
persistence service");
- }
-
- // Initialize service tracker if needed
- if (persistenceServiceTracker == null) {
- initializeServiceTrackers();
- }
-
// Try to get the service with retries
long startTime = System.currentTimeMillis();
long waitTime = 50; // Start with 50ms wait time
while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
- PersistenceService service = getPersistenceService();
- if (service != null) {
+ if (persistenceService != null) {
LOGGER.info("Persistence service is now available");
return;
}
@@ -161,66 +118,6 @@ public class ClusterServiceImpl implements ClusterService {
throw new IllegalStateException("PersistenceService not available
after waiting " + MAX_WAIT_TIME + "ms");
}
- /**
- * Safely gets the persistence service, either from the direct reference
(for tests)
- * or from the service tracker (for OSGi runtime)
- * @return the persistence service or null if not available
- */
- private PersistenceService getPersistenceService() {
- if (shutdownNow) return null;
-
- // For unit tests or if already directly set
- if (persistenceService != null) {
- return persistenceService;
- }
-
- // Otherwise try to get from service tracker
- return persistenceServiceTracker != null ?
persistenceServiceTracker.getService() : null;
- }
-
- /**
- * Initialize service tracker for PersistenceService
- */
- private void initializeServiceTrackers() {
- if (bundleContext == null) {
- LOGGER.warn("BundleContext is null, cannot initialize service
trackers");
- return;
- }
-
- // Only create service tracker if direct reference isn't set
- if (persistenceService == null) {
- LOGGER.info("Initializing PersistenceService tracker");
- persistenceServiceTracker = new ServiceTracker<>(
- bundleContext,
- PersistenceService.class,
- new ServiceTrackerCustomizer<PersistenceService,
PersistenceService>() {
- @Override
- public PersistenceService
addingService(ServiceReference<PersistenceService> reference) {
- PersistenceService service =
bundleContext.getService(reference);
- if (service != null) {
- persistenceService = service;
- LOGGER.info("PersistenceService acquired
through tracker");
- }
- return service;
- }
-
- @Override
- public void
modifiedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
- // No action needed
- }
-
- @Override
- public void
removedService(ServiceReference<PersistenceService> reference,
PersistenceService service) {
- LOGGER.info("PersistenceService removed");
- persistenceService = null;
- bundleContext.ungetService(reference);
- }
- }
- );
- persistenceServiceTracker.open();
- }
- }
-
/**
* For unit tests and backward compatibility - directly sets the
persistence service
* @param persistenceService the persistence service to set
@@ -278,9 +175,6 @@ public class ClusterServiceImpl implements ClusterService {
}
public void init() {
- // Initialize service trackers if not set directly (common in unit
tests)
- initializeServiceTrackers();
-
// Validate that nodeId is provided
if (StringUtils.isBlank(nodeId)) {
String errorMessage = "CRITICAL: nodeId is not set. This is a
required setting for cluster operation.";
@@ -341,7 +235,7 @@ public class ClusterServiceImpl implements ClusterService {
/* Wait for PR UNOMI-878 to reactivate that code
schedulerService.createRecurringTask("clusterNodeStatisticsUpdate",
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
*/
- scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100,
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+ updateSystemStatsFuture =
scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100,
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
// Schedule cleanup of stale nodes
TimerTask cleanupTask = new TimerTask() {
@@ -357,34 +251,54 @@ public class ClusterServiceImpl implements ClusterService
{
/* Wait for PR UNOMI-878 to reactivate that code
schedulerService.createRecurringTask("clusterStaleNodesCleanup",
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
*/
- scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000,
TimeUnit.MILLISECONDS);
+ cleanupStaleNodesFuture =
scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000,
TimeUnit.MILLISECONDS);
LOGGER.info("Cluster service scheduled tasks initialized");
}
public void destroy() {
- // Remove this node from the persistence service BEFORE setting
shutdownNow otherwise it won't work
- PersistenceService service = getPersistenceService();
- if (service != null) {
+ LOGGER.info("Cluster service shutting down...");
+ shutdownNow = true;
+
+ // Cancel scheduled tasks
+ if (updateSystemStatsFuture != null) {
+ boolean successfullyCancelled =
updateSystemStatsFuture.cancel(false);
+ if (!successfullyCancelled) {
+ LOGGER.warn("Failed to cancel scheduled task:
clusterNodeStatisticsUpdate");
+ } else {
+ LOGGER.info("Scheduled task: clusterNodeStatisticsUpdate
cancelled");
+ }
+ }
+ if (cleanupStaleNodesFuture != null) {
+ boolean successfullyCancelled =
cleanupStaleNodesFuture.cancel(false);
+ if (!successfullyCancelled) {
+ LOGGER.warn("Failed to cancel scheduled task:
cleanupStaleNodesFuture");
+ } else {
+ LOGGER.info("Scheduled task: cleanupStaleNodesFuture
cancelled");
+ }
+ }
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
try {
- service.remove(nodeId, ClusterNode.class);
- LOGGER.info("Node {} removed from cluster", nodeId);
- } catch (Exception e) {
- LOGGER.error("Error removing node from cluster", e);
+ boolean successfullyTerminated =
scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+ if (!successfullyTerminated) {
+ LOGGER.warn("Failed to terminate scheduled tasks after 10
seconds...");
+ } else {
+ LOGGER.info("Scheduled tasks terminated");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Error waiting for scheduled tasks to terminate",
e);
}
}
- shutdownNow = true;
-
- // Close service trackers
- if (persistenceServiceTracker != null) {
+ // Remove node from persistence service
+ if (persistenceService != null) {
try {
- persistenceServiceTracker.close();
- LOGGER.debug("Persistence service tracker closed");
+ persistenceService.remove(nodeId, ClusterNode.class);
+ LOGGER.info("Node {} removed from cluster", nodeId);
} catch (Exception e) {
- LOGGER.debug("Error closing persistence service tracker: {}",
e.getMessage());
+ LOGGER.error("Error removing node from cluster", e);
}
- persistenceServiceTracker = null;
}
// Clear references
@@ -398,8 +312,7 @@ public class ClusterServiceImpl implements ClusterService {
* Register this node in the persistence service
*/
private void registerNodeInPersistence() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.error("Cannot register node: PersistenceService not
available");
return;
}
@@ -423,7 +336,7 @@ public class ClusterServiceImpl implements ClusterService {
updateSystemStatsForNode(clusterNode);
- boolean success = service.save(clusterNode);
+ boolean success = persistenceService.save(clusterNode);
if (success) {
LOGGER.info("Node {} registered in cluster", nodeId);
} else {
@@ -481,14 +394,13 @@ public class ClusterServiceImpl implements ClusterService
{
return;
}
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot update system stats: PersistenceService not
available");
return;
}
// Load node from persistence
- ClusterNode node = service.load(nodeId, ClusterNode.class);
+ ClusterNode node = persistenceService.load(nodeId, ClusterNode.class);
if (node == null) {
LOGGER.warn("Node {} not found in persistence, re-registering",
nodeId);
registerNodeInPersistence();
@@ -515,7 +427,7 @@ public class ClusterServiceImpl implements ClusterService {
node.setLastHeartbeat(System.currentTimeMillis());
// Save back to persistence
- boolean success = service.save(node);
+ boolean success = persistenceService.save(node);
if (!success) {
LOGGER.error("Failed to update node {} statistics", nodeId);
}
@@ -532,8 +444,7 @@ public class ClusterServiceImpl implements ClusterService {
return;
}
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not
available");
return;
}
@@ -552,47 +463,44 @@ public class ClusterServiceImpl implements ClusterService
{
staleNodesCondition.setParameter("comparisonOperator", "lessThan");
staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
- PartialList<ClusterNode> staleNodes =
service.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+ PartialList<ClusterNode> staleNodes =
persistenceService.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
for (ClusterNode staleNode : staleNodes.getList()) {
LOGGER.info("Removing stale node: {}", staleNode.getItemId());
- service.remove(staleNode.getItemId(), ClusterNode.class);
+ persistenceService.remove(staleNode.getItemId(),
ClusterNode.class);
nodeSystemStatistics.remove(staleNode.getItemId());
}
}
@Override
public List<ClusterNode> getClusterNodes() {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot get cluster nodes: PersistenceService not
available");
return Collections.emptyList();
}
// Query all nodes from the persistence service
- return service.getAllItems(ClusterNode.class, 0, -1, null).getList();
+ return persistenceService.getAllItems(ClusterNode.class, 0, -1,
null).getList();
}
@Override
public void purge(Date date) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot purge by date: PersistenceService not
available");
return;
}
- service.purge(date);
+ persistenceService.purge(date);
}
@Override
public void purge(String scope) {
- PersistenceService service = getPersistenceService();
- if (service == null) {
+ if (persistenceService == null) {
LOGGER.warn("Cannot purge by scope: PersistenceService not
available");
return;
}
- service.purge(scope);
+ persistenceService.purge(scope);
}
/**
@@ -602,7 +510,7 @@ public class ClusterServiceImpl implements ClusterService {
* @return true if a persistence service is available (either directly set
or via tracker)
*/
public boolean isPersistenceServiceAvailable() {
- return getPersistenceService() != null;
+ return persistenceService != null;
}
}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index 6702f5689..fe3fe8fb0 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -30,9 +30,11 @@ import org.apache.unomi.api.rules.RuleStatistics;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.persistence.spi.config.ConfigurationUpdateHelper;
import org.apache.unomi.services.actions.ActionExecutorDispatcher;
import org.apache.unomi.api.utils.ParserHelper;
import org.osgi.framework.*;
+import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
-public class RulesServiceImpl implements RulesService, EventListenerService,
SynchronousBundleListener {
+public class RulesServiceImpl implements RulesService, EventListenerService,
SynchronousBundleListener, ManagedService {
public static final String TRACKED_PARAMETER =
"trackedConditionParameters";
private static final Logger LOGGER =
LoggerFactory.getLogger(RulesServiceImpl.class.getName());
@@ -105,6 +107,20 @@ public class RulesServiceImpl implements RulesService,
EventListenerService, Syn
this.optimizedRulesActivated = optimizedRulesActivated;
}
+ @Override
+ public void updated(Dictionary<String, ?> properties) {
+ Map<String, ConfigurationUpdateHelper.PropertyMapping>
propertyMappings = new HashMap<>();
+
+ // Boolean properties
+ propertyMappings.put("rules.optimizationActivated",
ConfigurationUpdateHelper.booleanProperty(this::setOptimizedRulesActivated));
+
+ // Integer properties
+ propertyMappings.put("rules.refresh.interval",
ConfigurationUpdateHelper.integerProperty(this::setRulesRefreshInterval));
+ propertyMappings.put("rules.statistics.refresh.interval",
ConfigurationUpdateHelper.integerProperty(this::setRulesStatisticsRefreshInterval));
+
+ ConfigurationUpdateHelper.processConfigurationUpdates(properties,
LOGGER, "Rules service", propertyMappings);
+ }
+
public void postConstruct() {
LOGGER.debug("postConstruct {{}}", bundleContext.getBundle());
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 69c789886..bf64a77d7 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -23,7 +23,7 @@
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
<cm:property-placeholder persistent-id="org.apache.unomi.services"
- update-strategy="reload"
placeholder-prefix="${services.">
+ update-strategy="none"
placeholder-prefix="${services.">
<cm:default-properties>
<cm:property name="profile.purge.interval" value="1"/>
<cm:property name="profile.purge.inactiveTime" value="180"/>
@@ -61,7 +61,7 @@
</cm:property-placeholder>
<cm:property-placeholder
persistent-id="org.apache.unomi.persistence.elasticsearch"
- update-strategy="reload"
placeholder-prefix="${es.">
+ update-strategy="none" placeholder-prefix="${es.">
<cm:default-properties>
<cm:property name="aggregateQueryBucketSize" value="5000"/>
</cm:default-properties>
@@ -181,7 +181,11 @@
<value>org.apache.unomi.api.services.RulesService</value>
<value>org.apache.unomi.api.services.EventListenerService</value>
<value>org.osgi.framework.SynchronousBundleListener</value>
+ <value>org.osgi.service.cm.ManagedService</value>
</interfaces>
+ <service-properties>
+ <entry key="service.pid" value="org.apache.unomi.services"/>
+ </service-properties>
</service>
<bean id="segmentServiceImpl"
class="org.apache.unomi.services.impl.segments.SegmentServiceImpl"
@@ -257,12 +261,12 @@
<bean id="clusterServiceImpl"
class="org.apache.unomi.services.impl.cluster.ClusterServiceImpl"
init-method="init" destroy-method="destroy">
+ <property name="persistenceService" ref="persistenceService" />
<property name="publicAddress"
value="${cluster.contextserver.publicAddress}"/>
<property name="internalAddress"
value="${cluster.contextserver.internalAddress}"/>
<property name="persistenceService" ref="persistenceService"/>
<property name="nodeId" value="${cluster.nodeId}"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
<property name="bundleWatcher" ref="bundleWatcher"/>
<!-- Wait for UNOMI-878 to be available to activate that
<property name="schedulerService" ref="schedulerServiceImpl"/>