This is an automated email from the ASF dual-hosted git repository. ghenzler pushed a commit to branch feature/FELIX-6245-condition-service-interfaces-FELIX-6250-healthcheck-monitor in repository https://gitbox.apache.org/repos/asf/felix-dev.git
commit 775545aa8f0657d7c5f703bc901693dcdbaff92f Author: georg.henzler <[email protected]> AuthorDate: Sat Mar 28 01:46:50 2020 +0100 FELIX-6250 Introduce HealthCheckMonitor for monitoring health checks --- healthcheck/core/pom.xml | 2 +- .../impl/executor/CombinedExecutionResult.java | 9 + .../hc/core/impl/monitor/HealthCheckMonitor.java | 344 +++++++++++++++++++++ 3 files changed, 354 insertions(+), 1 deletion(-) diff --git a/healthcheck/core/pom.xml b/healthcheck/core/pom.xml index 36333a5..620d11f 100644 --- a/healthcheck/core/pom.xml +++ b/healthcheck/core/pom.xml @@ -145,7 +145,7 @@ <dependency> <groupId>org.apache.felix</groupId> <artifactId>org.apache.felix.healthcheck.api</artifactId> - <version>2.0.0</version> + <version>2.0.3-SNAPSHOT</version> <scope>provided</scope> </dependency> diff --git a/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/executor/CombinedExecutionResult.java b/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/executor/CombinedExecutionResult.java index a159fc6..55a9172 100644 --- a/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/executor/CombinedExecutionResult.java +++ b/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/executor/CombinedExecutionResult.java @@ -89,5 +89,14 @@ public class CombinedExecutionResult implements HealthCheckExecutionResult { public HealthCheckMetadata getHealthCheckMetadata() { throw new UnsupportedOperationException(); } + + @Override + public String toString() { + return "CombinedExecutionResult [size="+executionResults.size()+" overall status=" + getHealthCheckResult().getStatus() + + ", finishedAt=" + getFinishedAt() + + ", elapsedTimeInMs=" + getElapsedTimeInMs() + + ", timedOut=" + hasTimedOut() + + "]"; + } } \ No newline at end of file diff --git a/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/monitor/HealthCheckMonitor.java b/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/monitor/HealthCheckMonitor.java new file mode 
100644 index 0000000..18b47cb --- /dev/null +++ b/healthcheck/core/src/main/java/org/apache/felix/hc/core/impl/monitor/HealthCheckMonitor.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.felix.hc.core.impl.monitor; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.felix.hc.api.Result; +import org.apache.felix.hc.api.condition.Healthy; +import org.apache.felix.hc.api.condition.SystemReady; +import org.apache.felix.hc.api.condition.Unhealthy; +import org.apache.felix.hc.api.execution.HealthCheckExecutionResult; +import org.apache.felix.hc.api.execution.HealthCheckExecutor; +import org.apache.felix.hc.api.execution.HealthCheckSelector; +import org.apache.felix.hc.core.impl.executor.CombinedExecutionResult; +import org.apache.felix.hc.core.impl.executor.HealthCheckExecutorThreadPool; +import org.apache.felix.hc.core.impl.scheduling.AsyncIntervalJob; +import org.apache.felix.hc.core.impl.scheduling.AsyncJob; 
+import org.apache.felix.hc.core.impl.scheduling.AsyncQuartzCronJob;
+import org.apache.felix.hc.core.impl.scheduling.QuartzCronSchedulerProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentConstants;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors health check tags and/or names and, depending on configuration:
+ * <ul>
+ * <li>Activates the condition marker services {@link SystemReady}, {@link Healthy}, {@link Unhealthy}</li>
+ * <li>Sends OSGi events</li>
+ * </ul>
+ * <p>
+ * Each configured tag or name is tracked by its own {@link HealthState}. The monitor itself is the
+ * {@link Runnable} that is handed to the scheduled job created in
+ * {@link #activate(BundleContext, Config, ComponentContext)}; every execution of {@link #run()} re-evaluates
+ * all tracked tags/names.
+ */
+@Component(immediate = true, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = HealthCheckMonitor.Config.class, factory = true)
+public class HealthCheckMonitor implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(HealthCheckMonitor.class);
+
+    /** Tag for which the healthy marker is additionally registered as {@link SystemReady}. */
+    public static final String TAG_SYSTEMREADY = "systemready";
+
+    /** Prefix of all event topics posted by this monitor. */
+    public static final String EVENT_TOPIC_PREFIX = "org/apache/felix/healthchange";
+    /** Event property holding the {@link HealthCheckExecutionResult} that caused the event. */
+    public static final String EVENT_PROP_EXECUTION_RESULT = "executionResult";
+    /** Event property holding the new {@link Result.Status}. */
+    public static final String EVENT_PROP_STATUS = "status";
+    /** Event property holding the previous {@link Result.Status}; absent on the first evaluation. */
+    public static final String EVENT_PROP_PREVIOUS_STATUS = "previousStatus";
+
+    private static final Healthy MARKER_SERVICE_HEALTHY = new Healthy() { };
+    private static final Unhealthy MARKER_SERVICE_UNHEALTHY = new Unhealthy() { };
+    private static final SystemReady MARKER_SERVICE_SYSTEMREADY = new SystemReady() { };
+
+
+    /** Factory configuration of a single monitor instance. */
+    @ObjectClassDefinition(name = "Health Check Monitor", description = "Regularly executes health checks according to given interval/cron expression")
+    public @interface Config {
+
+        @AttributeDefinition(name = "Tags", description = "List of tags to query regularly")
+        String[] tags() default {};
+
+        @AttributeDefinition(name = "Names", description = "List of health check names to query regularly")
+        String[] names() default {};
+
+        @AttributeDefinition(name = "Interval (Sec)", description = "Will execute the checks for give tags every n seconds (either use intervalInSec or cronExpression )")
+        long intervalInSec() default 0;
+
+        @AttributeDefinition(name = "Interval (Cron Expresson)", description = "Will execute the checks for give tags according to cron expression")
+        String cronExpression() default "";
+
+        @AttributeDefinition(name = "Register Healthy Marker Service", description = "For the case a given tag/name is healthy, will register a service Healthy with property tag=<tagname> (or name=<hc.name>) that other services can depend on")
+        boolean registerHealthyMarkerService() default true;
+
+        @AttributeDefinition(name = "Register Unhealthy Marker Service", description = "For the case a given tag/name is unhealthy, will register a service Unhealthy with property tag=<tagname> (or name=<hc.name>) that other services can depend on")
+        boolean registerUnhealthyMarkerService() default false;
+
+        @AttributeDefinition(name = "Treat WARN as Healthy", description = "Whether to treat status WARN as healthy (it normally should because WARN indicates a working system that only possibly might become unavailable if no action is taken")
+        boolean treatWarnAsHealthy() default true;
+
+        @AttributeDefinition(name = "Send Events", description = "Whether to send OSGi events for the case a status has changed")
+        boolean sendEvents() default true;
+
+        @AttributeDefinition
+        String webconsole_configurationFactory_nameHint() default "Health Monitor for '{tags}'/'{names}', {intervalInSec}sec/{cronExpression}, Marker Service Healthy:{registerHealthyMarkerService} Unhealthy:{registerUnhealthyMarkerService}, Send Events {sendEvents}";
+    }
+
+    private List<String> tags;
+    private List<String> names;
+    /** One entry per non-blank configured tag and name; filled once in activate(). */
+    private final List<HealthState> healthStates = new ArrayList<>();
+
+    private long intervalInSec;
+    private String cronExpression;
+
+    private boolean registerHealthyMarkerService;
+    private boolean registerUnhealthyMarkerService;
+
+    private boolean treatWarnAsHealthy;
+
+    private boolean sendEvents;
+
+    @Reference
+    HealthCheckExecutorThreadPool healthCheckExecutorThreadPool;
+
+    @Reference
+    QuartzCronSchedulerProvider quartzCronSchedulerProvider;
+
+    /** The scheduled job driving {@link #run()}; stays {@code null} until activation succeeded. */
+    AsyncJob monitorJob = null;
+
+    @Reference
+    private HealthCheckExecutor executor;
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    private BundleContext bundleContext;
+
+    /**
+     * Reads the configuration, creates one {@link HealthState} per non-blank tag/name and schedules this
+     * monitor either by cron expression (takes precedence if set) or by fixed interval.
+     *
+     * @param bundleContext    used later to register the marker services
+     * @param config           the factory configuration of this monitor
+     * @param componentContext used to derive a job name from the component id
+     * @throws IllegalArgumentException if neither a cron expression nor a positive interval is configured,
+     *                                  or if the cron job cannot be created because a class is missing
+     * @throws InvalidSyntaxException   declared for backwards compatibility of the signature
+     */
+    @Activate
+    protected final void activate(BundleContext bundleContext, Config config, ComponentContext componentContext) throws InvalidSyntaxException {
+
+        this.bundleContext = bundleContext;
+
+        this.tags = Arrays.asList(config.tags());
+        for (String tag : this.tags) {
+            if (StringUtils.isNotBlank(tag)) {
+                healthStates.add(new HealthState(tag, true));
+            }
+        }
+
+        this.names = Arrays.asList(config.names());
+        for (String name : this.names) {
+            if (StringUtils.isNotBlank(name)) {
+                healthStates.add(new HealthState(name, false));
+            }
+        }
+
+        this.registerHealthyMarkerService = config.registerHealthyMarkerService();
+        this.registerUnhealthyMarkerService = config.registerUnhealthyMarkerService();
+
+        this.treatWarnAsHealthy = config.treatWarnAsHealthy();
+        this.sendEvents = config.sendEvents();
+
+        this.intervalInSec = config.intervalInSec();
+        this.cronExpression = config.cronExpression();
+        if (StringUtils.isNotBlank(cronExpression)) {
+            try {
+                monitorJob = new AsyncQuartzCronJob(this, quartzCronSchedulerProvider, "job-hc-monitor-"+componentContext.getProperties().get(ComponentConstants.COMPONENT_ID), "healthcheck-monitor", cronExpression);
+            } catch(ClassNotFoundException e) {
+                // name the missing class and keep the cause so the root problem stays visible in the log
+                throw new IllegalArgumentException("Cannot use cron expression "+cronExpression+" while class is not available: "+e.getMessage(), e);
+            }
+        } else if (intervalInSec > 0) {
+            monitorJob = new AsyncIntervalJob(this, healthCheckExecutorThreadPool, intervalInSec);
+        } else {
+            throw new IllegalArgumentException("Either cronExpression or intervalInSec needs to be set");
+        }
+        monitorJob.schedule();
+        LOG.info("HealthCheckMonitor active for tags {} and names {}", this.tags, this.names);
+    }
+
+    @Override
+    public String toString() {
+        return "[HealthCheckMonitor tags=" + tags + "/names=" + names + ", intervalInSec="+intervalInSec+"/cron="+cronExpression+"]";
+    }
+
+    /**
+     * Unschedules the monitor job and unregisters all marker services that are still registered.
+     */
+    @Deactivate
+    protected final void deactivate() {
+        try {
+            // Stop scheduling first so that no new run starts after the marker services were released.
+            // NOTE(review): whether unschedule() waits for a run already in progress is not visible here.
+            if (monitorJob != null) {
+                monitorJob.unschedule();
+            }
+        } finally {
+            // release the marker services even if unscheduling failed
+            healthStates.forEach(HealthState::cleanUp);
+        }
+        LOG.info("HealthCheckMonitor deactivated for tags {} and names {}", this.tags, this.names);
+    }
+
+    /**
+     * Executes the health checks of all monitored tags/names and updates their {@link HealthState}.
+     * Never throws: failures are logged.
+     */
+    @Override
+    public void run() {
+        try {
+            // run in tags/names in parallel
+            healthStates.parallelStream().forEach(this::refreshHealthState);
+
+            LOG.trace("HealthCheckMonitor: updated results for tags {} and names {}", this.tags, this.names);
+        } catch(Exception e) {
+            LOG.error("Exception HealthCheckMonitor run(): "+e, e);
+        }
+
+    }
+
+    /**
+     * Executes the checks selected by one tag/name and feeds the (possibly combined) result into its state.
+     * An exception is logged and swallowed here so that a failure for one tag/name does not abort the
+     * evaluation of the other monitored tags/names.
+     */
+    private void refreshHealthState(HealthState healthState) {
+        try {
+            HealthCheckSelector selector = healthState.isTag ? HealthCheckSelector.tags(healthState.tagOrName)
+                    : HealthCheckSelector.names(healthState.tagOrName);
+
+            List<HealthCheckExecutionResult> executionResults = executor.execute(selector);
+
+            // a single result is used as is (keeps its metadata); anything else is aggregated
+            HealthCheckExecutionResult result = executionResults.size() == 1 ? executionResults.get(0)
+                    : new CombinedExecutionResult(executionResults, Result.Status.TEMPORARILY_UNAVAILABLE);
+
+            LOG.trace("Result of '{}' => {}", healthState.tagOrName, result.getHealthCheckResult().getStatus());
+
+            healthState.update(result);
+        } catch(Exception e) {
+            LOG.error("Exception HealthCheckMonitor while updating {} '{}': {}", healthState.propertyName, healthState.tagOrName, e, e);
+        }
+    }
+
+    /**
+     * Tracks the last known status of one monitored tag or name together with the marker services
+     * currently registered for it. {@link #update(HealthCheckExecutionResult)} and {@link #cleanUp()}
+     * are synchronized on the instance.
+     */
+    private class HealthState {
+
+        private final String tagOrName;
+        private final boolean isTag;
+        /** Service/topic property key: "tag" for tags, "name" for names. */
+        private final String propertyName;
+
+        private ServiceRegistration<?> healthyRegistration = null;
+        private ServiceRegistration<Unhealthy> unhealthyRegistration = null;
+
+        private Result.Status status = null;
+        private boolean isHealthy = false;
+        private boolean statusChanged = false;
+
+        public HealthState(String tagOrName, boolean isTag) {
+            this.tagOrName = tagOrName;
+            this.isTag = isTag;
+            this.propertyName = isTag ? "tag" : "name";
+        }
+
+        /**
+         * Takes over the status of the given result, then (un)registers marker services and posts
+         * events as configured.
+         */
+        synchronized void update(HealthCheckExecutionResult executionResult) {
+            Result.Status previousStatus = status;
+            status = executionResult.getHealthCheckResult().getStatus();
+
+            isHealthy = (status == Result.Status.OK || (treatWarnAsHealthy && status == Result.Status.WARN));
+            // previousStatus is null on the first evaluation, which therefore always counts as a change
+            statusChanged = previousStatus != status;
+            LOG.trace("  {}: isHealthy={} statusChanged={}", tagOrName, isHealthy, statusChanged);
+
+            registerMarkerServices();
+            sendEvent(executionResult, previousStatus);
+        }
+
+        /** Brings the registered marker services in line with the current value of isHealthy. */
+        private void registerMarkerServices() {
+            if (registerHealthyMarkerService) {
+                if (isHealthy && healthyRegistration == null) {
+                    registerHealthyService();
+                } else if (!isHealthy && healthyRegistration != null) {
+                    unregisterHealthyService();
+                }
+            }
+            if (registerUnhealthyMarkerService) {
+                if (!isHealthy && unhealthyRegistration == null) {
+                    registerUnhealthyService();
+                } else if (isHealthy && unhealthyRegistration != null) {
+                    unregisterUnhealthyService();
+                }
+            }
+        }
+
+        /**
+         * Registers the {@link Healthy} marker; for the tag/name "systemready" the marker is registered
+         * under both {@link SystemReady} and {@link Healthy}. No-op if already registered.
+         */
+        private void registerHealthyService() {
+            if (healthyRegistration == null) {
+                LOG.debug("HealthCheckMonitor: registerHealthyService() {} ", tagOrName);
+                Dictionary<String, String> registrationProps = new Hashtable<>();
+                registrationProps.put(propertyName, tagOrName);
+                registrationProps.put("activated", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
+
+                if (StringUtils.equals(tagOrName, TAG_SYSTEMREADY)) {
+                    LOG.debug("HealthCheckMonitor: SYSTEM READY");
+                    healthyRegistration = bundleContext.registerService(
+                            new String[] { SystemReady.class.getName(), Healthy.class.getName()},
+                            MARKER_SERVICE_SYSTEMREADY, registrationProps);
+                } else {
+                    healthyRegistration = bundleContext.registerService(Healthy.class, MARKER_SERVICE_HEALTHY,
+                            registrationProps);
+                }
+                LOG.debug("HealthCheckMonitor: Healthy service for {} '{}' registered", propertyName, tagOrName);
+            }
+        }
+
+        private void unregisterHealthyService() {
+            if (healthyRegistration != null) {
+                healthyRegistration.unregister();
+                healthyRegistration = null;
+                LOG.debug("HealthCheckMonitor: Healthy service for {} '{}' unregistered", propertyName, tagOrName);
+            }
+        }
+
+        /** Registers the {@link Unhealthy} marker. No-op if already registered. */
+        private void registerUnhealthyService() {
+            if (unhealthyRegistration == null) {
+                Dictionary<String, String> registrationProps = new Hashtable<>();
+                // use the same key as the healthy marker ("tag" or "name") instead of always "tag"
+                registrationProps.put(propertyName, tagOrName);
+                registrationProps.put("activated", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
+                unhealthyRegistration = bundleContext.registerService(Unhealthy.class, MARKER_SERVICE_UNHEALTHY,
+                        registrationProps);
+                LOG.debug("HealthCheckMonitor: Unhealthy service for {} '{}' registered", propertyName, tagOrName);
+            }
+        }
+
+        private void unregisterUnhealthyService() {
+            if (unhealthyRegistration != null) {
+                unhealthyRegistration.unregister();
+                unhealthyRegistration = null;
+                LOG.debug("HealthCheckMonitor: Unhealthy service for {} '{}' unregistered", propertyName, tagOrName);
+            }
+        }
+
+
+        /**
+         * Posts a status change event to the topic of this tag/name and, for a non-combined result whose
+         * service reference carries a component name, also to the class based topic. Does nothing if
+         * events are disabled or the status did not change.
+         */
+        private void sendEvent(HealthCheckExecutionResult executionResult, Result.Status previousStatus) {
+            if(sendEvents && statusChanged) {
+                Map<String,Object> properties = new HashMap<>();
+                properties.put(EVENT_PROP_STATUS , status);
+                if(previousStatus != null) {
+                    properties.put(EVENT_PROP_PREVIOUS_STATUS , previousStatus);
+                }
+                properties.put(EVENT_PROP_EXECUTION_RESULT, executionResult);
+                // whitespace is replaced by underscores when building the topic
+                String topic = EVENT_TOPIC_PREFIX + "/" +propertyName + "/" + tagOrName.replaceAll("\\s+","_");
+                eventAdmin.postEvent(new Event(topic, properties));
+                LOG.debug("HealthCheckMonitor: Posted event for topic '{}': Status change from {} to {}", topic, previousStatus, status);
+                // the metadata is only read for non-combined results
+                if(!(executionResult instanceof CombinedExecutionResult)) {
+                    String componentClass = (String) executionResult.getHealthCheckMetadata().getServiceReference().getProperty(ComponentConstants.COMPONENT_NAME);
+                    if(StringUtils.isNotBlank(componentClass)) {
+                        String topicClass = EVENT_TOPIC_PREFIX + "/class/" + componentClass.replace(".", "/");
+                        eventAdmin.postEvent(new Event(topicClass, properties));
+                        LOG.debug("HealthCheckMonitor: Posted event for topic '{}': Status change from {} to {}", topicClass, previousStatus, status);
+                    }
+                }
+            }
+        }
+
+        /** Unregisters whatever marker services are still registered for this tag/name. */
+        synchronized void cleanUp() {
+            unregisterHealthyService();
+            unregisterUnhealthyService();
+        }
+
+    }
+}
