Repository: ambari Updated Branches: refs/heads/trunk 36edfee56 -> 5f3c6c11d
AMBARI-15702 - Global Repeat Tolerance Value For Alerts (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5f3c6c11 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5f3c6c11 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5f3c6c11 Branch: refs/heads/trunk Commit: 5f3c6c11d3caadf3a2081c94ca1d56f25b768235 Parents: 36edfee Author: Jonathan Hurley <[email protected]> Authored: Mon Apr 4 18:42:42 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Tue Apr 5 11:34:08 2016 -0400 ---------------------------------------------------------------------- .../server/agent/RecoveryConfigHelper.java | 23 ++--- .../AlertDefinitionResourceProvider.java | 17 ++-- .../internal/AlertResourceProvider.java | 46 +++++++++- .../events/ClusterConfigChangedEvent.java | 4 +- .../listeners/alerts/AlertReceivedListener.java | 54 +++++++++-- .../org/apache/ambari/server/state/Cluster.java | 19 ++++ .../ambari/server/state/ConfigHelper.java | 41 ++++----- .../apache/ambari/server/state/ConfigImpl.java | 16 ++-- .../server/state/cluster/ClusterImpl.java | 94 ++++++++++++++++---- .../server/upgrade/UpgradeCatalog240.java | 26 ++++++ .../internal/AlertResourceProviderTest.java | 2 + .../state/alerts/AlertReceivedListenerTest.java | 45 ++++++++++ .../server/state/cluster/ClusterTest.java | 55 +++++++----- .../server/upgrade/UpgradeCatalog240Test.java | 9 +- 14 files changed, 356 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java index 951b04b..7d6a7f5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java @@ -18,10 +18,12 @@ package org.apache.ambari.server.agent; -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; -import com.google.inject.Inject; -import com.google.inject.Singleton; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.events.MaintenanceModeEvent; @@ -37,11 +39,10 @@ import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.commons.lang.StringUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; +import com.google.inject.Singleton; @Singleton public class RecoveryConfigHelper { @@ -227,8 +228,8 @@ public class RecoveryConfigHelper { @Subscribe @AllowConcurrentEvents public void handleClusterEnvConfigChangedEvent(ClusterConfigChangedEvent event) { - if (event.getConfigType() == ConfigHelper.CLUSTER_ENV) { - invalidateRecoveryTimestamp(event.getclusterName(), null); + if (StringUtils.equals(event.getConfigType(), ConfigHelper.CLUSTER_ENV)) { + invalidateRecoveryTimestamp(event.getClusterName(), null); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java index ca3fb63..bcf0205 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java @@ -619,14 +619,17 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP managed = true; } - if (null != repeatTolerance) { - entity.setRepeatTolerance(repeatTolerance); - managed = true; - } + // repeat tolerance is only for non-AGGREGATE alerts + if (entity.getSourceType() != SourceType.AGGREGATE) { + if (null != repeatTolerance) { + entity.setRepeatTolerance(repeatTolerance); + managed = true; + } - if (null != repeatToleranceEnabled) { - entity.setRepeatToleranceEnabled(repeatToleranceEnabled); - managed = true; + if (null != repeatToleranceEnabled) { + entity.setRepeatToleranceEnabled(repeatToleranceEnabled); + managed = true; + } } if (managed) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java index 4c20c6c..4cf41b4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java @@ -45,9 +45,14 @@ import org.apache.ambari.server.orm.entities.AlertCurrentEntity; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertHistoryEntity; import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import com.google.inject.Inject; +import com.google.inject.Provider; /** * ResourceProvider for Alert instances @@ -88,6 +93,9 @@ public class AlertResourceProvider extends ReadOnlyResourceProvider implements @Inject private static AlertDefinitionDAO alertDefinitionDAO = null; + @Inject + private static Provider<Clusters> clusters; + /** * The property ids for an alert defintion resource. */ @@ -265,11 +273,12 @@ public class AlertResourceProvider extends ReadOnlyResourceProvider implements setResourceProperty(resource, ALERT_SCOPE, definition.getScope(), requestedIds); // repeat tolerance values - int repeatTolerance = definition.getRepeatTolerance(); + int repeatTolerance = getRepeatTolerance(definition, clusterName); long occurrences = entity.getOccurrences(); long remaining = (occurrences > repeatTolerance) ? 0 : (repeatTolerance - occurrences); - // the OK state is special; when received, we ignore tolerance and notify + // the OK state is special; when received, we ignore tolerance and assume + // the alert is HARD if (history.getAlertState() == AlertState.OK) { remaining = 0; } @@ -288,4 +297,37 @@ public class AlertResourceProvider extends ReadOnlyResourceProvider implements return resource; } + + /** + * Gets the repeat tolerance value for the specified definition. This method + * will return the override from the definition if + * {@link AlertDefinitionEntity#isRepeatToleranceEnabled()} is {@code true}. + * Otherwise, it uses {@link ConfigHelper#CLUSTER_ENV_ALERT_REPEAT_TOLERANCE}, + * defaulting to {@code 1} if not found. + * + * @param definition + * the definition (not {@code null}). + * @param clusterName + * the name of the cluster (not {@code null}). + * @return the repeat tolerance for the alert + */ + private int getRepeatTolerance(AlertDefinitionEntity definition, String clusterName ){ + + // if the definition overrides the global value, then use that + if( definition.isRepeatToleranceEnabled() ){ + return definition.getRepeatTolerance(); + } + + int repeatTolerance = 1; + try { + Cluster cluster = clusters.get().getCluster(clusterName); + String value = cluster.getClusterProperty(ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, "1"); + repeatTolerance = NumberUtils.toInt(value, 1); + } catch (AmbariException ambariException) { + LOG.warn("Unable to read {}/{} from cluster {}, defaulting to 1", ConfigHelper.CLUSTER_ENV, + ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, clusterName, ambariException); + } + + return repeatTolerance; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java index dec2a33..dd37ff9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java @@ -41,7 +41,7 @@ public class ClusterConfigChangedEvent extends AmbariEvent { * * @return */ - public String getclusterName() { + public String getClusterName() { return m_clusterName; } @@ -78,7 +78,7 @@ public class ClusterConfigChangedEvent extends AmbariEvent { @Override public String toString() { StringBuilder buffer = new StringBuilder("ClusterEnvConfigChangedEvent{"); - buffer.append("clusterName=").append(getclusterName()); + buffer.append("clusterName=").append(getClusterName()); buffer.append(", configType=").append(getConfigType()); buffer.append(", versionTag=").append(getVersionTag()); buffer.append(", version=").append(getVersion()); http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java index 2800ac6..71abffd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java @@ -42,9 +42,11 @@ import org.apache.ambari.server.state.AlertFirmness; import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.alert.SourceType; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,7 +142,8 @@ public class AlertReceivedListener { continue; } - Long clusterId = getClusterIdByName(alert.getCluster()); + String clusterName = alert.getCluster(); + Long clusterId = getClusterIdByName(clusterName); if (clusterId == null) { // check event clusterId = event.getClusterId(); @@ -285,9 +288,10 @@ public class AlertReceivedListener { break; } - // set the firmness of the new alert state based on the state & type - AlertFirmness firmness = calculateFirmnessForStateChange(definition, alertState, - current.getOccurrences()); + // set the firmness of the new alert state based on the state, type, + // occurrences, and repeat tolerance + AlertFirmness firmness = calculateFirmnessForStateChange(clusterName, definition, + alertState, current.getOccurrences()); current.setFirmness(firmness); @@ -537,24 +541,60 @@ public class AlertReceivedListener { * calculation firmness when moving between non-OK states) * @return */ - private AlertFirmness calculateFirmnessForStateChange(AlertDefinitionEntity definition, + private AlertFirmness calculateFirmnessForStateChange(String clusterName, AlertDefinitionEntity definition, AlertState state, long occurrences) { + // OK is always HARD since the alert has fulfilled the conditions if (state == AlertState.OK) { return AlertFirmness.HARD; } + // aggregate alerts are always HARD since they only react to HARD alerts if (definition.getSourceType() == SourceType.AGGREGATE) { return AlertFirmness.HARD; } - if (definition.getRepeatTolerance() <= 1) { + int tolerance = getRepeatTolerance(definition, clusterName); + if (tolerance <= 1) { return AlertFirmness.HARD; } - if (definition.getRepeatTolerance() <= occurrences) { + if (tolerance <= occurrences) { return AlertFirmness.HARD; } return AlertFirmness.SOFT; } + + /** + * Gets the repeat tolerance value for the specified definition. This method + * will return the override from the definition if + * {@link AlertDefinitionEntity#isRepeatToleranceEnabled()} is {@code true}. + * Otherwise, it uses {@link ConfigHelper#CLUSTER_ENV_ALERT_REPEAT_TOLERANCE}, + * defaulting to {@code 1} if not found. + * + * @param definition + * the definition (not {@code null}). + * @param clusterName + * the name of the cluster (not {@code null}). + * @return the repeat tolerance for the alert + */ + private int getRepeatTolerance(AlertDefinitionEntity definition, String clusterName) { + + // if the definition overrides the global value, then use that + if (definition.isRepeatToleranceEnabled()) { + return definition.getRepeatTolerance(); + } + + int repeatTolerance = 1; + try { + Cluster cluster = m_clusters.get().getCluster(clusterName); + String value = cluster.getClusterProperty(ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, "1"); + repeatTolerance = NumberUtils.toInt(value, 1); + } catch (AmbariException ambariException) { + LOG.warn("Unable to read {}/{} from cluster {}, defaulting to 1", ConfigHelper.CLUSTER_ENV, + ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, clusterName, ambariException); + } + + return repeatTolerance; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index 38d05ab..cf2c9aa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReadWriteLock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ClusterResponse; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; +import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; @@ -677,4 +678,22 @@ public interface Cluster { * otherwise returns null. */ String getServiceByConfigType(String configType); + + /** + * Gets the most recent value of {@code cluster-env/propertyName} where + * {@code propertyName} is the paramter specified to the method. This will use + * the desired configuration for {@code cluster-env}. + * <p/> + * The value is cached on this {@link Cluster} instance, so subsequent calls + * will not inclur a lookup penalty. This class also responds to + * {@link ClusterConfigChangedEvent} in order to clear the cache. + * + * @param propertyName + * the property to lookup in {@code cluster-env} (not {@code null}). + * @param defaultValue + * a default value to cache return if none exists (may be + * {@code null}). + * @return + */ + String getClusterProperty(String propertyName, String defaultValue); } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index 77e36c8..8d9a879 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -17,26 +17,6 @@ */ package org.apache.ambari.server.state; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Maps; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.google.inject.persist.Transactional; -import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.api.services.AmbariMetaInfo; -import org.apache.ambari.server.configuration.Configuration; -import org.apache.ambari.server.controller.AmbariManagementController; -import org.apache.ambari.server.orm.dao.ClusterDAO; -import org.apache.ambari.server.orm.entities.ClusterConfigEntity; -import org.apache.ambari.server.security.authorization.AuthorizationException; -import org.apache.ambari.server.state.PropertyInfo.PropertyType; -import org.apache.ambari.server.state.configgroup.ConfigGroup; -import org.apache.ambari.server.upgrade.UpgradeCatalog170; -import org.apache.ambari.server.utils.SecretReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -51,6 +31,26 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.orm.dao.ClusterDAO; +import org.apache.ambari.server.orm.entities.ClusterConfigEntity; +import org.apache.ambari.server.state.PropertyInfo.PropertyType; +import org.apache.ambari.server.state.configgroup.ConfigGroup; +import org.apache.ambari.server.upgrade.UpgradeCatalog170; +import org.apache.ambari.server.utils.SecretReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; + /** * Helper class that works with config traversals. */ @@ -77,6 +77,7 @@ public class ConfigHelper { public static final String HIVE_SITE = "hive-site"; public static final String YARN_SITE = "yarn-site"; public static final String CLUSTER_ENV = "cluster-env"; + public static final String CLUSTER_ENV_ALERT_REPEAT_TOLERANCE = "alerts_repeat_tolerance"; public static final String CLUSTER_ENV_RETRY_ENABLED = "command_retry_enabled"; public static final String CLUSTER_ENV_RETRY_COMMANDS = "commands_to_retry"; public static final String CLUSTER_ENV_RETRY_MAX_TIME_IN_SEC = "command_retry_max_time_in_sec"; http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java index 73ee25e..dbdd5a2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java @@ -33,6 +33,7 @@ import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; import org.apache.ambari.server.orm.entities.ClusterConfigEntity; import org.apache.ambari.server.orm.entities.ClusterEntity; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -404,13 +405,6 @@ public class ConfigImpl implements Config { // newest data clusterDAO.merge(clusterEntity, true); cluster.refresh(); - - // broadcast the change event for cluster-env config type - if (getType() == ConfigHelper.CLUSTER_ENV) { - ClusterConfigChangedEvent event = new ClusterConfigChangedEvent( - cluster.getClusterName(), getType(), getTag(), getVersion()); - eventPublisher.publish(event); - } } } } finally { @@ -420,6 +414,12 @@ public class ConfigImpl implements Config { cluster.getClusterGlobalLock().writeLock().unlock(); } - } + // broadcast the change event for cluster-env config type + if (StringUtils.equals(getType(), ConfigHelper.CLUSTER_ENV)) { + ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(cluster.getClusterName(), + getType(), getTag(), getVersion()); + eventPublisher.publish(event); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 9e456eb..f38c25a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -48,7 +49,6 @@ import org.apache.ambari.server.ParentObjectNotFoundException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.ServiceNotFoundException; -import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; @@ -60,6 +60,7 @@ import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; import org.apache.ambari.server.events.AmbariEvent.AmbariEventType; +import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.events.ClusterEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.RequiresSession; @@ -74,7 +75,6 @@ import org.apache.ambari.server.orm.dao.HostDAO; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; -import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.TopologyRequestDAO; @@ -93,7 +93,6 @@ import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.PermissionEntity; import org.apache.ambari.server.orm.entities.PrivilegeEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; -import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.RequestScheduleEntity; import org.apache.ambari.server.orm.entities.ResourceEntity; import org.apache.ambari.server.orm.entities.ServiceConfigEntity; @@ -148,6 +147,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; @@ -225,9 +225,6 @@ public class ClusterImpl implements Cluster { private HostRoleCommandDAO hostRoleCommandDAO; @Inject - private RequestDAO requestDAO; - - @Inject private HostDAO hostDAO; @Inject @@ -292,15 +289,21 @@ public class ClusterImpl implements Cluster { private volatile Multimap<String, String> serviceConfigTypes; /** - * Used to publish events relating to cluster CRUD operations. + * Used to publish events relating to cluster CRUD operations and to receive + * information about cluster operations. */ - @Inject private AmbariEventPublisher eventPublisher; + /** + * A simple cache for looking up {@code cluster-env} properties for a cluster. + * This map is changed whenever {{cluster-env}} is changed and we receive a + * {@link ClusterConfigChangedEvent}. + */ + private Map<String, String> m_clusterPropertyCache = new ConcurrentHashMap<>(); @Inject public ClusterImpl(@Assisted ClusterEntity clusterEntity, - Injector injector) throws AmbariException { + Injector injector, AmbariEventPublisher eventPublisher) throws AmbariException { injector.injectMembers(this); clusterId = clusterEntity.getClusterId(); @@ -321,6 +324,10 @@ public class ClusterImpl implements Cluster { // Load any active stack upgrades. loadStackUpgrade(); + + // register to receive stuff + eventPublisher.register(this); + this.eventPublisher = eventPublisher; } @@ -346,9 +353,9 @@ public class ClusterImpl implements Cluster { clusterGlobalLock.writeLock().lock(); try { - UpgradeEntity activeUpgrade = this.getUpgradeInProgress(); + UpgradeEntity activeUpgrade = getUpgradeInProgress(); if (activeUpgrade != null) { - this.setUpgradeEntity(activeUpgrade); + setUpgradeEntity(activeUpgrade); } } catch (AmbariException e) { LOG.error("Unable to load active stack upgrade. Error: " + e.getMessage()); @@ -1189,7 +1196,7 @@ public class ClusterImpl implements Cluster { * @return */ private UpgradeEntity getUpgradeInProgress() { - UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(this.getClusterId()); + UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(getClusterId()); if (mostRecentUpgrade != null) { List<HostRoleStatus> UNFINISHED_STATUSES = new ArrayList(); UNFINISHED_STATUSES.add(HostRoleStatus.PENDING); @@ -1217,16 +1224,16 @@ public class ClusterImpl implements Cluster { @Override public ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException { // This is not reliable. Need to find the last upgrade request. - UpgradeEntity upgradeInProgress = this.getUpgradeEntity(); + UpgradeEntity upgradeInProgress = getUpgradeEntity(); if (upgradeInProgress == null) { - return this.getCurrentClusterVersion(); + return getCurrentClusterVersion(); } String effectiveVersion = null; switch (upgradeInProgress.getUpgradeType()) { case NON_ROLLING: if (upgradeInProgress.getDirection() == Direction.UPGRADE) { - boolean pastChangingStack = this.isNonRollingUpgradePastUpgradingStack(upgradeInProgress); + boolean pastChangingStack = isNonRollingUpgradePastUpgradingStack(upgradeInProgress); effectiveVersion = pastChangingStack ? upgradeInProgress.getToVersion() : upgradeInProgress.getFromVersion(); } else { // Should be the lower value during a Downgrade. @@ -2521,6 +2528,7 @@ public class ClusterImpl implements Cluster { return serviceName; } + @Override public String getServiceByConfigType(String configType) { for (Entry<String, String> entry : serviceConfigTypes.entries()) { String serviceName = entry.getKey(); @@ -3639,4 +3647,60 @@ public class ClusterImpl implements Cluster { return false; } + + /** + * {@inheritDoc} + */ + @Override + public String getClusterProperty(String propertyName, String defaultValue) { + String cachedValue = m_clusterPropertyCache.get(propertyName); + if (null != cachedValue) { + return cachedValue; + } + + // start with the default + cachedValue = defaultValue; + + Config clusterEnv = getDesiredConfigByType(ConfigHelper.CLUSTER_ENV); + if (null != clusterEnv) { + Map<String, String> clusterEnvProperties = clusterEnv.getProperties(); + if (clusterEnvProperties.containsKey(propertyName)) { + String value = clusterEnvProperties.get(propertyName); + if (null != value) { + cachedValue = value; + } + } + } + + // cache the value and return it + m_clusterPropertyCache.put(propertyName, cachedValue); + return cachedValue; + } + + /** + * Gets whether the specified cluster property is already cached. + * + * @param propertyName + * the property to check. + * @return {@code true} if the property is cached. + */ + boolean isClusterPropertyCached(String propertyName) { + return m_clusterPropertyCache.containsKey(propertyName); + } + + /** + * Handles {@link ClusterConfigChangedEvent} which means that the + * {{cluster-env}} may have changed. + * + * @param event + * the change event. + */ + @Subscribe + public void handleClusterEnvConfigChangedEvent(ClusterConfigChangedEvent event) { + if (!StringUtils.equals(event.getConfigType(), ConfigHelper.CLUSTER_ENV)) { + return; + } + + m_clusterPropertyCache.clear(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java index 6fa250d..f603896 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java @@ -45,6 +45,7 @@ import org.apache.ambari.server.state.AlertFirmness; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.RepositoryType; import org.apache.ambari.server.state.State; import org.apache.ambari.server.state.alert.AlertDefinition; @@ -181,6 +182,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { addSettingPermission(); addManageUserPersistedDataPermission(); updateAMSConfigs(); + updateClusterEnv(); } private void createSettingTable() throws SQLException { @@ -851,4 +853,28 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { addSequence("blueprint_setting_id_seq", 0L, false); } + + /** + * Updates {@code cluster-env} in the following ways: + * <ul> + * <li>Adds {@link ConfigHelper#CLUSTER_ENV_ALERT_REPEAT_TOLERANCE} = 1</li> + * </ul> + * + * @throws Exception + */ + protected void updateClusterEnv() throws AmbariException { + Map<String, String> propertyMap = new HashMap<>(); + propertyMap.put(ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, "1"); + + AmbariManagementController ambariManagementController = injector.getInstance( + AmbariManagementController.class); + + Clusters clusters = ambariManagementController.getClusters(); + + Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); + for (final Cluster cluster : clusterMap.values()) { + updateConfigurationPropertiesForCluster(cluster, ConfigHelper.CLUSTER_ENV, propertyMap, true, + true); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java index 3427052..63cda40 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java @@ -71,6 +71,7 @@ import org.apache.ambari.server.security.authorization.AuthorizationException; import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.MaintenanceState; import org.easymock.EasyMock; import org.junit.After; @@ -119,6 +120,7 @@ public class AlertResourceProviderTest { expect(clusters.getCluster(capture(EasyMock.<String>newCapture()))).andReturn(cluster).atLeastOnce(); expect(cluster.getClusterId()).andReturn(1L).anyTimes(); expect(cluster.getResourceId()).andReturn(4L).anyTimes(); + expect(cluster.getClusterProperty(ConfigHelper.CLUSTER_ENV_ALERT_REPEAT_TOLERANCE, "1")).andReturn("1").atLeastOnce(); replay(m_amc, clusters, cluster); } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java index f8a1f64..302c5fe 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java @@ -754,4 +754,49 @@ public class AlertReceivedListenerTest { assertEquals(1, (long) allCurrent.get(0).getOccurrences()); assertEquals(AlertFirmness.HARD, allCurrent.get(0).getFirmness()); } + + /** + * Tests that we correctly record alert firmness, using the global value if + * the definition does not override it. + */ + @Test + public void testAlertFirmnessUsingGlobalValue() throws Exception { + String definitionName = ALERT_DEFINITION + "1"; + String serviceName = "HDFS"; + String componentName = "NAMENODE"; + String text = serviceName + " " + componentName + " is OK"; + + Alert alert = new Alert(definitionName, null, serviceName, componentName, HOST1, AlertState.OK); + alert.setCluster(m_cluster.getClusterName()); + alert.setLabel(ALERT_LABEL); + alert.setText(text); + alert.setTimestamp(1L); + + // fire the alert, and check that the new entry was created + AlertReceivedListener listener = m_injector.getInstance(AlertReceivedListener.class); + AlertReceivedEvent event = new AlertReceivedEvent(m_cluster.getClusterId(), alert); + listener.onAlertEvent(event); + + List<AlertCurrentEntity> allCurrent = m_dao.findCurrent(); + assertEquals(1, allCurrent.size()); + + // check occurrences (should be 1 since it's the first) + assertEquals(1, (long) allCurrent.get(0).getOccurrences()); + assertEquals(AlertFirmness.HARD, allCurrent.get(0).getFirmness()); + + // move the repeat tolerance to 2 on the definition, but leave it disabled + // so that we still use the global + AlertDefinitionEntity definition = allCurrent.get(0).getAlertHistory().getAlertDefinition(); + definition.setRepeatTolerance(2); + definition.setRepeatToleranceEnabled(false); + m_definitionDao.merge(definition); + + // change state to CRITICAL; this should make a HARD alert since the global + // value is in use + alert.setState(AlertState.CRITICAL); + listener.onAlertEvent(event); + allCurrent = m_dao.findCurrent(); + assertEquals(1, (long) allCurrent.get(0).getOccurrences()); + assertEquals(AlertFirmness.HARD, allCurrent.get(0).getFirmness()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java index df2ef46..7c45ecd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.state.cluster; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -55,6 +56,8 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.ClusterResponse; import org.apache.ambari.server.controller.ConfigurationResponse; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; +import org.apache.ambari.server.events.ClusterConfigChangedEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; @@ -70,7 +73,6 @@ import org.apache.ambari.server.orm.entities.ClusterConfigEntity; import org.apache.ambari.server.orm.entities.ClusterConfigMappingEntity; import org.apache.ambari.server.orm.entities.ClusterEntity; import org.apache.ambari.server.orm.entities.ClusterServiceEntity; -import org.apache.ambari.server.orm.entities.ClusterStateEntity; import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; @@ -88,6 +90,7 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ConfigFactory; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.ConfigImpl; import org.apache.ambari.server.state.DesiredConfig; import org.apache.ambari.server.state.Host; @@ -108,10 +111,10 @@ import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent; import org.apache.ambari.server.state.host.HostRegistrationRequestEvent; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.apache.commons.lang.StringUtils; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -934,24 +937,6 @@ public class ClusterTest { } @Test - @Ignore - // Test clearly depends on a detached reference used to create - // in-memory objects. Based on the timeline this is a very old test with - // assertions that are not too meaningful. - public void testClusterRecovery() throws AmbariException { - ClusterEntity entity = createDummyData(); - ClusterStateEntity clusterStateEntity = new ClusterStateEntity(); - clusterStateEntity.setCurrentStack(entity.getDesiredStack()); - entity.setClusterStateEntity(clusterStateEntity); - ClusterImpl cluster = new ClusterImpl(entity, injector); - Service service = cluster.getService("HDFS"); - /* make sure the services are recovered */ - Assert.assertEquals("HDFS", service.getName()); - Map<String, Service> services = cluster.getServices(); - Assert.assertNotNull(services.get("HDFS")); - } - - @Test public void testConvertToResponse() throws Exception { createDefaultCluster(); @@ -2516,4 +2501,34 @@ public class ClusterTest { clusterConfigs = clusterDAO.getAllConfigurations(cluster.getClusterId(), newStackId); Assert.assertEquals(0, clusterConfigs.size()); } + + /** + * Tests that properties request from {@code cluster-env} are correctly cached + * and invalidated. + * + * @throws Exception + */ + @Test + public void testCachedClusterProperties() throws Exception { + EventBusSynchronizer.synchronizeAmbariEventPublisher(injector); + AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class); + + createDefaultCluster(); + Cluster cluster = clusters.getCluster("c1"); + + assertFalse(((ClusterImpl) cluster).isClusterPropertyCached("foo")); + + String property = cluster.getClusterProperty("foo", "bar"); + assertEquals("bar", property); + + assertTrue(((ClusterImpl) cluster).isClusterPropertyCached("foo")); + + // cause a cache invalidation + ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(cluster.getClusterName(), + ConfigHelper.CLUSTER_ENV, null, 1L); + + publisher.publish(event); + + assertFalse(((ClusterImpl) cluster).isClusterPropertyCached("foo")); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5f3c6c11/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java index ea0547b..56dd033 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java @@ -172,7 +172,7 @@ public class UpgradeCatalog240Test { Capture<DBAccessor.DBColumnInfo> capturedRepeatToleranceEnabledColumnInfo = newCapture(); Capture<DBAccessor.DBColumnInfo> capturedOccurrencesColumnInfo = newCapture(); Capture<DBAccessor.DBColumnInfo> capturedFirmnessColumnInfo = newCapture(); - + dbAccessor.addColumn(eq(UpgradeCatalog240.ALERT_DEFINITION_TABLE), capture(capturedHelpURLColumnInfo)); dbAccessor.addColumn(eq(UpgradeCatalog240.ALERT_DEFINITION_TABLE), capture(capturedRepeatToleranceColumnInfo)); dbAccessor.addColumn(eq(UpgradeCatalog240.ALERT_DEFINITION_TABLE), capture(capturedRepeatToleranceEnabledColumnInfo)); @@ -302,8 +302,8 @@ public class UpgradeCatalog240Test { Assert.assertEquals(UpgradeCatalog240.ALERT_CURRENT_FIRMNESS_COLUMN, columnFirmnessInfo.getName()); Assert.assertEquals(String.class, columnFirmnessInfo.getType()); Assert.assertEquals(AlertFirmness.HARD.name(), columnFirmnessInfo.getDefaultValue()); - Assert.assertEquals(false, columnFirmnessInfo.isNullable()); - + Assert.assertEquals(false, columnFirmnessInfo.isNullable()); + assertEquals(expectedCaptures, actualCaptures); // Verify blueprint_setting columns @@ -330,6 +330,7 @@ public class UpgradeCatalog240Test { Method addManageUserPersistedDataPermission = UpgradeCatalog240.class.getDeclaredMethod("addManageUserPersistedDataPermission"); Method addSettingPermission = UpgradeCatalog240.class.getDeclaredMethod("addSettingPermission"); Method updateAmsConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateAMSConfigs"); + Method updateClusterEnv = UpgradeCatalog240.class.getDeclaredMethod("updateClusterEnv"); Capture<String> capturedStatements = newCapture(CaptureType.ALL); @@ -342,6 +343,7 @@ public class UpgradeCatalog240Test { .addMockedMethod(addSettingPermission) .addMockedMethod(addManageUserPersistedDataPermission) .addMockedMethod(updateAmsConfigs) + .addMockedMethod(updateClusterEnv) .createMock(); Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor"); @@ -352,6 +354,7 @@ public class UpgradeCatalog240Test { upgradeCatalog240.addSettingPermission(); upgradeCatalog240.addManageUserPersistedDataPermission(); upgradeCatalog240.updateAMSConfigs(); + upgradeCatalog240.updateClusterEnv(); replay(upgradeCatalog240, dbAccessor);
