This is an automated email from the ASF dual-hosted git repository. shuber pushed a commit to branch UNOMI-877 in repository https://gitbox.apache.org/repos/asf/unomi.git
commit efb8266aee22849ff30e3a08f271d9e07298cbd5 Author: Serge Huber <shu...@jahia.com> AuthorDate: Fri Aug 22 15:51:56 2025 +0200 UNOMI-877: Remove all reference to cellar and hazelcast. UNOMI-877: Not sending event to cluster anymore. UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for cluster synchronization (code isolated from branch unomi-3-dev made by Serge Huber) --- .../java/org/apache/unomi/api/ClusterNode.java | 65 +- .../apache/unomi/api/services/ClusterService.java | 7 - extensions/router/router-core/pom.xml | 10 - .../router/core/context/RouterCamelContext.java | 22 - .../router/core/event/UpdateCamelRouteEvent.java | 38 -- .../core/event/UpdateCamelRouteEventHandler.java | 76 --- .../resources/OSGI-INF/blueprint/blueprint.xml | 16 +- .../test/java/org/apache/unomi/itests/BaseIT.java | 5 - kar/pom.xml | 7 - kar/src/main/feature/feature.xml | 3 - .../src/main/asciidoc/building-and-deploying.adoc | 3 +- manual/src/main/asciidoc/clustering.adoc | 22 +- package/pom.xml | 19 - .../main/resources/etc/custom.system.properties | 13 +- .../etc/org.apache.karaf.cellar.groups.cfg | 81 --- persistence-elasticsearch/core/pom.xml | 7 - pom.xml | 19 - services/pom.xml | 22 +- .../services/impl/cluster/ClusterServiceImpl.java | 690 +++++++++++++++------ .../impl/cluster/ClusterSystemStatisticsEvent.java | 43 -- .../ClusterSystemStatisticsEventHandler.java | 138 ----- .../resources/OSGI-INF/blueprint/blueprint.xml | 29 +- services/src/main/resources/hazelcast.xml | 222 ------- .../main/resources/org.apache.unomi.cluster.cfg | 10 +- 24 files changed, 583 insertions(+), 984 deletions(-) diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java b/api/src/main/java/org/apache/unomi/api/ClusterNode.java index 6c40ca21b..e096490b9 100644 --- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java +++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java @@ -22,10 +22,12 @@ import java.io.Serializable; /** * Information about a cluster 
node. */ -public class ClusterNode implements Serializable { +public class ClusterNode extends Item { private static final long serialVersionUID = 1281422346318230514L; + public static final String ITEM_TYPE = "clusterNode"; + private double cpuLoad; private double[] loadAverage; private String publicHostAddress; @@ -33,11 +35,18 @@ public class ClusterNode implements Serializable { private long uptime; private boolean master; private boolean data; + private long startTime; + private long lastHeartbeat; + + // Server information + private ServerInfo serverInfo; /** * Instantiates a new Cluster node. */ public ClusterNode() { + super(); + setItemType(ITEM_TYPE); } /** @@ -165,4 +174,58 @@ public class ClusterNode implements Serializable { public void setData(boolean data) { this.data = data; } + + /** + * Retrieves the node start time in milliseconds. + * + * @return the start time + */ + public long getStartTime() { + return startTime; + } + + /** + * Sets the node start time in milliseconds. + * + * @param startTime the start time + */ + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + /** + * Retrieves the last heartbeat time in milliseconds. + * + * @return the last heartbeat time + */ + public long getLastHeartbeat() { + return lastHeartbeat; + } + + /** + * Sets the last heartbeat time in milliseconds. + * + * @param lastHeartbeat the last heartbeat time + */ + public void setLastHeartbeat(long lastHeartbeat) { + this.lastHeartbeat = lastHeartbeat; + } + + /** + * Gets the server information. + * + * @return the server information + */ + public ServerInfo getServerInfo() { + return serverInfo; + } + + /** + * Sets the server information. 
+ * + * @param serverInfo the server information + */ + public void setServerInfo(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } } diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java index 299ac9098..ad8777503 100644 --- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java +++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java @@ -51,11 +51,4 @@ public interface ClusterService { */ void purge(final String scope); - /** - * This function will send an event to the nodes of the cluster - * The function takes a Serializable to avoid dependency on any clustering framework - * - * @param event this object will be cast to a org.apache.karaf.cellar.core.event.Event object - */ - void sendEvent(Serializable event); } diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml index a8ce97fd4..8b7d46aad 100644 --- a/extensions/router/router-core/pom.xml +++ b/extensions/router/router-core/pom.xml @@ -133,16 +133,6 @@ <version>${kafka.client.version}</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.config</artifactId> - <scope>provided</scope> - </dependency> </dependencies> <build> diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 67219f9c5..c3f03876e 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -31,7 +31,6 @@ import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.api.services.ImportExportConfigurationService; import org.apache.unomi.router.api.services.ProfileExportService; -import org.apache.unomi.router.core.event.UpdateCamelRouteEvent; import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor; @@ -75,7 +74,6 @@ public class RouterCamelContext implements IRouterCamelContext { private String allowedEndpoints; private BundleContext bundleContext; private ConfigSharingService configSharingService; - private ClusterService clusterService; // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service private ScheduledExecutorService scheduler; @@ -102,10 +100,6 @@ public class RouterCamelContext implements IRouterCamelContext { this.configSharingService = configSharingService; } - public void setClusterService(ClusterService clusterService) { - this.clusterService = clusterService; - } - public void setTracing(boolean tracing) { camelContext.setTracing(true); } @@ -240,12 +234,6 @@ public class RouterCamelContext implements IRouterCamelContext { camelContext.removeRouteDefinition(routeDefinition); } } - - if (fireEvent) { - UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_REMOVE); - event.setRouteId(routeId); - clusterService.sendEvent(event); - } } public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception { @@ -266,11 +254,6 @@ public class RouterCamelContext implements IRouterCamelContext { builder.setJacksonDataFormat(jacksonDataFormat); 
builder.setContext(camelContext); camelContext.addRoutes(builder); - - if (fireEvent) { - UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_IMPORT); - clusterService.sendEvent(event); - } } } @@ -291,11 +274,6 @@ public class RouterCamelContext implements IRouterCamelContext { profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat); profileExportCollectRouteBuilder.setContext(camelContext); camelContext.addRoutes(profileExportCollectRouteBuilder); - - if (fireEvent) { - UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_EXPORT); - clusterService.sendEvent(event); - } } } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java deleted file mode 100644 index 2f3d2cb3f..000000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.unomi.router.core.event; - -import org.apache.karaf.cellar.core.event.Event; - -/** - * @author dgaillard - */ -public class UpdateCamelRouteEvent extends Event { - private String routeId; - - public UpdateCamelRouteEvent(String id) { - super(id); - } - - public String getRouteId() { - return routeId; - } - - public void setRouteId(String routeId) { - this.routeId = routeId; - } -} diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java deleted file mode 100644 index f43f2b629..000000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.unomi.router.core.event; - -import org.apache.commons.lang3.StringUtils; -import org.apache.karaf.cellar.config.Constants; -import org.apache.karaf.cellar.core.CellarSupport; -import org.apache.karaf.cellar.core.control.Switch; -import org.apache.karaf.cellar.core.event.EventHandler; -import org.apache.karaf.cellar.core.event.EventType; -import org.apache.unomi.router.core.context.RouterCamelContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author dgaillard - */ -public class UpdateCamelRouteEventHandler extends CellarSupport implements EventHandler<UpdateCamelRouteEvent> { - private static final Logger LLOGGER = LoggerFactory.getLogger(UpdateCamelRouteEventHandler.class.getName()); - - private RouterCamelContext routerCamelContext; - - @Override - public void handle(UpdateCamelRouteEvent event) { - LLOGGER.debug("Handle event"); - if (isAllowed(event.getSourceGroup(), Constants.CATEGORY, event.getId(), EventType.INBOUND)) { - LLOGGER.debug("Event is allowed"); - // check if it's not a "local" event - if (event.getSourceNode() != null && event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId())) { - LLOGGER.debug("Cluster event is local (coming from local synchronizer or listener)"); - return; - } - - try { - LLOGGER.debug("Event id is {}", event.getId()); - if (event.getId().equals(RouterCamelContext.EVENT_ID_REMOVE) && StringUtils.isNotBlank(event.getRouteId())) { - routerCamelContext.killExistingRoute(event.getRouteId(), false); - } else if ((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT))) { - routerCamelContext.updateProfileImportReaderRoute(event.getRouteId(), false); - } else if (event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) { - routerCamelContext.updateProfileExportReaderRoute(event.getRouteId(), false); - } - } catch (Exception e) { - LLOGGER.error("Error when executing event", e); - } - } - } - - @Override - public Class<UpdateCamelRouteEvent> getType() { - 
return UpdateCamelRouteEvent.class; - } - - @Override - public Switch getSwitch() { - return null; - } - - public void setRouterCamelContext(RouterCamelContext routerCamelContext) { - this.routerCamelContext = routerCamelContext; - } -} diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index d7b7a36c0..c2e73abe7 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -120,18 +120,6 @@ <property name="persistenceService" ref="persistenceService"/> </bean> - <bean id="updateCamelRouteEventHandler" class="org.apache.unomi.router.core.event.UpdateCamelRouteEventHandler"> - <property name="configurationAdmin" ref="osgiConfigurationAdmin"/> - <property name="clusterManager" ref="karafCellarClusterManager"/> - <property name="groupManager" ref="karafCellarGroupManager"/> - <property name="routerCamelContext" ref="camelContext"/> - </bean> - <service ref="updateCamelRouteEventHandler" interface="org.apache.karaf.cellar.core.event.EventHandler"> - <service-properties> - <entry key="managed" value="true"/> - </service-properties> - </service> - <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" /> <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/> <reference id="profileExportService" interface="org.apache.unomi.router.api.services.ProfileExportService"/> @@ -141,8 +129,6 @@ <reference id="importConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=IMPORT)"/> <reference id="exportConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=EXPORT)"/> <reference 
id="clusterService" interface="org.apache.unomi.api.services.ClusterService" /> - <reference id="karafCellarGroupManager" interface="org.apache.karaf.cellar.core.GroupManager" /> <reference id="osgiConfigurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/> - <reference id="karafCellarClusterManager" interface="org.apache.karaf.cellar.core.ClusterManager" /> -</blueprint> \ No newline at end of file +</blueprint> diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java index 38b7f0c66..483bacd8a 100644 --- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java @@ -256,11 +256,6 @@ public abstract class BaseIT extends KarafTestSupport { editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.rollover.maxDocs", "300"), systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"), - systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"), - systemProperty("org.apache.unomi.hazelcast.group.password").value("pass"), - systemProperty("org.apache.unomi.hazelcast.network.port").value("5701"), - systemProperty("org.apache.unomi.hazelcast.tcp-ip.members").value("127.0.0.1"), - systemProperty("org.apache.unomi.hazelcast.tcp-ip.interface").value("127.0.0.1"), systemProperty("org.apache.unomi.healthcheck.enabled").value("true"), logLevel(LogLevel.INFO), diff --git a/kar/pom.xml b/kar/pom.xml index 226e3c85c..9a38f9b78 100644 --- a/kar/pom.xml +++ b/kar/pom.xml @@ -143,13 +143,6 @@ <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>apache-karaf-cellar</artifactId> - <classifier>features</classifier> - <type>xml</type> - <scope>runtime</scope> - </dependency> <dependency> <groupId>org.apache.unomi</groupId> <artifactId>unomi-web-tracker-wab</artifactId> diff --git 
a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml index 9de01623a..6355a69e4 100644 --- a/kar/src/main/feature/feature.xml +++ b/kar/src/main/feature/feature.xml @@ -19,7 +19,6 @@ <features name="unomi-kar" xmlns="http://karaf.apache.org/xmlns/features/v1.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0 http://karaf.apache.org/xmlns/features/v1.3.0"> <repository>mvn:org.apache.cxf.karaf/apache-cxf/${cxf.version}/xml/features</repository> - <repository>mvn:org.apache.karaf.cellar/apache-karaf-cellar/${version.karaf.cellar}/xml/features</repository> <feature description="unomi-kar" version="${project.version}" name="unomi-kar" start-level="70"> <feature prerequisite="true">wrap</feature> @@ -29,7 +28,6 @@ <feature>cxf-features-metrics</feature> <feature>cxf-rs-security-cors</feature> <feature>cxf-rs-description-openapi-v3</feature> - <feature>cellar</feature> <feature>eventadmin</feature> <feature>feature</feature> <feature>shell-compat</feature> @@ -40,7 +38,6 @@ <configfile finalname="/etc/org.apache.unomi.services.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/servicescfg</configfile> <configfile finalname="/etc/org.apache.unomi.thirdparty.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/thirdpartycfg</configfile> <configfile finalname="/etc/org.apache.unomi.cluster.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/clustercfg</configfile> - <configfile finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-services/${project.version}/xml/hazelcastconfig</configfile> <configfile finalname="/etc/org.apache.unomi.geonames.cfg">mvn:org.apache.unomi/cxs-geonames-services/${project.version}/cfg/geonamescfg</configfile> <configfile finalname="/etc/org.apache.unomi.groovy.actions.cfg">mvn:org.apache.unomi/unomi-groovy-actions-services/${project.version}/cfg/groovyactionscfg</configfile> <configfile 
finalname="/etc/org.apache.unomi.schema.cfg">mvn:org.apache.unomi/unomi-json-schema-services/${project.version}/cfg/schemacfg</configfile> diff --git a/manual/src/main/asciidoc/building-and-deploying.adoc b/manual/src/main/asciidoc/building-and-deploying.adoc index 840639e6e..3de47041f 100644 --- a/manual/src/main/asciidoc/building-and-deploying.adoc +++ b/manual/src/main/asciidoc/building-and-deploying.adoc @@ -144,12 +144,11 @@ files (at the end of the file): export KARAF_OPTS="$KARAF_OPTS -Xmx3G" ---- -Install the WAR support, CXF and Karaf Cellar into Karaf by doing the following in the Karaf command line: +Install the WAR support and CXF into Karaf by doing the following in the Karaf command line: [source] ---- feature:repo-add cxf-jaxrs 3.3.4 - feature:repo-add cellar 4.1.3 feature:repo-add mvn:org.apache.unomi/unomi-kar/VERSION/xml/features feature:install unomi-kar ---- diff --git a/manual/src/main/asciidoc/clustering.adoc b/manual/src/main/asciidoc/clustering.adoc index be27ccfd9..9d31aa6a2 100644 --- a/manual/src/main/asciidoc/clustering.adoc +++ b/manual/src/main/asciidoc/clustering.adoc @@ -13,7 +13,7 @@ // === Cluster setup -Apache Karaf relies on Apache Karaf Cellar, which in turn uses Hazelcast to discover and configure its cluster. +Apache Karaf relies on Persistence to register nodes and manage cluster. 
You can control most of the important clustering settings through the centralized configuration file at @@ -21,19 +21,9 @@ You can control most of the important clustering settings through the centralize And notably using the following properties: - org.apache.unomi.hazelcast.group.name=${env:UNOMI_HAZELCAST_GROUP_NAME:-cellar} - org.apache.unomi.hazelcast.group.password=${env:UNOMI_HAZELCAST_GROUP_PASSWORD:-pass} - # This list can be comma separated and use ranges such as 192.168.1.0-7,192.168.1.21 - org.apache.unomi.hazelcast.tcp-ip.members=${env:UNOMI_HAZELCAST_TCPIP_MEMBERS:-127.0.0.1} - org.apache.unomi.hazelcast.tcp-ip.interface=${env:UNOMI_HAZELCAST_TCPIP_INTERFACE:-127.0.0.1} - org.apache.unomi.hazelcast.network.port=${env:UNOMI_HAZELCAST_NETWORK_PORT:-5701} + org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181} + org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443} + org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1} + org.apache.unomi.cluster.nodeStatisticsUpdateFrequency=${env:UNOMI_CLUSTER_NODESTATISTICS_UPDATEFREQUENCY:-10000} -If you need more fine-grained control over the Hazelcast configuration you could also edit the following file: - -[source] ----- -etc/hazelcast.xml ----- - -Note that it would be best to keep all configuration in the centralized custom configuration, for example by adding -placeholders in the hazelcast.xml file if need be and adding the properties to the centralized configuration file. +Note that it is mandatory to set a different `org.apache.unomi.cluster.nodeId` for each node in the cluster. 
diff --git a/package/pom.xml b/package/pom.xml index 13af2a5be..265e91973 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -80,13 +80,6 @@ <type>xml</type> <scope>runtime</scope> </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>apache-karaf-cellar</artifactId> - <classifier>features</classifier> - <type>xml</type> - <scope>runtime</scope> - </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -192,17 +185,6 @@ </outputDirectory> <destFileName>org.apache.unomi.persistence.elasticsearch.cfg</destFileName> </artifactItem> - <artifactItem> - <groupId>org.apache.unomi</groupId> - <artifactId>unomi-services</artifactId> - <version>${project.version}</version> - <classifier>hazelcastconfig</classifier> - <type>xml</type> - <outputDirectory> - ${project.build.directory}/assembly/etc - </outputDirectory> - <destFileName>hazelcast.xml</destFileName> - </artifactItem> <artifactItem> <groupId>org.apache.unomi</groupId> <artifactId>unomi-services</artifactId> @@ -362,7 +344,6 @@ <feature>system</feature> <feature>war</feature> <feature>cxf-jaxrs</feature> - <feature>cellar</feature> <feature>aries-blueprint</feature> <feature>shell-compat</feature> <feature>unomi-kar</feature> diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties index 5e97437c3..7b1b576eb 100644 --- a/package/src/main/resources/etc/custom.system.properties +++ b/package/src/main/resources/etc/custom.system.properties @@ -18,14 +18,6 @@ ################################################################################ # This following file is used to customize system properties for the Apache Unomi application running in Apache Karaf. 
${optionals}=unomi.custom.system.properties -####################################################################################################################### -## Hazelcast clustering settings ## -####################################################################################################################### -org.apache.unomi.hazelcast.group.name=${env:UNOMI_HAZELCAST_GROUP_NAME:-cellar} -org.apache.unomi.hazelcast.group.password=${env:UNOMI_HAZELCAST_GROUP_PASSWORD:-pass} -# This list can be comma separated and use ranges such as 192.168.1.0-7,192.168.1.21 -org.apache.unomi.hazelcast.tcp-ip.members=${env:UNOMI_HAZELCAST_TCPIP_MEMBERS:-127.0.0.1} -org.apache.unomi.hazelcast.network.port=${env:UNOMI_HAZELCAST_NETWORK_PORT:-5701} ####################################################################################################################### ## Security settings ## @@ -81,11 +73,14 @@ org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs} ####################################################################################################################### ## Cluster Settings ## ####################################################################################################################### -org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default} # To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended # to switch to using HTTPS with a proper SSL certificate installed. org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181} org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443} +# The nodeId is a required setting that uniquely identifies this node in the cluster. +# It must be set to a unique value for each node in the cluster. 
+# Example: nodeId=node1 +org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1} # The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load, # system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node # will retrieve the local values and broadcast them through a cluster event to all the other nodes to update diff --git a/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg b/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg deleted file mode 100644 index 9542b735e..000000000 --- a/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg +++ /dev/null @@ -1,81 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -# -# This property stores the cluster groups for which the local node is member -# -groups = default - -# -# Filtering of the bundles in the default cluster group -# -default.bundle.whitelist.inbound = * -default.bundle.whitelist.outbound = * -default.bundle.blacklist.inbound = *.xml -default.bundle.blacklist.outbound = *.xml - -# -# Filtering of the configurations in the default cluster group -# -default.config.whitelist.inbound = * -default.config.whitelist.outbound = * -default.config.blacklist.inbound = org.apache.felix.fileinstall*, \ - org.apache.karaf.cellar*, \ - org.apache.karaf.management, \ - org.apache.karaf.shell, \ - org.ops4j.pax.logging, \ - org.ops4j.pax.web, \ - org.apache.aries.transaction, \ - org.apache.unomi.cluster, \ - org.apache.unomi.geonames, \ - org.apache.unomi.persistence.elasticsearch, \ - org.apache.unomi.router, \ - org.apache.unomi.plugins.request -default.config.blacklist.outbound = org.apache.felix.fileinstall*, \ - org.apache.karaf.cellar*, \ - org.apache.karaf.management, \ - org.apache.karaf.shell, \ - org.ops4j.pax.logging, \ - org.ops4j.pax.web, \ - org.apache.aries.transaction, \ - org.apache.unomi.cluster, \ - org.apache.unomi.geonames, \ - org.apache.unomi.persistence.elasticsearch, \ - org.apache.unomi.router, \ - org.apache.unomi.plugins.request - -# -# Filtering of the features in the default cluster group -# -default.feature.whitelist.inbound = * -default.feature.whitelist.outbound = * -default.feature.blacklist.inbound = none -default.feature.blacklist.outbound = none - -# -# The following properties define the behavior to use when the node joins the cluster (the usage of the bootstrap -# synchronizer), per cluster group and per resource. 
-# The following values are accepted: -# disabled: means that the synchronizer is not used, meaning the node or the cluster are not updated at all -# cluster: if the node is the first one in the cluster, it pushes its local state to the cluster, else it's not the -# first node of the cluster, the node will update its local state with the cluster one (meaning that the cluster -# is the master) -# node: in this case, the node is the master, it means that the cluster state will be overwritten by the node state. -# -default.bundle.sync = disabled -default.config.sync = disabled -default.feature.sync = disabled -default.obr.urls.sync = disabled diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml index ee5afc77f..810e38807 100644 --- a/persistence-elasticsearch/core/pom.xml +++ b/persistence-elasticsearch/core/pom.xml @@ -192,12 +192,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>com.hazelcast</groupId> - <artifactId>hazelcast-all</artifactId> - <version>3.12.8</version> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -226,7 +220,6 @@ com.google.apphosting.api;resolution:=optional, com.google.common.geometry;resolution:=optional, com.google.errorprone.annotations.concurrent;resolution:=optional, - com.hazelcast.core;version="[3.12,4)";resolution:=optional, com.lmax.disruptor;resolution:=optional, com.lmax.disruptor.dsl;resolution:=optional, com.sun.management;resolution:=optional, diff --git a/pom.xml b/pom.xml index c537486e1..bcef0de13 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,6 @@ <version.jackson.databind>2.11.4</version.jackson.databind> <version.jackson.jaxb>2.11.4</version.jackson.jaxb> <version.karaf>4.2.15</version.karaf> - <version.karaf.cellar>4.2.1</version.karaf.cellar> <version.pax.exam>4.13.5</version.pax.exam> <elasticsearch.version>7.4.2</elasticsearch.version> 
<elasticsearch.test.version>7.11.0</elasticsearch.test.version> @@ -830,24 +829,6 @@ <type>xml</type> <scope>runtime</scope> </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>apache-karaf-cellar</artifactId> - <classifier>features</classifier> - <version>${version.karaf.cellar}</version> - <type>xml</type> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.core</artifactId> - <version>${version.karaf.cellar}</version> - </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.config</artifactId> - <version>${version.karaf.cellar}</version> - </dependency> <!-- End of Apache Karaf dependencies --> diff --git a/services/pom.xml b/services/pom.xml index c4cf7e4da..c9fb46411 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -56,6 +56,12 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-lifecycle-watcher</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>javax.servlet</groupId> @@ -111,16 +117,6 @@ <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.karaf.cellar</groupId> - <artifactId>org.apache.karaf.cellar.config</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.unomi</groupId> @@ -175,6 +171,7 @@ <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> <Import-Package> sun.misc;resolution:=optional, + com.sun.management;resolution:=optional, * </Import-Package> </instructions> @@ -213,11 +210,6 @@ <type>cfg</type> 
<classifier>clustercfg</classifier> </artifact> - <artifact> - <file>src/main/resources/hazelcast.xml</file> - <type>xml</type> - <classifier>hazelcastconfig</classifier> - </artifact> </artifacts> </configuration> </execution> diff --git a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java index ec4cfe523..b4bc2c421 100644 --- a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java @@ -18,28 +18,30 @@ package org.apache.unomi.services.impl.cluster; import org.apache.commons.lang3.ArrayUtils; -import org.apache.karaf.cellar.config.ClusterConfigurationEvent; -import org.apache.karaf.cellar.config.Constants; -import org.apache.karaf.cellar.core.*; -import org.apache.karaf.cellar.core.control.SwitchStatus; -import org.apache.karaf.cellar.core.event.Event; -import org.apache.karaf.cellar.core.event.EventProducer; -import org.apache.karaf.cellar.core.event.EventType; +import org.apache.commons.lang3.StringUtils; import org.apache.unomi.api.ClusterNode; +import org.apache.unomi.api.PartialList; +import org.apache.unomi.api.ServerInfo; +import org.apache.unomi.api.conditions.Condition; +import org.apache.unomi.api.conditions.ConditionType; import org.apache.unomi.api.services.ClusterService; -import org.apache.unomi.api.services.SchedulerService; +import org.apache.unomi.lifecycle.BundleWatcher; import org.apache.unomi.persistence.spi.PersistenceService; -import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.*; import java.io.Serializable; import 
java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -47,46 +49,185 @@ import java.util.concurrent.TimeUnit; */ public class ClusterServiceImpl implements ClusterService { - public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unomi.nodes"; - public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints"; - public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS = "internalEndpoints"; private static final Logger LOGGER = LoggerFactory.getLogger(ClusterServiceImpl.class.getName()); - PersistenceService persistenceService; - private ClusterManager karafCellarClusterManager; - private EventProducer karafCellarEventProducer; - private GroupManager karafCellarGroupManager; - private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME; - private ConfigurationAdmin osgiConfigurationAdmin; + + /** + * We use ServiceTracker instead of Blueprint dependency injection due to a known bug in Apache Aries Blueprint + * where service dependencies are not properly shut down in reverse order of their initialization. + * + * The bug manifests in two ways: + * 1. Services are not shut down in reverse order of their initialization, causing potential deadlocks + * 2. 
The PersistenceService is often shut down before other services that depend on it, leading to timeout waits + * + * By using ServiceTracker, we have explicit control over: + * - Service lifecycle management + * - Shutdown order + * - Service availability checks + * - Graceful degradation when services become unavailable + */ + private ServiceTracker<PersistenceService, PersistenceService> persistenceServiceTracker; + + // Keep direct reference for backward compatibility and unit tests + private PersistenceService persistenceService; + private String publicAddress; private String internalAddress; - private Map<String, Map<String,Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>(); - private Group group = null; - private SchedulerService schedulerService; - + //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to reactivate that code + private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); + private String nodeId; + private long nodeStartTime; private long nodeStatisticsUpdateFrequency = 10000; + private Map<String, Map<String, Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>(); + private BundleContext bundleContext; + private volatile boolean shutdownNow = false; - public void setPersistenceService(PersistenceService persistenceService) { - this.persistenceService = persistenceService; + private BundleWatcher bundleWatcher; + + /** + * Max time to wait for persistence service (in milliseconds) + */ + private static final long MAX_WAIT_TIME = 60000; // 60 seconds + + /** + * Sets the bundle context, which is needed to create service trackers + * @param bundleContext the OSGi bundle context + */ + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; } - public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) { - this.karafCellarClusterManager = karafCellarClusterManager; + /** + * Sets the bundle watcher used to 
retrieve server information + * + * @param bundleWatcher the bundle watcher + */ + public void setBundleWatcher(BundleWatcher bundleWatcher) { + this.bundleWatcher = bundleWatcher; + LOGGER.info("BundleWatcher service set"); } - public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) { - this.karafCellarEventProducer = karafCellarEventProducer; + /** + * Waits for the persistence service to become available. + * This method will retry getting the persistence service with exponential backoff + * until it's available or until the maximum wait time is reached. + * + * @throws IllegalStateException if the persistence service is not available after the maximum wait time + */ + private void waitForPersistenceService() { + if (shutdownNow) { + return; + } + + // If persistence service is directly set (e.g., in unit tests), no need to wait + if (persistenceService != null) { + LOGGER.debug("Persistence service is already available, no need to wait"); + return; + } + + // If no bundle context, we can't get the service via tracker + if (bundleContext == null) { + LOGGER.error("No BundleContext available, cannot wait for persistence service"); + throw new IllegalStateException("No BundleContext available to get persistence service"); + } + + // Initialize service tracker if needed + if (persistenceServiceTracker == null) { + initializeServiceTrackers(); + } + + // Try to get the service with retries + long startTime = System.currentTimeMillis(); + long waitTime = 50; // Start with 50ms wait time + + while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) { + PersistenceService service = getPersistenceService(); + if (service != null) { + LOGGER.info("Persistence service is now available"); + return; + } + + try { + LOGGER.debug("Waiting for persistence service... 
({}ms elapsed)", System.currentTimeMillis() - startTime); + Thread.sleep(waitTime); + // Exponential backoff with a maximum of 5 seconds + waitTime = Math.min(waitTime * 2, 5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted while waiting for persistence service", e); + break; + } + } + + throw new IllegalStateException("PersistenceService not available after waiting " + MAX_WAIT_TIME + "ms"); } - public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) { - this.karafCellarGroupManager = karafCellarGroupManager; + /** + * Safely gets the persistence service, either from the direct reference (for tests) + * or from the service tracker (for OSGi runtime) + * @return the persistence service or null if not available + */ + private PersistenceService getPersistenceService() { + if (shutdownNow) return null; + + // For unit tests or if already directly set + if (persistenceService != null) { + return persistenceService; + } + + // Otherwise try to get from service tracker + return persistenceServiceTracker != null ? 
persistenceServiceTracker.getService() : null; } - public void setKarafCellarGroupName(String karafCellarGroupName) { - this.karafCellarGroupName = karafCellarGroupName; + /** + * Initialize service tracker for PersistenceService + */ + private void initializeServiceTrackers() { + if (bundleContext == null) { + LOGGER.warn("BundleContext is null, cannot initialize service trackers"); + return; + } + + // Only create service tracker if direct reference isn't set + if (persistenceService == null) { + LOGGER.info("Initializing PersistenceService tracker"); + persistenceServiceTracker = new ServiceTracker<>( + bundleContext, + PersistenceService.class, + new ServiceTrackerCustomizer<PersistenceService, PersistenceService>() { + @Override + public PersistenceService addingService(ServiceReference<PersistenceService> reference) { + PersistenceService service = bundleContext.getService(reference); + if (service != null) { + persistenceService = service; + LOGGER.info("PersistenceService acquired through tracker"); + } + return service; + } + + @Override + public void modifiedService(ServiceReference<PersistenceService> reference, PersistenceService service) { + // No action needed + } + + @Override + public void removedService(ServiceReference<PersistenceService> reference, PersistenceService service) { + LOGGER.info("PersistenceService removed"); + persistenceService = null; + bundleContext.ungetService(reference); + } + } + ); + persistenceServiceTracker.open(); + } } - public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) { - this.osgiConfigurationAdmin = osgiConfigurationAdmin; + /** + * For unit tests and backward compatibility - directly sets the persistence service + * @param persistenceService the persistence service to set + */ + public void setPersistenceService(PersistenceService persistenceService) { + this.persistenceService = persistenceService; + LOGGER.info("PersistenceService set directly"); } public void setPublicAddress(String 
publicAddress) { @@ -101,8 +242,35 @@ public class ClusterServiceImpl implements ClusterService { this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency; } + /* Wait for PR UNOMI-878 to reactivate that code public void setSchedulerService(SchedulerService schedulerService) { this.schedulerService = schedulerService; + + // If we're already initialized, initialize scheduled tasks now + // This handles the case when ClusterService was initialized before SchedulerService was set + if (schedulerService != null && System.currentTimeMillis() > nodeStartTime && nodeStartTime > 0) { + LOGGER.info("SchedulerService was set after ClusterService initialization, initializing scheduled tasks now"); + initializeScheduledTasks(); + } + } + */ + + /* Wait for PR UNOMI-878 to reactivate that code + /** + * Unbind method for the scheduler service, called by the OSGi framework when the service is unregistered + * @param schedulerService The scheduler service being unregistered + */ + /* + public void unsetSchedulerService(SchedulerService schedulerService) { + if (this.schedulerService == schedulerService) { + LOGGER.info("SchedulerService was unset"); + this.schedulerService = null; + } + } + */ + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; } public Map<String, Map<String, Serializable>> getNodeSystemStatistics() { @@ -110,211 +278,331 @@ public class ClusterServiceImpl implements ClusterService { } public void init() { - if (karafCellarEventProducer != null && karafCellarClusterManager != null) { - - boolean setupConfigOk = true; - group = karafCellarGroupManager.findGroupByName(karafCellarGroupName); - if (setupConfigOk && group == null) { - LOGGER.error("Cluster group {} doesn't exist, creating it...", karafCellarGroupName); - group = karafCellarGroupManager.createGroup(karafCellarGroupName); - if (group != null) { - setupConfigOk = true; - } else { - setupConfigOk = false; - } - } - - // check if the producer is ON - if (setupConfigOk && 
karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) { - LOGGER.error("Cluster event producer is OFF"); - setupConfigOk = false; - } - - // check if the config pid is allowed - if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) { - LOGGER.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group {}", - karafCellarGroupName); - setupConfigOk = false; - } - - if (setupConfigOk) { - Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); - org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); - Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - if (karafCellarClusterNodeConfiguration == null) { - karafCellarClusterNodeConfiguration = new Properties(); - } - Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress); - publicEndpoints.put(thisKarafNode.getId(), publicAddress); - setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints); + // Initialize service trackers if not set directly (common in unit tests) + initializeServiceTrackers(); + + // Validate that nodeId is provided + if (StringUtils.isBlank(nodeId)) { + String errorMessage = "CRITICAL: nodeId is not set. 
This is a required setting for cluster operation."; + LOGGER.error(errorMessage); + throw new IllegalStateException(errorMessage); + } - Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress); - internalEndpoints.put(thisKarafNode.getId(), internalAddress); - setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints); + // Wait for persistence service to be available + try { + waitForPersistenceService(); + } catch (IllegalStateException e) { + LOGGER.error("Failed to initialize cluster service: {}", e.getMessage()); + return; + } - configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration); - ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - sendEvent(clusterConfigurationEvent); - } + nodeStartTime = System.currentTimeMillis(); - TimerTask statisticsTask = new TimerTask() { - @Override - public void run() { - try { - updateSystemStats(); - } catch (Throwable t) { - LOGGER.error("Error updating system statistics", t); - } - } - }; - schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask, 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS); + // Register this node in the persistence service + registerNodeInPersistence(); + /* Wait for PR UNOMI-878 to reactivate that code + /* + // Only initialize scheduled tasks if scheduler service is available + if (schedulerService != null) { + initializeScheduledTasks(); + } else { + LOGGER.warn("SchedulerService not available during ClusterService initialization. Scheduled tasks will not be registered. 
They will be registered when SchedulerService becomes available."); } - LOGGER.info("Cluster service initialized."); - } + */ + initializeScheduledTasks(); - public void destroy() { - LOGGER.info("Cluster service shutdown."); + LOGGER.info("Cluster service initialized with node ID: {}", nodeId); } - @Override - public List<ClusterNode> getClusterNodes() { - Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>(); - - Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes(); - org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode(); - Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName); - Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION); - Map<String, String> publicNodeEndpoints = new TreeMap<>(); - Map<String, String> internalNodeEndpoints = new TreeMap<>(); - if (karafCellarClusterNodeConfiguration != null) { - publicNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress); - internalNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress); + /** + * Initializes scheduled tasks for cluster management. + * This method can be called later if schedulerService wasn't available during init. 
+ */ + public void initializeScheduledTasks() { + /* Wait for PR UNOMI-878 to reactivate that code + if (schedulerService == null) { + LOGGER.error("Cannot initialize scheduled tasks: SchedulerService is not set"); + return; } - for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) { - ClusterNode clusterNode = new ClusterNode(); - String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId()); - if (publicEndpoint != null) { - clusterNode.setPublicHostAddress(publicEndpoint); - } - String internalEndpoint = internalNodeEndpoints.get(karafCellarNode.getId()); - if (internalEndpoint != null) { - clusterNode.setInternalHostAddress(internalEndpoint); - } - Map<String,Serializable> nodeStatistics = nodeSystemStatistics.get(karafCellarNode.getId()); - if (nodeStatistics != null) { - Long uptime = (Long) nodeStatistics.get("uptime"); - if (uptime != null) { - clusterNode.setUptime(uptime); - } - Double systemCpuLoad = (Double) nodeStatistics.get("systemCpuLoad"); - if (systemCpuLoad != null) { - clusterNode.setCpuLoad(systemCpuLoad); + */ + + // Schedule regular updates of the node statistics + TimerTask statisticsTask = new TimerTask() { + @Override + public void run() { + try { + updateSystemStats(); + } catch (Throwable t) { + LOGGER.error("Error updating system statistics", t); } - List<Double> loadAverage = (List<Double>) nodeStatistics.get("systemLoadAverage"); - if (loadAverage != null) { - Double[] loadAverageArray = loadAverage.toArray(new Double[loadAverage.size()]); - ArrayUtils.toPrimitive(loadAverageArray); - clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray)); + } + }; + /* Wait for PR UNOMI-878 to reactivate that code + schedulerService.createRecurringTask("clusterNodeStatisticsUpdate", nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false); + */ + scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS); + + // Schedule 
cleanup of stale nodes + TimerTask cleanupTask = new TimerTask() { + @Override + public void run() { + try { + cleanupStaleNodes(); + } catch (Throwable t) { + LOGGER.error("Error cleaning up stale nodes", t); } } - clusterNodes.put(karafCellarNode.getId(), clusterNode); - } + }; + /* Wait for PR UNOMI-878 to reactivate that code + schedulerService.createRecurringTask("clusterStaleNodesCleanup", 60000, TimeUnit.MILLISECONDS, cleanupTask, false); + */ + scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000, TimeUnit.MILLISECONDS); - return new ArrayList<ClusterNode>(clusterNodes.values()); + LOGGER.info("Cluster service scheduled tasks initialized"); } - @Override - public void purge(Date date) { - persistenceService.purge(date); - } + public void destroy() { + shutdownNow = true; + + // Remove this node from the persistence service + PersistenceService service = getPersistenceService(); + if (service != null) { + try { + service.remove(nodeId, ClusterNode.class); + LOGGER.info("Node {} removed from cluster", nodeId); + } catch (Exception e) { + LOGGER.error("Error removing node from cluster", e); + } + } - @Override - public void purge(String scope) { - persistenceService.purge(scope); - } + // Close service trackers + if (persistenceServiceTracker != null) { + try { + persistenceServiceTracker.close(); + LOGGER.debug("Persistence service tracker closed"); + } catch (Exception e) { + LOGGER.debug("Error closing persistence service tracker: {}", e.getMessage()); + } + persistenceServiceTracker = null; + } - @Override - public void sendEvent(Serializable eventObject) { - Event event = (Event) eventObject; - event.setSourceGroup(group); - event.setSourceNode(karafCellarClusterManager.getNode()); - karafCellarEventProducer.produce(event); + // Clear references + persistenceService = null; + bundleWatcher = null; + + LOGGER.info("Cluster service shutdown."); } /** - * Check if a configuration is allowed. - * - * @param group the cluster group. 
- * @param category the configuration category constant. - * @param pid the configuration PID. - * @param type the cluster event type. - * @return true if the cluster event type is allowed, false else. + * Register this node in the persistence service */ - public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) { - CellarSupport support = new CellarSupport(); - support.setClusterManager(this.karafCellarClusterManager); - support.setGroupManager(this.karafCellarGroupManager); - support.setConfigurationAdmin(this.osgiConfigurationAdmin); - return support.isAllowed(group, category, pid, type); - } - - private Map<String, String> getMapProperty(Properties properties, String propertyName, String defaultValue) { - String propertyValue = properties.getProperty(propertyName, defaultValue); - return getMapProperty(propertyValue); - } - - private Map<String, String> getMapProperty(String propertyValue) { - String[] propertyValueArray = propertyValue.split(","); - Map<String, String> propertyMapValue = new LinkedHashMap<>(); - for (String propertyValueElement : propertyValueArray) { - String[] propertyValueElementPrats = propertyValueElement.split("="); - propertyMapValue.put(propertyValueElementPrats[0], propertyValueElementPrats[1]); + private void registerNodeInPersistence() { + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.error("Cannot register node: PersistenceService not available"); + return; } - return propertyMapValue; - } - private Map<String, String> setMapProperty(Properties properties, String propertyName, Map<String, String> propertyMapValue) { - StringBuilder propertyValueBuilder = new StringBuilder(); - int entryCount = 0; - for (Map.Entry<String, String> propertyMapValueEntry : propertyMapValue.entrySet()) { - propertyValueBuilder.append(propertyMapValueEntry.getKey()); - propertyValueBuilder.append("="); - propertyValueBuilder.append(propertyMapValueEntry.getValue()); - if 
(entryCount < propertyMapValue.size() - 1) { - propertyValueBuilder.append(","); - } + ClusterNode clusterNode = new ClusterNode(); + clusterNode.setItemId(nodeId); + clusterNode.setPublicHostAddress(publicAddress); + clusterNode.setInternalHostAddress(internalAddress); + clusterNode.setStartTime(nodeStartTime); + clusterNode.setLastHeartbeat(System.currentTimeMillis()); + + // Set server information if BundleWatcher is available + if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) { + ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0); + clusterNode.setServerInfo(serverInfo); + LOGGER.info("Added server info to node: version={}, build={}", + serverInfo.getServerVersion(), serverInfo.getServerBuildNumber()); + } else { + LOGGER.warn("BundleWatcher not available at registration time, server info will not be available"); } - String oldPropertyValue = (String) properties.setProperty(propertyName, propertyValueBuilder.toString()); - if (oldPropertyValue == null) { - return null; + + updateSystemStatsForNode(clusterNode); + + boolean success = service.save(clusterNode); + if (success) { + LOGGER.info("Node {} registered in cluster", nodeId); + } else { + LOGGER.error("Failed to register node {} in cluster", nodeId); } - return getMapProperty(oldPropertyValue); } - private void updateSystemStats() { + /** + * Updates system stats for the given cluster node + */ + private void updateSystemStatsForNode(ClusterNode node) { final RuntimeMXBean remoteRuntime = ManagementFactory.getRuntimeMXBean(); long uptime = remoteRuntime.getUptime(); - ObjectName operatingSystemMXBeanName = ManagementFactory.getOperatingSystemMXBean().getObjectName(); - Double systemCpuLoad = null; + + double systemCpuLoad = 0.0; try { - systemCpuLoad = (Double) ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName, "SystemCpuLoad"); - } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) { - 
LOGGER.error("Error retrieving system CPU load", e); + systemCpuLoad = ((com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad(); + // Check for NaN value which Elasticsearch and OpenSearch don't support for float fields + if (Double.isNaN(systemCpuLoad)) { + LOGGER.debug("System CPU load is NaN, setting to 0.0"); + systemCpuLoad = 0.0; + } + } catch (Exception e) { + LOGGER.debug("Error retrieving system CPU load", e); } + final OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); double systemLoadAverage = operatingSystemMXBean.getSystemLoadAverage(); + // Check for NaN value which Elasticsearch/OpenSearch doesn't support for float fields + if (Double.isNaN(systemLoadAverage)) { + LOGGER.debug("System load average is NaN, setting to 0.0"); + systemLoadAverage = 0.0; + } + + node.setCpuLoad(systemCpuLoad); + node.setUptime(uptime); - ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics"); - Map<String,Serializable> systemStatistics = new TreeMap<>(); ArrayList<Double> systemLoadAverageArray = new ArrayList<>(); systemLoadAverageArray.add(systemLoadAverage); + node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new Double[0]))); + + // Store system statistics in memory as well + Map<String, Serializable> systemStatistics = new TreeMap<>(); systemStatistics.put("systemLoadAverage", systemLoadAverageArray); systemStatistics.put("systemCpuLoad", systemCpuLoad); systemStatistics.put("uptime", uptime); - clusterSystemStatisticsEvent.setStatistics(systemStatistics); - nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(), systemStatistics); - sendEvent(clusterSystemStatisticsEvent); + nodeSystemStatistics.put(nodeId, systemStatistics); + } + + /** + * Updates the system statistics for this node and stores them in the persistence service + */ + private void 
updateSystemStats() { + if (shutdownNow) { + return; + } + + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.warn("Cannot update system stats: PersistenceService not available"); + return; + } + + // Load node from persistence + ClusterNode node = service.load(nodeId, ClusterNode.class); + if (node == null) { + LOGGER.warn("Node {} not found in persistence, re-registering", nodeId); + registerNodeInPersistence(); + return; + } + + try { + // Update its stats + updateSystemStatsForNode(node); + + // Update server info if needed + if (bundleWatcher != null && !bundleWatcher.getServerInfos().isEmpty()) { + ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0); + // Check if server info needs updating + if (node.getServerInfo() == null || + !currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion())) { + + node.setServerInfo(currentInfo); + LOGGER.info("Updated server info for node {}: version={}, build={}", + nodeId, currentInfo.getServerVersion(), currentInfo.getServerBuildNumber()); + } + } + + node.setLastHeartbeat(System.currentTimeMillis()); + + // Save back to persistence + boolean success = service.save(node); + if (!success) { + LOGGER.error("Failed to update node {} statistics", nodeId); + } + } catch (Exception e) { + LOGGER.error("Error updating system statistics for node {}: {}", nodeId, e.getMessage(), e); + } + } + + /** + * Removes stale nodes from the cluster + */ + private void cleanupStaleNodes() { + if (shutdownNow) { + return; + } + + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not available"); + return; + } + + long cutoffTime = System.currentTimeMillis() - (nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x the update frequency + + Condition staleNodesCondition = new Condition(); + ConditionType propertyConditionType = new ConditionType(); + 
propertyConditionType.setItemId("propertyCondition"); + propertyConditionType.setItemType(ConditionType.ITEM_TYPE); + propertyConditionType.setConditionEvaluator("propertyConditionEvaluator"); + propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder"); + staleNodesCondition.setConditionType(propertyConditionType); + staleNodesCondition.setConditionTypeId("propertyCondition"); + staleNodesCondition.setParameter("propertyName", "lastHeartbeat"); + staleNodesCondition.setParameter("comparisonOperator", "lessThan"); + staleNodesCondition.setParameter("propertyValueInteger", cutoffTime); + + PartialList<ClusterNode> staleNodes = service.query(staleNodesCondition, null, ClusterNode.class, 0, -1); + + for (ClusterNode staleNode : staleNodes.getList()) { + LOGGER.info("Removing stale node: {}", staleNode.getItemId()); + service.remove(staleNode.getItemId(), ClusterNode.class); + nodeSystemStatistics.remove(staleNode.getItemId()); + } + } + + @Override + public List<ClusterNode> getClusterNodes() { + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.warn("Cannot get cluster nodes: PersistenceService not available"); + return Collections.emptyList(); + } + + // Query all nodes from the persistence service + return service.getAllItems(ClusterNode.class, 0, -1, null).getList(); } + @Override + public void purge(Date date) { + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.warn("Cannot purge by date: PersistenceService not available"); + return; + } + + service.purge(date); + } + + @Override + public void purge(String scope) { + PersistenceService service = getPersistenceService(); + if (service == null) { + LOGGER.warn("Cannot purge by scope: PersistenceService not available"); + return; + } + + service.purge(scope); + } + + /** + * Check if a persistence service is available. + * This can be used to quickly check before performing operations. 
+ * + * @return true if a persistence service is available (either directly set or via tracker) + */ + public boolean isPersistenceServiceAvailable() { + return getPersistenceService() != null; + } } + diff --git a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java deleted file mode 100644 index 3c4ec5ad7..000000000 --- a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.services.impl.cluster; - -import org.apache.karaf.cellar.core.event.Event; - -import java.io.Serializable; -import java.util.Map; -import java.util.TreeMap; - -/** - * The cluster event used to transmit update to node system statistics. 
- */ -public class ClusterSystemStatisticsEvent extends Event { - - Map<String,Serializable> statistics = new TreeMap<>(); - - public ClusterSystemStatisticsEvent(String id) { - super(id); - } - - public Map<String, Serializable> getStatistics() { - return statistics; - } - - public void setStatistics(Map<String, Serializable> statistics) { - this.statistics = statistics; - } -} diff --git a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java deleted file mode 100644 index 9eecaf4ba..000000000 --- a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.unomi.services.impl.cluster; - -import org.apache.karaf.cellar.config.Constants; -import org.apache.karaf.cellar.core.CellarSupport; -import org.apache.karaf.cellar.core.Configurations; -import org.apache.karaf.cellar.core.Group; -import org.apache.karaf.cellar.core.control.BasicSwitch; -import org.apache.karaf.cellar.core.control.Switch; -import org.apache.karaf.cellar.core.control.SwitchStatus; -import org.apache.karaf.cellar.core.event.EventHandler; -import org.apache.karaf.cellar.core.event.EventType; -import org.osgi.service.cm.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * A Karaf Cellar event handler to process incoming events that contain system statistics updates from nodes. - */ -public class ClusterSystemStatisticsEventHandler extends CellarSupport implements EventHandler<ClusterSystemStatisticsEvent> { - - public static final String SWITCH_ID = "org.apache.unomi.cluster.system.statistics.handler"; - private static final Logger LLOGGER = LoggerFactory.getLogger(ClusterSystemStatisticsEventHandler.class.getName()); - private final Switch eventSwitch = new BasicSwitch(SWITCH_ID); - private ClusterServiceImpl clusterServiceImpl; - - public void setClusterServiceImpl(ClusterServiceImpl clusterServiceImpl) { - this.clusterServiceImpl = clusterServiceImpl; - } - - public void init() { - // nothing to do - } - - public void destroy() { - // nothing to do - } - - @Override - public void handle(ClusterSystemStatisticsEvent event) { - // check if the handler is ON - if (this.getSwitch().getStatus().equals(SwitchStatus.OFF)) { - LLOGGER.debug("CELLAR SYSTEM STATS: {} switch is OFF, cluster event not handled", SWITCH_ID); - return; - } - - if (groupManager == null) { - //in rare cases for example right after installation this happens! 
- LLOGGER.error("CELLAR SYSTEM STATS: retrieved event {} while groupManager is not available yet!", event); - return; - } - - // check if the group is local - if (!groupManager.isLocalGroup(event.getSourceGroup().getName())) { - LLOGGER.info("CELLAR SYSTEM STATS: node is not part of the event cluster group {}",event.getSourceGroup().getName()); - return; - } - - Group group = event.getSourceGroup(); - String groupName = group.getName(); - - String pid = event.getId(); - - if (isAllowed(event.getSourceGroup(), Constants.CATEGORY, pid, EventType.INBOUND)) { - - // check if it's not a "local" event - if (event.getSourceNode() != null && event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId())) { - LLOGGER.trace("CELLAR SYSTEM STATS: cluster event is local (coming from local synchronizer or listener)"); - return; - } - - - Map<String, Serializable> nodeSystemStatistics = clusterServiceImpl.getNodeSystemStatistics().get(event.getSourceNode().getId()); - if (nodeSystemStatistics == null) { - nodeSystemStatistics = new ConcurrentHashMap<>(); - } - nodeSystemStatistics.putAll(event.getStatistics()); - clusterServiceImpl.getNodeSystemStatistics().put(event.getSourceNode().getId(), nodeSystemStatistics); - } - - } - - @Override - public Class<ClusterSystemStatisticsEvent> getType() { - return ClusterSystemStatisticsEvent.class; - } - - /** - * Get the cluster configuration event handler switch. - * - * @return the cluster configuration event handler switch. - */ - @Override - public Switch getSwitch() { - // load the switch status from the config - try { - Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE, null); - if (configuration != null) { - String handlerStatus = (String) configuration.getProperties().get(Configurations.HANDLER + "." + this.getClass().getName()); - if (handlerStatus == null) { - // default value is on. 
- eventSwitch.turnOn(); - } else { - Boolean status = new Boolean(handlerStatus); - if (status) { - eventSwitch.turnOn(); - } else { - eventSwitch.turnOff(); - } - } - } - } catch (Exception e) { - // nothing to do - } - return eventSwitch; - } - - -} diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml index f49f3c849..e2e188464 100644 --- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -53,7 +53,7 @@ <cm:property-placeholder persistent-id="org.apache.unomi.cluster" update-strategy="reload" placeholder-prefix="${cluster."> <cm:default-properties> - <cm:property name="group" value="default"/> + <cm:property name="nodeId" value="unomi-node-1"/> <cm:property name="contextserver.publicAddress" value="https://localhost:9443"/> <cm:property name="contextserver.internalAddress" value="http://127.0.0.1:8181"/> <cm:property name="nodeStatisticsUpdateFrequency" value="10000"/> @@ -70,9 +70,6 @@ <reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/> <reference id="httpService" interface="org.osgi.service.http.HttpService"/> - <reference id="karafCellarClusterManager" interface="org.apache.karaf.cellar.core.ClusterManager"/> - <reference id="karafCellarEventProducer" interface="org.apache.karaf.cellar.core.event.EventProducer"/> - <reference id="karafCellarGroupManager" interface="org.apache.karaf.cellar.core.GroupManager"/> <reference id="osgiConfigurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/> <reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService"/> <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor"/> @@ -262,13 +259,11 @@ <property name="publicAddress" value="${cluster.contextserver.publicAddress}"/> <property name="internalAddress" 
value="${cluster.contextserver.internalAddress}"/> <property name="persistenceService" ref="persistenceService"/> - <property name="karafCellarClusterManager" ref="karafCellarClusterManager"/> - <property name="karafCellarEventProducer" ref="karafCellarEventProducer"/> - <property name="karafCellarGroupManager" ref="karafCellarGroupManager"/> - <property name="karafCellarGroupName" value="${cluster.group}"/> - <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/> + <property name="nodeId" value="${cluster.nodeId}"/> <property name="nodeStatisticsUpdateFrequency" value="${cluster.nodeStatisticsUpdateFrequency}"/> + <!-- Disabled until UNOMI-878 is available; re-enable this property then: <property name="schedulerService" ref="schedulerServiceImpl"/> + --> </bean> <service id="clusterService" ref="clusterServiceImpl" interface="org.apache.unomi.api.services.ClusterService"/> @@ -411,7 +406,6 @@ </bean> </service> - <bean id="configSharingServiceImpl" class="org.apache.unomi.services.impl.configsharing.ConfigSharingServiceImpl" destroy-method="preDestroy"> <property name="configProperties"> @@ -429,19 +423,4 @@ </interfaces> </service> - <!-- Cluster System Statistics Event Handler --> - <bean id="clusterSystemStatisticsEventHandler" - class="org.apache.unomi.services.impl.cluster.ClusterSystemStatisticsEventHandler" - init-method="init" destroy-method="destroy"> - <property name="configurationAdmin" ref="osgiConfigurationAdmin"/> - <property name="clusterManager" ref="karafCellarClusterManager"/> - <property name="groupManager" ref="karafCellarGroupManager"/> - <property name="clusterServiceImpl" ref="clusterServiceImpl"/> - </bean> - <service ref="clusterSystemStatisticsEventHandler" interface="org.apache.karaf.cellar.core.event.EventHandler"> - <service-properties> - <entry key="managed" value="true"/> - </service-properties> - </service> - </blueprint> diff --git a/services/src/main/resources/hazelcast.xml b/services/src/main/resources/hazelcast.xml deleted file
mode 100644 index 2179c92c6..000000000 --- a/services/src/main/resources/hazelcast.xml +++ /dev/null @@ -1,222 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.4.xsd" - xmlns="http://www.hazelcast.com/schema/config" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <group> - <name>${org.apache.unomi.hazelcast.group.name}</name> - <password>${org.apache.unomi.hazelcast.group.password}</password> - </group> - <management-center enabled="false">http://localhost:8080/mancenter</management-center> - <properties> - <property name="hazelcast.jmx">true</property> - </properties> - <network> - <port auto-increment="true" port-count="100">${org.apache.unomi.hazelcast.network.port}</port> - <outbound-ports> - <!-- - Allowed port range when connecting to other nodes. - 0 or * means use system provided port. 
- --> - <ports>0</ports> - </outbound-ports> - <join> - <multicast enabled="false"> - <multicast-group>224.2.2.3</multicast-group> - <multicast-port>54327</multicast-port> - </multicast> - <tcp-ip enabled="true"> - <members>${org.apache.unomi.hazelcast.tcp-ip.members}</members> - </tcp-ip> - <aws enabled="false"> - <access-key>my-access-key</access-key> - <secret-key>my-secret-key</secret-key> - <!--optional, default is us-east-1 --> - <region>us-west-1</region> - <!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property --> - <host-header>ec2.amazonaws.com</host-header> - <!-- optional, only instances belonging to this group will be discovered, default will try all running instances --> - <security-group-name>hazelcast-sg</security-group-name> - <tag-key>type</tag-key> - <tag-value>hz-nodes</tag-value> - </aws> - </join> - <interfaces enabled="false"> - <interface>10.10.1.*</interface> - </interfaces> - <ssl enabled="false"/> - <socket-interceptor enabled="false"/> - <symmetric-encryption enabled="false"> - <!-- - encryption algorithm such as - DES/ECB/PKCS5Padding, - PBEWithMD5AndDES, - AES/CBC/PKCS5Padding, - Blowfish, - DESede - --> - <algorithm>PBEWithMD5AndDES</algorithm> - <!-- salt value to use when generating the secret key --> - <salt>thesalt</salt> - <!-- pass phrase to use when generating the secret key --> - <password>thepass</password> - <!-- iteration count to use when generating the secret key --> - <iteration-count>19</iteration-count> - </symmetric-encryption> - </network> - <partition-group enabled="false"/> - <executor-service> - <pool-size>16</pool-size> - <!-- Queue capacity. 0 means Integer.MAX_VALUE --> - <queue-capacity>0</queue-capacity> - </executor-service> - <queue name="default"> - <!-- - Maximum size of the queue. When a JVM's local queue size reaches the maximum, - all put/offer operations will get blocked until the queue size - of the JVM goes down below the maximum. 
- Any integer between 0 and Integer.MAX_VALUE. 0 means - Integer.MAX_VALUE. Default is 0. - --> - <max-size>0</max-size> - <!-- - Number of backups. If 1 is set as the backup-count for example, - then all entries of the map will be copied to another JVM for - fail-safety. 0 means no backup. - --> - <backup-count>1</backup-count> - <!-- - Number of async backups. 0 means no backup. - --> - <async-backup-count>0</async-backup-count> - <empty-queue-ttl>-1</empty-queue-ttl> - </queue> - - <map name="default"> - <!-- - Data type that will be used for storing recordMap. - Possible values: - BINARY (default): keys and values will be stored as binary data - OBJECT : values will be stored in their object forms - OFFHEAP : values will be stored in non-heap region of JVM - --> - <in-memory-format>BINARY</in-memory-format> - <!-- - Number of backups. If 1 is set as the backup-count for example, - then all entries of the map will be copied to another JVM for - fail-safety. 0 means no backup. - --> - <backup-count>1</backup-count> - <!-- - Number of async backups. 0 means no backup. - --> - <async-backup-count>0</async-backup-count> - <!-- - Maximum number of seconds for each entry to stay in the map. Entries that are - older than <time-to-live-seconds> and not updated for <time-to-live-seconds> - will get automatically evicted from the map. - Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0. - --> - <time-to-live-seconds>0</time-to-live-seconds> - <!-- - Maximum number of seconds for each entry to stay idle in the map. Entries that are - idle(not touched) for more than <max-idle-seconds> will get - automatically evicted from the map. Entry is touched if get, put or containsKey is called. - Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0. - --> - <max-idle-seconds>0</max-idle-seconds> - <!-- - Valid values are: - NONE (no eviction), - LRU (Least Recently Used), - LFU (Least Frequently Used). - NONE is the default. 
- --> - <eviction-policy>NONE</eviction-policy> - <!-- - Maximum size of the map. When max size is reached, - map is evicted based on the policy defined. - Any integer between 0 and Integer.MAX_VALUE. 0 means - Integer.MAX_VALUE. Default is 0. - --> - <max-size policy="PER_NODE">0</max-size> - <!-- - When max. size is reached, specified percentage of - the map will be evicted. Any integer between 0 and 100. - If 25 is set for example, 25% of the entries will - get evicted. - --> - <eviction-percentage>25</eviction-percentage> - <!-- - While recovering from split-brain (network partitioning), - map entries in the small cluster will merge into the bigger cluster - based on the policy set here. When an entry merge into the - cluster, there might an existing entry with the same key already. - Values of these entries might be different for that same key. - Which value should be set for the key? Conflict is resolved by - the policy set here. Default policy is PutIfAbsentMapMergePolicy - - There are built-in merge policies such as - com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key. - com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster. - com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins. - com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins. 
- --> - <merge-policy>com.hazelcast.map.merge.PassThroughMergePolicy</merge-policy> - </map> - - <multimap name="default"> - <backup-count>1</backup-count> - <value-collection-type>SET</value-collection-type> - </multimap> - - <multimap name="default"> - <backup-count>1</backup-count> - <value-collection-type>SET</value-collection-type> - </multimap> - - <list name="default"> - <backup-count>1</backup-count> - </list> - - <set name="default"> - <backup-count>1</backup-count> - </set> - - <jobtracker name="default"> - <max-thread-size>0</max-thread-size> - <!-- Queue size 0 means number of partitions * 2 --> - <queue-size>0</queue-size> - <retry-count>0</retry-count> - <chunk-size>1000</chunk-size> - <communicate-stats>true</communicate-stats> - <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy> - </jobtracker> - - <semaphore name="default"> - <initial-permits>0</initial-permits> - <backup-count>1</backup-count> - <async-backup-count>0</async-backup-count> - </semaphore> - - <serialization> - <portable-version>0</portable-version> - </serialization> - - <services enable-defaults="true" /> -</hazelcast> \ No newline at end of file diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg b/services/src/main/resources/org.apache.unomi.cluster.cfg index bfbb189d9..eecb7e1de 100644 --- a/services/src/main/resources/org.apache.unomi.cluster.cfg +++ b/services/src/main/resources/org.apache.unomi.cluster.cfg @@ -14,14 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # -group=${org.apache.unomi.cluster.group:-default} # To simplify testing we set the public address to use HTTP, but for production environments it is highly recommended # to switch to using HTTPS with a proper SSL certificate installed. 
contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181} contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443} # -# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load, +# The nodeId is a required setting that uniquely identifies this node in the cluster. +# It must be set to a unique value for each node in the cluster. +# Example: nodeId=node1 +nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1} +# +# The nodeStatisticsUpdateFrequency controls the frequency of the update of system statistics such as CPU load, # system load average and uptime. This value is set in milliseconds and is set to 10 seconds by default. Each node # will retrieve the local values and broadcast them through a cluster event to all the other nodes to update # the global cluster statistics. -nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000} \ No newline at end of file +nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}