Repository: ambari Updated Branches: refs/heads/trunk cbc34867b -> 7b41d3114
AMBARI-11775 - Threading Can Cause Aggregate Alerts To Have Duplicate Alert Entries With Different States (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7b41d311 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7b41d311 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7b41d311 Branch: refs/heads/trunk Commit: 7b41d3114db74082119990d92b9d0f7760ee6261 Parents: cbc3486 Author: Jonathan Hurley <[email protected]> Authored: Sun Jun 7 21:16:53 2015 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Mon Jun 8 07:48:26 2015 -0400 ---------------------------------------------------------------------- .../alerts/AlertAggregateListener.java | 41 ++++- .../listeners/alerts/AlertReceivedListener.java | 7 - .../ambari/server/orm/dao/AlertSummaryDTO.java | 50 ++++++ .../state/alert/AggregateDefinitionMapping.java | 2 +- .../alerts/AggregateAlertListenerTest.java | 167 +++++++++++++++++++ 5 files changed, 255 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java index 4d2add1..ac2f907 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java @@ -19,6 +19,8 @@ package org.apache.ambari.server.events.listeners.alerts; import java.text.MessageFormat; import java.util.List; +import java.util.Map; 
+import java.util.concurrent.ConcurrentHashMap; import org.apache.ambari.server.EagerSingleton; import org.apache.ambari.server.events.AggregateAlertRecalculateEvent; @@ -35,9 +37,11 @@ import org.apache.ambari.server.state.alert.AggregateSource; import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.Reporting; import org.apache.ambari.server.state.alert.SourceType; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -70,6 +74,13 @@ public class AlertAggregateListener { private final AlertEventPublisher m_publisher; /** + * A cache used to store the last state and text of an aggregate alert. We + * shouldn't need to fire new aggregate alerts unless the state or text has + * changed. + */ + private Map<String, Alert> m_alertCache = new ConcurrentHashMap<String, Alert>(); + + /** * Used for quick lookups of aggregate alerts. */ @Inject @@ -122,6 +133,9 @@ public class AlertAggregateListener { /** * Calculates the aggregate alert state if there is an aggregate alert for the * specified alert. + * <p/> + * This method should not be decorated with {@link AllowConcurrentEvents} since + * it would need extra locking around {@link #m_alertCache}. * * @param clusterId * the ID of the cluster. 
@@ -187,10 +201,29 @@ public class AlertAggregateListener { } } - // make a new event and allow others to consume it - AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId, - aggregateAlert); + // now that the alert has been created, see if we need to send it; only send + // alerts if the state or the text has changed + boolean sendAlertEvent = true; + Alert cachedAlert = m_alertCache.get(aggregateAlert.getName()); + if (null != cachedAlert) { + AlertState cachedState = cachedAlert.getState(); + AlertState alertState = aggregateAlert.getState(); + String cachedText = cachedAlert.getText(); + String alertText = aggregateAlert.getText(); + + if (cachedState == alertState && StringUtils.equals(cachedText, alertText)) { + sendAlertEvent = false; + } + } + + // update the cache + m_alertCache.put(aggregateAlert.getName(), aggregateAlert); - m_publisher.publish(aggEvent); + // make a new event and allow others to consume it, but only if the + // aggregate has changed + if (sendAlertEvent) { + AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId, aggregateAlert); + m_publisher.publish(aggEvent); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java index 41dd1d5..f28929a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java @@ -17,10 +17,6 @@ */ package org.apache.ambari.server.events.listeners.alerts; -import java.util.List; -import java.util.Map; 
-import java.util.Set; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; @@ -39,10 +35,7 @@ import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.MaintenanceState; -import org.apache.ambari.server.state.Service; -import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java index f56875f..0023def 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java @@ -78,4 +78,54 @@ public class AlertSummaryDTO { public int getMaintenanceCount() { return maintenanceCount; } + + /** + * Sets the count of {@link AlertState#OK} states. + * + * @param okCount + * the okCount to set + */ + public void setOkCount(int okCount) { + this.okCount = okCount; + } + + /** + * Sets the count of {@link AlertState#WARNING} states. + * + * @param warningCount + * the warningCount to set + */ + public void setWarningCount(int warningCount) { + this.warningCount = warningCount; + } + + /** + * Sets the count of {@link AlertState#CRITICAL} states. 
+ * + * @param criticalCount + * the criticalCount to set + */ + public void setCriticalCount(int criticalCount) { + this.criticalCount = criticalCount; + } + + /** + * Sets the count of {@link AlertState#UNKNOWN} states. + * + * @param unknownCount + * the unknownCount to set + */ + public void setUnknownCount(int unknownCount) { + this.unknownCount = unknownCount; + } + + /** + * Sets the count of alerts in maintenance state. + * + * @param maintenanceCount + * the maintenanceCount to set + */ + public void setMaintenanceCount(int maintenanceCount) { + this.maintenanceCount = maintenanceCount; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java index 104ebef..21ad99b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java @@ -32,7 +32,7 @@ import com.google.inject.Singleton; * associated with them. */ @Singleton -public final class AggregateDefinitionMapping { +public class AggregateDefinitionMapping { /** * In-memory mapping of cluster ID to definition name / aggregate definition. * This is used for fast lookups when receiving events. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java new file mode 100644 index 0000000..29969d6 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.state.alerts; + +import junit.framework.Assert; + +import org.apache.ambari.server.events.AlertReceivedEvent; +import org.apache.ambari.server.events.AlertStateChangeEvent; +import org.apache.ambari.server.events.MockEventListener; +import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener; +import org.apache.ambari.server.orm.GuiceJpaInitializer; +import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.dao.AlertSummaryDTO; +import org.apache.ambari.server.orm.dao.AlertsDAO; +import org.apache.ambari.server.orm.entities.AlertCurrentEntity; +import org.apache.ambari.server.state.Alert; +import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; +import org.apache.ambari.server.state.alert.AggregateSource; +import org.apache.ambari.server.state.alert.AlertDefinition; +import org.apache.ambari.server.state.alert.Reporting; +import org.apache.ambari.server.state.alert.Reporting.ReportTemplate; +import org.apache.ambari.server.utils.EventBusSynchronizer; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.persist.PersistService; +import com.google.inject.util.Modules; + +/** + * Tests the {@link AlertAggregateListener}. 
+ */ +public class AggregateAlertListenerTest { + + private Injector m_injector; + private MockEventListener m_listener; + private AlertsDAO m_alertsDao; + private AggregateDefinitionMapping m_aggregateMapping; + + /** + * + */ + @Before + public void setup() throws Exception { + m_injector = Guice.createInjector(Modules.override( + new InMemoryDefaultTestModule()).with(new MockModule())); + + m_injector.getInstance(GuiceJpaInitializer.class); + m_listener = m_injector.getInstance(MockEventListener.class); + + m_alertsDao = m_injector.getInstance(AlertsDAO.class); + + // !!! need a synchronous op for testing + EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector).register(m_listener); + EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector).register(m_listener); + } + + /** + * @throws Exception + */ + @After + public void teardown() throws Exception { + m_injector.getInstance(PersistService.class).stop(); + m_injector = null; + } + + /** + * Tests that the {@link AlertAggregateListener} caches values of the + * aggregates and only triggers events when needed. 
+ * + * @throws Exception + */ + @Test + public void testAlertNoticeCreationFromEvent() throws Exception { + AlertCurrentEntity currentEntityMock = EasyMock.createNiceMock(AlertCurrentEntity.class); + + // setup the mocks for the aggregate definition to avoid NPEs + AlertDefinition aggregateDefinition = new AlertDefinition(); + aggregateDefinition.setName("mock-aggregate-alert"); + AggregateSource aggregateSource = new AggregateSource(); + aggregateSource.setAlertName("mock-aggregate-alert"); + Reporting reporting = new Reporting(); + ReportTemplate criticalTemplate = new ReportTemplate(); + ReportTemplate okTemplate = new ReportTemplate(); + criticalTemplate.setValue(.05); + criticalTemplate.setText("CRITICAL"); + okTemplate.setText("OK"); + reporting.setCritical(criticalTemplate); + reporting.setWarning(criticalTemplate); + reporting.setOk(okTemplate); + aggregateSource.setReporting(reporting); + aggregateDefinition.setSource(aggregateSource); + + EasyMock.expect( + m_aggregateMapping.getAggregateDefinition(EasyMock.anyLong(), EasyMock.eq("mock-alert"))).andReturn( + aggregateDefinition).atLeastOnce(); + + AlertSummaryDTO summaryDTO = new AlertSummaryDTO(5,0,0,0,0); + EasyMock.expect( + m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn( + summaryDTO).atLeastOnce(); + + EasyMock.replay(m_alertsDao, m_aggregateMapping); + + // check that we're starting at 0 + Assert.assertEquals(0, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class)); + + // trigger an alert which will trigger the aggregate + Alert alert = new Alert("mock-alert", null, null, null, null, null); + AlertAggregateListener aggregateListener = m_injector.getInstance(AlertAggregateListener.class); + AlertStateChangeEvent event = new AlertStateChangeEvent(0, alert, currentEntityMock, null); + aggregateListener.onAlertStateChangeEvent(event); + + // verify that one AlertReceivedEvent was fired (it's the one the listener + // creates for the 
aggregate) + Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class)); + + // fire the same alert event again; the cache in the aggregate listener + // should prevent it from firing a new alert received event of its own + aggregateListener.onAlertStateChangeEvent(event); + + // check that we're still at 1 + Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class)); + + // now change the returned summary DTO so that a new alert will get generated + summaryDTO.setOkCount(0); + summaryDTO.setCriticalCount(5); + aggregateListener.onAlertStateChangeEvent(event); + Assert.assertEquals(2, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class)); + } + + /** + * + */ + private class MockModule implements Module { + /** + * {@inheritDoc} + */ + @Override + public void configure(Binder binder) { + m_alertsDao = EasyMock.createMock(AlertsDAO.class); + m_aggregateMapping = EasyMock.createMock(AggregateDefinitionMapping.class); + binder.bind(AlertsDAO.class).toInstance(m_alertsDao); + binder.bind(AggregateDefinitionMapping.class).toInstance(m_aggregateMapping); + } + } +}
