Repository: ambari Updated Branches: refs/heads/branch-alerts-dev f1018b176 -> ba325d083
AMBARI-7391 - Alerts: Scan Database For Pending AlertNoticeEntity (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba325d08 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba325d08 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba325d08 Branch: refs/heads/branch-alerts-dev Commit: ba325d0831e9816992849490766e511450ef7f3a Parents: f1018b1 Author: Jonathan Hurley <[email protected]> Authored: Thu Sep 18 11:20:44 2014 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Thu Sep 18 15:21:26 2014 -0400 ---------------------------------------------------------------------- ambari-server/pom.xml | 10 + .../ambari/server/controller/AmbariServer.java | 12 + .../server/controller/ControllerModule.java | 22 ++ .../events/listeners/AlertReceivedListener.java | 7 +- .../listeners/AlertServiceStateListener.java | 20 +- .../listeners/AlertStateChangedListener.java | 4 +- .../events/publishers/AlertEventPublisher.java | 14 +- .../server/notifications/DispatchCallback.java | 50 ++++ .../server/notifications/DispatchFactory.java | 62 +++++ .../server/notifications/DispatchRunnable.java | 60 +++++ .../server/notifications/Notification.java | 57 +++++ .../notifications/NotificationDispatcher.java | 47 ++++ .../dispatchers/EmailDispatcher.java | 60 +++++ .../server/orm/dao/AlertDefinitionDAO.java | 15 +- .../ambari/server/orm/dao/AlertDispatchDAO.java | 69 +++++- .../server/orm/entities/AlertCurrentEntity.java | 38 +-- .../server/orm/entities/AlertGroupEntity.java | 3 +- .../server/orm/entities/AlertNoticeEntity.java | 2 + .../ambari/server/state/alert/TargetType.java | 15 +- .../server/state/cluster/ClusterImpl.java | 7 +- .../services/AlertNoticeDispatchService.java | 231 +++++++++++++++++++ .../apache/ambari/server/orm/OrmTestHelper.java | 37 +++ .../server/orm/dao/AlertDefinitionDAOTest.java | 11 +- .../server/orm/dao/AlertDispatchDAOTest.java | 40 +++- 24 files changed, 847 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index b07afbf..d83db61 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -1418,6 +1418,16 @@ <artifactId>velocity</artifactId> <version>1.7</version> </dependency> + <dependency> + <groupId>com.sun.mail</groupId> + <artifactId>mailapi</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>com.sun.mail</groupId> + <artifactId>smtp</artifactId> + <version>1.5.2</version> + </dependency> </dependencies> <!--<reporting> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> <version>2.5.2</version> </plugin> http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index 7354706..57b678a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -113,10 +113,12 @@ import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.GenericWebApplicationContext; import org.springframework.web.filter.DelegatingFilterProxy; +import com.google.common.util.concurrent.ServiceManager; import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Scopes; import com.google.inject.Singleton; import com.google.inject.name.Named; import com.google.inject.persist.Transactional; @@ -147,6 +149,13 @@ public class AmbariServer { @Inject @Named("dbInitNeeded") boolean dbInitNeeded; + + /** + * Guava service manager singleton (bound with {@link Scopes#SINGLETON}). + */ + @Inject + private ServiceManager serviceManager; + /** * The singleton view registry. */ @@ -473,6 +482,9 @@ public class AmbariServer { executionScheduleManager.start(); LOG.info("********* Started Scheduled Request Manager **********"); + serviceManager.startAsync(); + LOG.info("********* Started Services **********"); + server.join(); LOG.info("Joined the Server"); } catch (BadPaddingException bpe){ http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index 349e465..cfab1f8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -33,9 +33,11 @@ import static org.eclipse.persistence.config.PersistenceUnitProperties.JDBC_USER import static org.eclipse.persistence.config.PersistenceUnitProperties.THROW_EXCEPTIONS; import java.security.SecureRandom; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Set; import org.apache.ambari.server.actionmanager.ActionDBAccessor; import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl; @@ -88,6 +90,7 @@ import org.apache.ambari.server.state.host.HostImpl; import org.apache.ambari.server.state.scheduler.RequestExecution; import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; import org.apache.ambari.server.state.scheduler.RequestExecutionImpl; +import org.apache.ambari.server.state.services.AlertNoticeDispatchService; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl; import org.apache.ambari.server.view.ViewInstanceHandlerList; import org.eclipse.jetty.server.SessionIdManager; @@ -98,6 +101,7 @@ import org.springframework.security.crypto.password.PasswordEncoder; import org.springframework.security.crypto.password.StandardPasswordEncoder; import org.springframework.web.filter.DelegatingFilterProxy; +import com.google.common.util.concurrent.ServiceManager; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.inject.AbstractModule; @@ -226,6 +230,7 @@ public class ControllerModule extends AbstractModule { requestStaticInjection(ExecutionCommandWrapper.class); + bindServices(); bindEagerSingletons(); } @@ -306,6 +311,23 @@ public class ControllerModule extends AbstractModule { } /** + * Bind all {@link com.google.common.util.concurrent.Service} singleton + * instances and then register them with a singleton {@link ServiceManager}. + */ + private void bindServices() { + Set<com.google.common.util.concurrent.Service> services = new HashSet<com.google.common.util.concurrent.Service>(); + + AlertNoticeDispatchService alertNoticeDispatchService = new AlertNoticeDispatchService(); + + bind(AlertNoticeDispatchService.class).toInstance( + alertNoticeDispatchService); + + services.add(alertNoticeDispatchService); + ServiceManager manager = new ServiceManager(services); + bind(ServiceManager.class).toInstance(manager); + } + + /** * Initializes all eager singletons that should be instantiated as soon as * possible and not wait for injection. */ http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java index 519ceeb..0f1b49a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java @@ -113,6 +113,10 @@ public class AlertReceivedListener { AlertHistoryEntity history = createHistory(clusterId, current.getAlertHistory().getAlertDefinition(), alert); + // manually create the new history entity since we are merging into + // an existing current entity + m_alertsDao.create(history); + current.setAlertHistory(history); current.setLatestTimestamp(Long.valueOf(alert.getTimestamp())); current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp())); @@ -121,7 +125,8 @@ public class AlertReceivedListener { // broadcast the alert changed event for other subscribers AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent( - event.getClusterId(), event.getAlert(), history, oldState); + event.getClusterId(), event.getAlert(), current.getAlertHistory(), + oldState); m_alertEventPublisher.publish(alertChangedEvent); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java index 906d52d..f1ce617 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java @@ -113,6 +113,17 @@ public class AlertServiceStateListener { String stackVersion = event.getStackVersion(); String serviceName = event.getServiceName(); + // create the default alert group for the new service; this MUST be done + // before adding definitions so that they are properly added to the + // default group + AlertGroupEntity serviceAlertGroup = new AlertGroupEntity(); + serviceAlertGroup.setClusterId(clusterId); + serviceAlertGroup.setDefault(true); + serviceAlertGroup.setGroupName(serviceName); + serviceAlertGroup.setServiceName(serviceName); + + m_alertDispatchDao.create(serviceAlertGroup); + // populate alert definitions for the new service from the database, but // don't worry about sending down commands to the agents; the host // components are not yet bound to the hosts so we'd have no way of knowing @@ -134,14 +145,5 @@ public class AlertServiceStateListener { serviceName); LOG.error(message, ae); } - - // create the default alert group for the new service - AlertGroupEntity serviceAlertGroup = new AlertGroupEntity(); - serviceAlertGroup.setClusterId(clusterId); - serviceAlertGroup.setDefault(true); - serviceAlertGroup.setGroupName(serviceName); - serviceAlertGroup.setServiceName(serviceName); - - m_alertDispatchDao.create(serviceAlertGroup); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java index ac22f88..b2f08d7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners; import java.util.List; import java.util.Set; +import java.util.UUID; import org.apache.ambari.server.events.AlertStateChangeEvent; import org.apache.ambari.server.events.publishers.AlertEventPublisher; @@ -89,11 +90,12 @@ public class AlertStateChangedListener { for (AlertTargetEntity target : targets) { AlertNoticeEntity notice = new AlertNoticeEntity(); + notice.setUuid(UUID.randomUUID().toString()); notice.setAlertTarget(target); notice.setAlertHistory(event.getNewHistoricalEntry()); notice.setNotifyState(NotificationState.PENDING); - m_alertsDispatchDao.merge(notice); + m_alertsDispatchDao.create(notice); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java index f1a0b3e..006837a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java @@ -17,8 +17,10 @@ */ package org.apache.ambari.server.events.publishers; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.server.events.AlertEvent; @@ -48,8 +50,14 @@ public final class AlertEventPublisher { * Constructor. */ public AlertEventPublisher() { - m_eventBus = new AsyncEventBus(Executors.newFixedThreadPool(2, - new AlertEventBusThreadFactory())); + // create a fixed executor that is unbounded for now and will run rejected + // requests in the calling thread to prevent loss of alert handling + ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), + new AlertEventBusThreadFactory(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + m_eventBus = new AsyncEventBus(executor); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java new file mode 100644 index 0000000..e4e944d --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +import java.util.List; + +/** + * The {@link DispatchCallback} interface is used by an {@link NotificationDispatcher} to + * callback to an interested party when a {@link Notification} delivery is + * attempted. + * <p/> + * This class can be used to let the caller know the success or failure status + * of the outbound {@link Notification}. + */ +public interface DispatchCallback { + + /** + * Invoked when a {@link Notification} was successfully dispatched. + * + * @param callbackIds + * a list of unique IDs that the caller can use to identify the + * {@link Notification} that was dispatched. + */ + public void onSuccess(List<String> callbackIds); + + /** + * Invoked when a {@link Notification} failed to be dispatched. + * + * @param callbackIds + * a list of unique IDs that the caller can use to identify the + * {@link Notification} that was dispatched. + */ + public void onFailure(List<String> callbackIds); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java new file mode 100644 index 0000000..3414035 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.server.notifications.dispatchers.EmailDispatcher; + +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; + +/** + * The {@link DispatchFactory} is used to provide singleton instances of + * {@link NotificationDispatcher} based on a supplied type. + */ +@Singleton +public final class DispatchFactory { + + /** + * Mapping of dispatch type to dispatcher singleton. + */ + private final Map<String, NotificationDispatcher> m_dispatchers = new HashMap<String, NotificationDispatcher>(); + + /** + * Constructor. + * + */ + @Inject + public DispatchFactory(Injector injector) { + EmailDispatcher emailDispatcher = injector.getInstance(EmailDispatcher.class); + + m_dispatchers.put(emailDispatcher.getType(), emailDispatcher); + } + + /** + * Gets a dispatcher based on the type. + * + * @param type + * the type (not {@code null}). + * @return a dispatcher instance, or {@code null} if none. + */ + public NotificationDispatcher getDispatcher(String type) { + return m_dispatchers.get(type); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java new file mode 100644 index 0000000..dcc2103 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.notifications; + +import java.util.concurrent.Executor; + +/** + * The {@link DispatchRunnable} class is a simple {@link Runnable} that can be + * used to pass a {@link Notification} to an {@link NotificationDispatcher} via an + * {@link Executor}. + */ +public final class DispatchRunnable implements Runnable { + + /** + * The dispatcher to dispatch to. + */ + private final NotificationDispatcher m_dispatcher; + + /** + * The notification to dispatch. + */ + private final Notification m_notification; + + /** + * Constructor. + * + * @param dispatcher + * the dispatcher to dispatch to (not {@code null}). + * @param notification + * the notification to dispatch (not {@code null}). + */ + public DispatchRunnable(NotificationDispatcher dispatcher, Notification notification) { + m_dispatcher = dispatcher; + m_notification = notification; + } + + /** + * {@inheritDoc} + */ + @Override + public void run() { + m_dispatcher.dispatch(m_notification); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java new file mode 100644 index 0000000..08c5242 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + +import java.util.List; + +/** + * The {@link Notification} class is a generic way to relay content through an + * {@link NotificationDispatcher}. + */ +public class Notification { + + /** + * + */ + public String Subject; + + /** + * The main content of the notification. + */ + public String Body; + + /** + * An optional callback implementation that the dispatcher can use to report + * success/failure on delivery. + */ + public DispatchCallback Callback; + + /** + * An option list of unique IDs that will identify the origins of this + * notification. + */ + public List<String> CallbackIds; + + /** + * Constructor. + * + */ + public Notification() { + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java new file mode 100644 index 0000000..10946be --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications; + + +/** + * The {@link NotificationDispatcher} interface represents a mechanism for dispatching a + * {@link Notification}. + * <p/> + * Dispatchers should, in general, be singletons. They should also invoke the + * appropriate methods on {@link Notification#Callback} to indicate a success or + * failure during dispatch. + */ +public interface NotificationDispatcher { + + /** + * Gets the type of dispatcher. The type of each different dispatcher should + * be unique. + * + * @return the dispatcher's type (never {@code null}). + */ + public String getType(); + + /** + * Dispatches the specified notification. + * + * @param notification + * the notificationt to dispatch (not {@code null}). + */ + public void dispatch(Notification notification); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java new file mode 100644 index 0000000..d0858d3 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.notifications.dispatchers; + +import org.apache.ambari.server.notifications.NotificationDispatcher; +import org.apache.ambari.server.notifications.Notification; +import org.apache.ambari.server.state.alert.TargetType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Singleton; + +/** + * The {@link EmailDispatcher} class is used to dispatch {@link Notification} + * via JavaMail. + */ +@Singleton +public class EmailDispatcher implements NotificationDispatcher { + + /** + * Logger. + */ + private static final Logger LOG = LoggerFactory.getLogger(EmailDispatcher.class); + + /** + * {@inheritDoc} + */ + @Override + public String getType() { + return TargetType.EMAIL.name(); + } + + /** + * {@inheritDoc} + */ + @Override + public void dispatch(Notification notification) { + LOG.info("Sending email: {}", notification); + + // callback to inform the interested parties about the successful dispatch + if (null != notification.Callback) { + notification.Callback.onSuccess(notification.CallbackIds); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java index f410c39..570f268 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java @@ -26,6 +26,7 @@ import javax.persistence.TypedQuery; import org.apache.ambari.server.controller.RootServiceResponseFactory; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; +import org.apache.ambari.server.orm.entities.AlertGroupEntity; import org.apache.ambari.server.state.alert.Scope; import com.google.inject.Inject; @@ -125,7 +126,7 @@ public class AlertDefinitionDAO { /** * Gets all of the alert definitions for the list of IDs given. - * + * * @param definitionIds * the IDs of the definitions to retrieve. * @return the definition or an empty list (never {@code null}). @@ -246,7 +247,9 @@ public class AlertDefinitionDAO { } /** - * Persists a new alert definition. + * Persists a new alert definition, also creating the associated + * {@link AlertGroupEntity} relationship for the definition's service default + * group. * * @param alertDefinition * the definition to persist (not {@code null}). @@ -254,6 +257,13 @@ public class AlertDefinitionDAO { @Transactional public void create(AlertDefinitionEntity alertDefinition) { entityManagerProvider.get().persist(alertDefinition); + + AlertGroupEntity group = dispatchDao.findDefaultServiceGroup(alertDefinition.getServiceName()); + + if (null != group) { + group.addAlertDefinition(alertDefinition); + dispatchDao.merge(group); + } } /** @@ -305,7 +315,6 @@ public class AlertDefinitionDAO { */ @Transactional public void remove(AlertDefinitionEntity alertDefinition) { - alertDefinition = merge(alertDefinition); dispatchDao.removeNoticeByDefinitionId(alertDefinition.getDefinitionId()); alertsDao.removeByDefinitionId(alertDefinition.getDefinitionId()); http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java index 6d4d19b..2239c8f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java @@ -26,6 +26,7 @@ import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertGroupEntity; import org.apache.ambari.server.orm.entities.AlertNoticeEntity; import org.apache.ambari.server.orm.entities.AlertTargetEntity; +import org.apache.ambari.server.state.NotificationState; import org.apache.ambari.server.state.alert.AlertGroup; import com.google.inject.Inject; @@ -101,6 +102,37 @@ public class AlertDispatchDAO { } /** + * Gets a notification with the specified UUID. + * + * @param uuid + * the UUID of the notification to retrieve. + * @return the notification or {@code null} if none exists. + */ + public AlertNoticeEntity findNoticeByUuid(String uuid) { + TypedQuery<AlertNoticeEntity> query = entityManagerProvider.get().createNamedQuery( + "AlertNoticeEntity.findByUuid", AlertNoticeEntity.class); + + query.setParameter("uuid", uuid); + + return daoUtils.selectOne(query); + } + + /** + * Gets all {@link AlertNoticeEntity} instances that are + * {@link NotificationState#PENDING} and not yet dispatched. + * + * @return the notices that are waiting to be dispatched, or an empty list + * (never {@code null}). + */ + public List<AlertNoticeEntity> findPendingNotices() { + TypedQuery<AlertNoticeEntity> query = entityManagerProvider.get().createNamedQuery( + "AlertNoticeEntity.findByState", AlertNoticeEntity.class); + + query.setParameter("notifyState", NotificationState.PENDING); + return daoUtils.selectList(query); + } + + /** * Gets an alert group with the specified name across all clusters. Alert * group names are unique within a cluster. * @@ -195,12 +227,13 @@ public class AlertDispatchDAO { /** * Gets all of the {@link AlertGroup} instances that include the specified - * alert definition. + * alert definition. Service default groups will also be returned. * * @param definitionEntity * the definition that the group must include (not {@code null}). * @return all alert groups that have an association with the specified - * definition or empty list if none exist (never {@code null}). + * definition and the definition's service default group or empty list + * if none exist (never {@code null}). */ public List<AlertGroupEntity> findGroupsByDefinition( AlertDefinitionEntity definitionEntity) { @@ -214,6 +247,23 @@ public class AlertDispatchDAO { } /** + * Gets the default group for the specified service. + * + * @param serviceName + * the name of the service (not {@code null}). + * @return the default group, or {@code null} if the service name is not valid + * for an installed service; otherwise {@code null} should not be + * possible. + */ + public AlertGroupEntity findDefaultServiceGroup(String serviceName) { + TypedQuery<AlertGroupEntity> query = entityManagerProvider.get().createNamedQuery( + "AlertGroupEntity.findServiceDefaultGroup", AlertGroupEntity.class); + + query.setParameter("serviceName", serviceName); + return daoUtils.selectSingle(query); + } + + /** * Gets all alert notifications stored in the database. * * @return all alert notifications or empty list if none exist (never @@ -289,6 +339,21 @@ public class AlertDispatchDAO { } /** + * Removes all {@link AlertDefinitionEntity} that are associated with the + * specified cluster ID. + * + * @param clusterId + * the cluster ID. + */ + @Transactional + public void removeAllGroups(long clusterId) { + List<AlertGroupEntity> groups = findAllGroups(clusterId); + for (AlertGroupEntity group : groups) { + remove(group); + } + } + + /** * Persists new alert targets. * * @param entities http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java index 8db89f7..c24ac17 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java @@ -69,14 +69,15 @@ public class AlertCurrentEntity { @Column(name = "original_timestamp", nullable = false) private Long originalTimestamp; - + @Column(name = "latest_text", length = 4000) private String latestText = null; /** * Unidirectional one-to-one association to {@link AlertHistoryEntity} */ - @OneToOne(cascade = { CascadeType.PERSIST }) + @OneToOne(cascade = { CascadeType.PERSIST, CascadeType.MERGE, + CascadeType.REFRESH }) @JoinColumn(name = "history_id", unique = true, nullable = false) private AlertHistoryEntity alertHistory; @@ -95,7 +96,7 @@ public class AlertCurrentEntity { /** * Gets the unique ID for this current alert. - * + * * @return the ID (never {@code null}). */ public Long getAlertId() { @@ -104,7 +105,7 @@ public class AlertCurrentEntity { /** * Sets the unique ID for this current alert. - * + * * @param alertId * the ID (not {@code null}). */ @@ -115,7 +116,7 @@ public class AlertCurrentEntity { /** * Gets the time, in millis, that the last instance of this alert state was * received. - * + * * @return the time of the most recently received alert data for this instance * (never {@code null}). */ @@ -126,7 +127,7 @@ public class AlertCurrentEntity { /** * Sets the time, in millis, that the last instance of this alert state was * received. - * + * * @param latestTimestamp * the time of the most recently received alert data for this * instance (never {@code null}). @@ -137,7 +138,7 @@ public class AlertCurrentEntity { /** * Gets the current maintenance state for the alert. - * + * * @return the current maintenance state (never {@code null}). */ public MaintenanceState getMaintenanceState() { @@ -146,7 +147,7 @@ public class AlertCurrentEntity { /** * Sets the current maintenance state for the alert. - * + * * @param maintenanceState * the state to set (not {@code null}). */ @@ -157,7 +158,7 @@ public class AlertCurrentEntity { /** * Gets the time, in milliseconds, when the alert was first received with the * current state. - * + * * @return the time of the first instance of this alert. */ public Long getOriginalTimestamp() { @@ -167,14 +168,14 @@ public class AlertCurrentEntity { /** * Sets the time, in milliseconds, when the alert was first received with the * current state. - * + * * @param originalTimestamp * the time of the first instance of this alert (not {@code null}). */ public void setOriginalTimestamp(Long originalTimestamp) { this.originalTimestamp = originalTimestamp; } - + /** * Gets the latest text for this alert. History will not get a new record on * update when the state is the same, but the text may be changed. For example, @@ -183,7 +184,7 @@ public class AlertCurrentEntity { public String getLatestText() { return latestText; } - + /** * Sets the latest text. {@link #getLatestText()} */ @@ -194,7 +195,7 @@ public class AlertCurrentEntity { /** * Gets the associated {@link AlertHistoryEntity} entry for this current alert * instance. - * + * * @return the most recently received history entry (never {@code null}). */ public AlertHistoryEntity getAlertHistory() { @@ -204,7 +205,7 @@ public class AlertCurrentEntity { /** * Gets the associated {@link AlertHistoryEntity} entry for this current alert * instance. - * + * * @param alertHistory * the most recently received history entry (not {@code null}). */ @@ -218,16 +219,19 @@ public class AlertCurrentEntity { */ @Override public boolean equals(Object object) { - if (this == object) + if (this == object) { return true; + } - if (object == null || getClass() != object.getClass()) + if (object == null || getClass() != object.getClass()) { return false; + } AlertCurrentEntity that = (AlertCurrentEntity) object; - if (alertId != null ? !alertId.equals(that.alertId) : that.alertId != null) + if (alertId != null ? !alertId.equals(that.alertId) : that.alertId != null) { return false; + } return true; } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java index e7fa9c6..1cc9bcc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java @@ -49,7 +49,8 @@ import javax.persistence.UniqueConstraint; @NamedQuery(name = "AlertGroupEntity.findAllInCluster", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.clusterId = :clusterId"), @NamedQuery(name = "AlertGroupEntity.findByName", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = :groupName"), @NamedQuery(name = "AlertGroupEntity.findByNameInCluster", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = :groupName AND alertGroup.clusterId = :clusterId"), - @NamedQuery(name = "AlertGroupEntity.findByAssociatedDefinition", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE :alertDefinition MEMBER OF alertGroup.alertDefinitions"), }) + @NamedQuery(name = "AlertGroupEntity.findByAssociatedDefinition", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE :alertDefinition MEMBER OF alertGroup.alertDefinitions"), + @NamedQuery(name = "AlertGroupEntity.findServiceDefaultGroup", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.serviceName = :serviceName AND alertGroup.isDefault = 1") }) public class AlertGroupEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java index c9470ce..d0d4c9a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java @@ -46,6 +46,8 @@ import org.apache.ambari.server.state.NotificationState; @TableGenerator(name = "alert_notice_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "alert_notice_id_seq", initialValue = 0, allocationSize = 1) @NamedQueries({ @NamedQuery(name = "AlertNoticeEntity.findAll", query = "SELECT notice FROM AlertNoticeEntity notice"), + @NamedQuery(name = "AlertNoticeEntity.findByState", query = "SELECT notice FROM AlertNoticeEntity notice WHERE notice.notifyState = :notifyState"), + @NamedQuery(name = "AlertNoticeEntity.findByUuid", query = "SELECT notice FROM AlertNoticeEntity notice WHERE notice.uuid = :uuid"), @NamedQuery(name = "AlertNoticeEntity.removeByDefinitionId", query = "DELETE FROM AlertNoticeEntity notice WHERE notice.alertHistory.alertDefinition.definitionId = :definitionId") }) public class AlertNoticeEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java index e2564cc..9f3b5d8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java @@ -17,7 +17,6 @@ */ package org.apache.ambari.server.state.alert; -import org.apache.ambari.server.orm.dao.AlertsDAO; /** * The {@link TargetType} enumeration is used to represent the built-in target @@ -31,7 +30,17 @@ public enum TargetType { EMAIL, /** - * {@link AlertsDAO} will be distributed via SNMP. + * Alerts will be distributed via SNMP. */ - SNMP; + SNMP, + + /** + * Alerts will be distributed to Nagios. + */ + NAGIOS, + + /** + * Alerts will be distributed to a logger. + */ + LOG; } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index d938771..ff973f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.persistence.RollbackException; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.ConfigGroupNotFoundException; import org.apache.ambari.server.ObjectNotFoundException; import org.apache.ambari.server.ParentObjectNotFoundException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; @@ -50,6 +51,7 @@ import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.cache.ConfigGroupHostMapping; import org.apache.ambari.server.orm.cache.HostConfigMapping; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; +import org.apache.ambari.server.orm.dao.AlertDispatchDAO; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ClusterStateDAO; import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO; @@ -106,7 +108,6 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.assistedinject.Assisted; import com.google.inject.persist.Transactional; -import org.apache.ambari.server.ConfigGroupNotFoundException; public class ClusterImpl implements Cluster { @@ -191,6 +192,9 @@ public class ClusterImpl implements Cluster { @Inject private AlertDefinitionDAO alertDefinitionDAO; + @Inject + private AlertDispatchDAO alertDispatchDAO; + private volatile boolean svcHostsLoaded = false; private volatile Multimap<String, String> serviceConfigTypes; @@ -1363,6 +1367,7 @@ public class ClusterImpl implements Cluster { protected void removeEntities() throws AmbariException { long clusterId = getClusterId(); alertDefinitionDAO.removeAll(clusterId); + alertDispatchDAO.removeAllGroups(clusterId); clusterDAO.removeByPK(clusterId); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java new file mode 100644 index 0000000..7025e14 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.state.services; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.ambari.server.events.AlertEvent; +import org.apache.ambari.server.notifications.DispatchFactory; +import org.apache.ambari.server.notifications.DispatchRunnable; +import org.apache.ambari.server.notifications.DispatchCallback; +import org.apache.ambari.server.notifications.NotificationDispatcher; +import org.apache.ambari.server.notifications.Notification; +import org.apache.ambari.server.orm.dao.AlertDispatchDAO; +import org.apache.ambari.server.orm.entities.AlertNoticeEntity; +import org.apache.ambari.server.orm.entities.AlertTargetEntity; +import org.apache.ambari.server.state.NotificationState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * The {@link AlertNoticeDispatchService} is used to scan the database for + * {@link AlertNoticeEntity} that are in the {@link NotificationState#PENDING}. + * It will then process them through the dispatch system. + */ +@Singleton +public class AlertNoticeDispatchService extends AbstractScheduledService { + + /** + * Logger. + */ + private static final Logger LOG = LoggerFactory.getLogger(AlertNoticeDispatchService.class); + + /** + * Dispatch DAO to query pending {@link AlertNoticeEntity} instances from. + */ + @Inject + private AlertDispatchDAO m_dao; + + /** + * The factory used to get an {@link NotificationDispatcher} instance to submit to the + * {@link #m_executor}. + */ + @Inject + private DispatchFactory m_dispatchFactory; + + /** + * The executor responsible for dispatching. + */ + private final ThreadPoolExecutor m_executor; + + /** + * Constructor. + */ + public AlertNoticeDispatchService() { + m_executor = new ThreadPoolExecutor(0, 2, 5L, + TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), + new AlertDispatchThreadFactory(), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + /** + * {@inheritDoc} + */ + @Override + protected void runOneIteration() throws Exception { + List<AlertNoticeEntity> pending = m_dao.findPendingNotices(); + if (pending.size() == 0) { + return; + } + + LOG.info("There are {} pending alert notices about to be dispatched..." + + pending.size()); + + // combine all histories by target + Map<AlertTargetEntity, List<AlertNoticeEntity>> aggregateMap = new HashMap<AlertTargetEntity, List<AlertNoticeEntity>>( + pending.size()); + + for (AlertNoticeEntity notice : pending) { + AlertTargetEntity target = notice.getAlertTarget(); + + List<AlertNoticeEntity> notices = aggregateMap.get(target); + if (null == notices) { + notices = new ArrayList<AlertNoticeEntity>(); + aggregateMap.put(target, notices); + } + + notices.add(notice); + } + + Set<AlertTargetEntity> targets = aggregateMap.keySet(); + for (AlertTargetEntity target : targets) { + List<AlertNoticeEntity> notices = aggregateMap.get(target); + if (null == notices || notices.size() == 0) { + continue; + } + + Notification notification = new Notification(); + notification.Subject = target.getTargetName(); + notification.Body = target.getDescription(); + notification.Callback = new AlertNoticeDispatchCallback(); + notification.CallbackIds = new ArrayList<String>(notices.size()); + + for (AlertNoticeEntity notice : notices) { + notification.CallbackIds.add(notice.getUuid()); + } + + NotificationDispatcher dispatcher = m_dispatchFactory.getDispatcher(target.getNotificationType()); + DispatchRunnable runnable = new DispatchRunnable(dispatcher, notification); + + m_executor.execute(runnable); + } + } + + /** + * {@inheritDoc} + * <p/> + * Returns a schedule that starts after 1 minute and runs every minute after + * {@link #runOneIteration()} completes. + */ + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedDelaySchedule(1, 1, TimeUnit.MINUTES); + } + + /** + * A custom {@link ThreadFactory} for the threads that will handle dispatching + * {@link AlertNoticeEntity} instances. Threads created will have slightly + * reduced priority since {@link AlertEvent} instances are not critical to the + * system. + */ + private static final class AlertDispatchThreadFactory implements + ThreadFactory { + + private static final AtomicInteger s_threadIdPool = new AtomicInteger(1); + + /** + * {@inheritDoc} + */ + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, "alert-dispatch-" + + s_threadIdPool.getAndIncrement()); + + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY - 1); + + return thread; + } + } + + /** + * The {@link AlertNoticeDispatchCallback} is used to receive a callback from + * the dispatch framework and then update the {@link AlertNoticeEntity} + * {@link NotificationState}. + */ + private final class AlertNoticeDispatchCallback implements + DispatchCallback { + + /** + * {@inheritDoc} + */ + @Override + public void onSuccess(List<String> callbackIds) { + for (String callbackId : callbackIds) { + updateAlertNotice(callbackId, NotificationState.DELIVERED); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onFailure(List<String> callbackIds) { + for (String callbackId : callbackIds) { + updateAlertNotice(callbackId, NotificationState.FAILED); + } + } + + /** + * Updates the {@link AlertNoticeEntity} matching the given UUID with the + * specified state. + * + * @param uuid + * @param state + */ + private void updateAlertNotice(String uuid, NotificationState state) { + try { + AlertNoticeEntity entity = m_dao.findNoticeByUuid(uuid); + if (null == entity) { + LOG.warn("Unable to find an alert notice with UUID {}", uuid); + return; + } + + entity.setNotifyState(state); + m_dao.merge(entity); + } catch (Exception exception) { + LOG.error( + "Unable to update the alert notice with UUID {} to {}, notifications will continue to be sent", + uuid, state, exception); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java index 526104f..a9d126c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java @@ -18,6 +18,9 @@ package org.apache.ambari.server.orm; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -353,4 +356,38 @@ public class OrmTestHelper { alertDispatchDAO.create(group); return alertDispatchDAO.findGroupById(group.getGroupId()); } + + /** + * Creates some default alert groups for various services used in the tests. + * + * @param clusterId + * @return + * @throws Exception + */ + @Transactional + public List<AlertGroupEntity> createDefaultAlertGroups(long clusterId) + throws Exception { + AlertGroupEntity hdfsGroup = new AlertGroupEntity(); + hdfsGroup.setDefault(true); + hdfsGroup.setClusterId(clusterId); + hdfsGroup.setGroupName("HDFS"); + hdfsGroup.setServiceName("HDFS"); + + AlertGroupEntity oozieGroup = new AlertGroupEntity(); + oozieGroup.setDefault(true); + oozieGroup.setClusterId(clusterId); + oozieGroup.setGroupName("OOZIE"); + oozieGroup.setServiceName("OOZIE"); + + alertDispatchDAO.create(hdfsGroup); + alertDispatchDAO.create(oozieGroup); + + List<AlertGroupEntity> defaultGroups = alertDispatchDAO.findAllGroups(clusterId); + assertEquals(2, defaultGroups.size()); + assertNotNull(alertDispatchDAO.findDefaultServiceGroup("HDFS")); + assertNotNull(alertDispatchDAO.findDefaultServiceGroup("OOZIE")); + + return defaultGroups; + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java index f7deef5..cdca3dd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java @@ -72,16 +72,19 @@ public class AlertDefinitionDAOTest { * */ @Before - public void setup() { + public void setup() throws Exception { injector = Guice.createInjector(new InMemoryDefaultTestModule()); injector.getInstance(GuiceJpaInitializer.class); + dispatchDao = injector.getInstance(AlertDispatchDAO.class); dao = injector.getInstance(AlertDefinitionDAO.class); alertsDao = injector.getInstance(AlertsDAO.class); - dispatchDao = injector.getInstance(AlertDispatchDAO.class); helper = injector.getInstance(OrmTestHelper.class); clusterId = helper.createCluster(); + // create required default groups + helper.createDefaultAlertGroups(clusterId); + // create 8 HDFS alerts int i = 0; for (; i < 8; i++) { @@ -148,7 +151,6 @@ public class AlertDefinitionDAOTest { definition.setSourceType("SCRIPT"); dao.create(definition); } - } @After @@ -287,7 +289,6 @@ public class AlertDefinitionDAOTest { history.setAlertState(AlertState.OK); history.setAlertText("Alert Text"); history.setAlertTimestamp(calendar.getTimeInMillis()); -// alertsDao.create(history); AlertCurrentEntity current = new AlertCurrentEntity(); current.setAlertHistory(history); @@ -359,5 +360,7 @@ public class AlertDefinitionDAOTest { assertNull(clusterDAO.findById(clusterId)); assertNull(dao.findById(definition.getDefinitionId())); + + assertEquals(0, dispatchDao.findAllGroups(clusterId).size()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java index 1103961..2c3a876 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java @@ -35,7 +35,11 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertGroupEntity; +import org.apache.ambari.server.orm.entities.AlertHistoryEntity; +import org.apache.ambari.server.orm.entities.AlertNoticeEntity; import org.apache.ambari.server.orm.entities.AlertTargetEntity; +import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.state.NotificationState; import org.apache.ambari.server.state.alert.Scope; import org.junit.After; import org.junit.Before; @@ -54,6 +58,7 @@ public class AlertDispatchDAOTest { Injector injector; AlertDispatchDAO dao; AlertDefinitionDAO definitionDao; + AlertsDAO alertsDao; OrmTestHelper helper; /** @@ -64,6 +69,7 @@ public class AlertDispatchDAOTest { injector = Guice.createInjector(new InMemoryDefaultTestModule()); injector.getInstance(GuiceJpaInitializer.class); dao = injector.getInstance(AlertDispatchDAO.class); + alertsDao = injector.getInstance(AlertsDAO.class); definitionDao = injector.getInstance(AlertDefinitionDAO.class); helper = injector.getInstance(OrmTestHelper.class); @@ -352,7 +358,7 @@ public class AlertDispatchDAOTest { /** * Tests finding groups by a definition ID that they are associatd with. - * + * * @throws Exception */ @Test @@ -380,6 +386,38 @@ public class AlertDispatchDAOTest { } /** + * @throws Exception + */ + @Test + public void testFindNoticeByUuid() throws Exception { + List<AlertDefinitionEntity> definitions = createDefinitions(); + AlertDefinitionEntity definition = definitions.get(0); + + AlertHistoryEntity history = new AlertHistoryEntity(); + history.setServiceName(definition.getServiceName()); + history.setClusterId(clusterId); + history.setAlertDefinition(definition); + history.setAlertLabel("Label"); + history.setAlertState(AlertState.OK); + history.setAlertText("Alert Text"); + history.setAlertTimestamp(System.currentTimeMillis()); + alertsDao.create(history); + + AlertTargetEntity target = helper.createAlertTarget(); + + AlertNoticeEntity notice = new AlertNoticeEntity(); + notice.setUuid(UUID.randomUUID().toString()); + notice.setAlertTarget(target); + notice.setAlertHistory(history); + notice.setNotifyState(NotificationState.PENDING); + dao.create(notice); + + AlertNoticeEntity actual = dao.findNoticeByUuid(notice.getUuid()); + assertEquals(notice.getNotificationId(), actual.getNotificationId()); + assertNull(dao.findNoticeByUuid("DEADBEEF")); + } + + /** * @return */ private List<AlertDefinitionEntity> createDefinitions() throws Exception {
