Repository: ambari
Updated Branches:
  refs/heads/branch-alerts-dev f1018b176 -> ba325d083


AMBARI-7391 - Alerts: Scan Database For Pending AlertNoticeEntity 
(jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba325d08
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba325d08
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba325d08

Branch: refs/heads/branch-alerts-dev
Commit: ba325d0831e9816992849490766e511450ef7f3a
Parents: f1018b1
Author: Jonathan Hurley <[email protected]>
Authored: Thu Sep 18 11:20:44 2014 -0400
Committer: Jonathan Hurley <[email protected]>
Committed: Thu Sep 18 15:21:26 2014 -0400

----------------------------------------------------------------------
 ambari-server/pom.xml                           |  10 +
 .../ambari/server/controller/AmbariServer.java  |  12 +
 .../server/controller/ControllerModule.java     |  22 ++
 .../events/listeners/AlertReceivedListener.java |   7 +-
 .../listeners/AlertServiceStateListener.java    |  20 +-
 .../listeners/AlertStateChangedListener.java    |   4 +-
 .../events/publishers/AlertEventPublisher.java  |  14 +-
 .../server/notifications/DispatchCallback.java  |  50 ++++
 .../server/notifications/DispatchFactory.java   |  62 +++++
 .../server/notifications/DispatchRunnable.java  |  60 +++++
 .../server/notifications/Notification.java      |  57 +++++
 .../notifications/NotificationDispatcher.java   |  47 ++++
 .../dispatchers/EmailDispatcher.java            |  60 +++++
 .../server/orm/dao/AlertDefinitionDAO.java      |  15 +-
 .../ambari/server/orm/dao/AlertDispatchDAO.java |  69 +++++-
 .../server/orm/entities/AlertCurrentEntity.java |  38 +--
 .../server/orm/entities/AlertGroupEntity.java   |   3 +-
 .../server/orm/entities/AlertNoticeEntity.java  |   2 +
 .../ambari/server/state/alert/TargetType.java   |  15 +-
 .../server/state/cluster/ClusterImpl.java       |   7 +-
 .../services/AlertNoticeDispatchService.java    | 231 +++++++++++++++++++
 .../apache/ambari/server/orm/OrmTestHelper.java |  37 +++
 .../server/orm/dao/AlertDefinitionDAOTest.java  |  11 +-
 .../server/orm/dao/AlertDispatchDAOTest.java    |  40 +++-
 24 files changed, 847 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index b07afbf..d83db61 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -1418,6 +1418,16 @@
       <artifactId>velocity</artifactId>
       <version>1.7</version>
     </dependency>
+    <dependency>
+      <groupId>com.sun.mail</groupId>
+      <artifactId>mailapi</artifactId>
+      <version>1.5.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.mail</groupId>
+      <artifactId>smtp</artifactId>
+      <version>1.5.2</version>
+    </dependency>
   </dependencies>
   <!--<reporting> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId>
     <artifactId>findbugs-maven-plugin</artifactId> <version>2.5.2</version> 
</plugin>

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 7354706..57b678a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -113,10 +113,12 @@ import 
org.springframework.web.context.WebApplicationContext;
 import org.springframework.web.context.support.GenericWebApplicationContext;
 import org.springframework.web.filter.DelegatingFilterProxy;
 
+import com.google.common.util.concurrent.ServiceManager;
 import com.google.gson.Gson;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 import com.google.inject.persist.Transactional;
@@ -147,6 +149,13 @@ public class AmbariServer {
   @Inject
   @Named("dbInitNeeded")
   boolean dbInitNeeded;
+
+  /**
+   * Guava service manager singleton (bound with {@link Scopes#SINGLETON}).
+   */
+  @Inject
+  private ServiceManager serviceManager;
+
   /**
    * The singleton view registry.
    */
@@ -473,6 +482,9 @@ public class AmbariServer {
       executionScheduleManager.start();
       LOG.info("********* Started Scheduled Request Manager **********");
 
+      serviceManager.startAsync();
+      LOG.info("********* Started Services **********");
+
       server.join();
       LOG.info("Joined the Server");
     } catch (BadPaddingException bpe){

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index 349e465..cfab1f8 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -33,9 +33,11 @@ import static 
org.eclipse.persistence.config.PersistenceUnitProperties.JDBC_USER
 import static 
org.eclipse.persistence.config.PersistenceUnitProperties.THROW_EXCEPTIONS;
 
 import java.security.SecureRandom;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
@@ -88,6 +90,7 @@ import org.apache.ambari.server.state.host.HostImpl;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.apache.ambari.server.state.scheduler.RequestExecutionImpl;
+import org.apache.ambari.server.state.services.AlertNoticeDispatchService;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl;
 import org.apache.ambari.server.view.ViewInstanceHandlerList;
 import org.eclipse.jetty.server.SessionIdManager;
@@ -98,6 +101,7 @@ import 
org.springframework.security.crypto.password.PasswordEncoder;
 import org.springframework.security.crypto.password.StandardPasswordEncoder;
 import org.springframework.web.filter.DelegatingFilterProxy;
 
+import com.google.common.util.concurrent.ServiceManager;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.inject.AbstractModule;
@@ -226,6 +230,7 @@ public class ControllerModule extends AbstractModule {
 
     requestStaticInjection(ExecutionCommandWrapper.class);
 
+    bindServices();
     bindEagerSingletons();
   }
 
@@ -306,6 +311,23 @@ public class ControllerModule extends AbstractModule {
   }
 
   /**
+   * Binds each {@link com.google.common.util.concurrent.Service} as an instance
+   * (currently only {@link AlertNoticeDispatchService}) and binds a {@link ServiceManager} built over them.
+   */
+  private void bindServices() {
+    Set<com.google.common.util.concurrent.Service> services = new 
HashSet<com.google.common.util.concurrent.Service>();
+
+    AlertNoticeDispatchService alertNoticeDispatchService = new 
AlertNoticeDispatchService();
+
+    bind(AlertNoticeDispatchService.class).toInstance(
+        alertNoticeDispatchService);
+
+    services.add(alertNoticeDispatchService);
+    ServiceManager manager = new ServiceManager(services);
+    bind(ServiceManager.class).toInstance(manager);
+  }
+
+  /**
    * Initializes all eager singletons that should be instantiated as soon as
    * possible and not wait for injection.
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
index 519ceeb..0f1b49a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
@@ -113,6 +113,10 @@ public class AlertReceivedListener {
       AlertHistoryEntity history = createHistory(clusterId,
           current.getAlertHistory().getAlertDefinition(), alert);
 
+      // manually create the new history entity since we are merging into
+      // an existing current entity
+      m_alertsDao.create(history);
+
       current.setAlertHistory(history);
       current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
       current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
@@ -121,7 +125,8 @@ public class AlertReceivedListener {
 
       // broadcast the alert changed event for other subscribers
       AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent(
-          event.getClusterId(), event.getAlert(), history, oldState);
+          event.getClusterId(), event.getAlert(), current.getAlertHistory(),
+          oldState);
 
       m_alertEventPublisher.publish(alertChangedEvent);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
index 906d52d..f1ce617 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
@@ -113,6 +113,17 @@ public class AlertServiceStateListener {
     String stackVersion = event.getStackVersion();
     String serviceName = event.getServiceName();
 
+    // create the default alert group for the new service; this MUST be done
+    // before adding definitions so that they are properly added to the
+    // default group
+    AlertGroupEntity serviceAlertGroup = new AlertGroupEntity();
+    serviceAlertGroup.setClusterId(clusterId);
+    serviceAlertGroup.setDefault(true);
+    serviceAlertGroup.setGroupName(serviceName);
+    serviceAlertGroup.setServiceName(serviceName);
+
+    m_alertDispatchDao.create(serviceAlertGroup);
+
     // populate alert definitions for the new service from the database, but
     // don't worry about sending down commands to the agents; the host
     // components are not yet bound to the hosts so we'd have no way of knowing
@@ -134,14 +145,5 @@ public class AlertServiceStateListener {
           serviceName);
       LOG.error(message, ae);
     }
-
-    // create the default alert group for the new service
-    AlertGroupEntity serviceAlertGroup = new AlertGroupEntity();
-    serviceAlertGroup.setClusterId(clusterId);
-    serviceAlertGroup.setDefault(true);
-    serviceAlertGroup.setGroupName(serviceName);
-    serviceAlertGroup.setServiceName(serviceName);
-
-    m_alertDispatchDao.create(serviceAlertGroup);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
index ac22f88..b2f08d7 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners;
 
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.ambari.server.events.AlertStateChangeEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
@@ -89,11 +90,12 @@ public class AlertStateChangedListener {
 
       for (AlertTargetEntity target : targets) {
         AlertNoticeEntity notice = new AlertNoticeEntity();
+        notice.setUuid(UUID.randomUUID().toString());
         notice.setAlertTarget(target);
         notice.setAlertHistory(event.getNewHistoricalEntry());
         notice.setNotifyState(NotificationState.PENDING);
 
-        m_alertsDispatchDao.merge(notice);
+        m_alertsDispatchDao.create(notice);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
index f1a0b3e..006837a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
@@ -17,8 +17,10 @@
  */
 package org.apache.ambari.server.events.publishers;
 
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.ambari.server.events.AlertEvent;
@@ -48,8 +50,14 @@ public final class AlertEventPublisher {
    * Constructor.
    */
   public AlertEventPublisher() {
-    m_eventBus = new AsyncEventBus(Executors.newFixedThreadPool(2,
-        new AlertEventBusThreadFactory()));
+    // create a fixed executor that is unbounded for now and will run rejected
+    // requests in the calling thread to prevent loss of alert handling
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+        new AlertEventBusThreadFactory(),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    m_eventBus = new AsyncEventBus(executor);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java
new file mode 100644
index 0000000..e4e944d
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchCallback.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.notifications;
+
+import java.util.List;
+
+/**
+ * The {@link DispatchCallback} interface is used by a {@link NotificationDispatcher} to
+ * call back to an interested party when a {@link Notification} delivery is
+ * attempted.
+ * <p/>
+ * This interface can be used to let the caller know the success or failure status
+ * of the outbound {@link Notification}.
+ */
+public interface DispatchCallback {
+
+  /**
+   * Invoked when a {@link Notification} was successfully dispatched.
+   *
+   * @param callbackIds
+   *          a list of unique IDs that the caller can use to identify the
+   *          {@link Notification} that was dispatched.
+   */
+  public void onSuccess(List<String> callbackIds);
+
+  /**
+   * Invoked when a {@link Notification} failed to be dispatched.
+   *
+   * @param callbackIds
+   *          a list of unique IDs that the caller can use to identify the
+   *          {@link Notification} that could not be dispatched.
+   */
+  public void onFailure(List<String> callbackIds);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java
new file mode 100644
index 0000000..3414035
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.notifications;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.notifications.dispatchers.EmailDispatcher;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link DispatchFactory} is used to provide singleton instances of
+ * {@link NotificationDispatcher} based on a supplied type.
+ */
+@Singleton
+public final class DispatchFactory {
+
+  /**
+   * Mapping of dispatch type to dispatcher singleton.
+   */
+  private final Map<String, NotificationDispatcher> m_dispatchers = new 
HashMap<String, NotificationDispatcher>();
+
+  /**
+   * Constructor; registers the {@link EmailDispatcher} under its own type.
+   * @param injector the injector used to obtain the {@link EmailDispatcher} instance.
+   */
+  @Inject
+  public DispatchFactory(Injector injector) {
+    EmailDispatcher emailDispatcher = 
injector.getInstance(EmailDispatcher.class);
+
+    m_dispatchers.put(emailDispatcher.getType(), emailDispatcher);
+  }
+
+  /**
+   * Gets a dispatcher based on the type.
+   *
+   * @param type
+   *          the type, as reported by {@link NotificationDispatcher#getType()} (not {@code null}).
+   * @return a dispatcher instance, or {@code null} if none.
+   */
+  public NotificationDispatcher getDispatcher(String type) {
+    return m_dispatchers.get(type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java
new file mode 100644
index 0000000..dcc2103
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/DispatchRunnable.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.notifications;
+
+import java.util.concurrent.Executor;
+
+/**
+ * The {@link DispatchRunnable} class is a simple {@link Runnable} that can be
+ * used to pass a {@link Notification} to a {@link NotificationDispatcher} via an
+ * {@link Executor}.
+ */
+public final class DispatchRunnable implements Runnable {
+
+  /**
+   * The dispatcher to dispatch to.
+   */
+  private final NotificationDispatcher m_dispatcher;
+
+  /**
+   * The notification to dispatch.
+   */
+  private final Notification m_notification;
+
+  /**
+   * Constructor.
+   *
+   * @param dispatcher
+   *          the dispatcher to dispatch to (not {@code null}).
+   * @param notification
+   *          the notification to dispatch (not {@code null}).
+   */
+  public DispatchRunnable(NotificationDispatcher dispatcher, Notification 
notification) {
+    m_dispatcher = dispatcher;
+    m_notification = notification;
+  }
+
+  /**
+   * Passes the notification to the dispatcher, both supplied at construction.
+   */
+  @Override
+  public void run() {
+    m_dispatcher.dispatch(m_notification);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java
new file mode 100644
index 0000000..08c5242
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/Notification.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.notifications;
+
+import java.util.List;
+
+/**
+ * The {@link Notification} class is a generic way to relay content through an
+ * {@link NotificationDispatcher}.
+ */
+public class Notification {
+
+  /**
+   * The subject of the notification.
+   */
+  public String Subject;
+
+  /**
+   * The main content of the notification.
+   */
+  public String Body;
+
+  /**
+   * An optional callback implementation that the dispatcher can use to report
+   * success/failure on delivery.
+   */
+  public DispatchCallback Callback;
+
+  /**
+   * An optional list of unique IDs that will identify the origins of this
+   * notification.
+   */
+  public List<String> CallbackIds;
+
+  /**
+   * Constructor; no fields are populated, so callers assign the public
+   * fields directly.
+   */
+  public Notification() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java
new file mode 100644
index 0000000..10946be
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/NotificationDispatcher.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.notifications;
+
+
+/**
+ * The {@link NotificationDispatcher} interface represents a mechanism for dispatching a
+ * {@link Notification}.
+ * <p/>
+ * Dispatchers should, in general, be singletons. They should also invoke the
+ * appropriate methods on {@link Notification#Callback} to indicate a success or
+ * failure during dispatch.
+ */
+public interface NotificationDispatcher {
+
+  /**
+   * Gets the type of dispatcher. The type of each different dispatcher should
+   * be unique.
+   *
+   * @return the dispatcher's type (never {@code null}).
+   */
+  public String getType();
+
+  /**
+   * Dispatches the specified notification.
+   *
+   * @param notification
+   *          the notification to dispatch (not {@code null}).
+   */
+  public void dispatch(Notification notification);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java
new file mode 100644
index 0000000..d0858d3
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/notifications/dispatchers/EmailDispatcher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.notifications.dispatchers;
+
+import org.apache.ambari.server.notifications.NotificationDispatcher;
+import org.apache.ambari.server.notifications.Notification;
+import org.apache.ambari.server.state.alert.TargetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Singleton;
+
+/**
+ * The {@link EmailDispatcher} class is used to dispatch {@link Notification}
+ * via JavaMail. NOTE(review): in this revision dispatch only logs; no mail is sent.
+ */
+@Singleton
+public class EmailDispatcher implements NotificationDispatcher {
+
+  /**
+   * Logger.
+   */
+  private static final Logger LOG = 
LoggerFactory.getLogger(EmailDispatcher.class);
+
+  /**
+   * {@inheritDoc} This is the name of {@link TargetType#EMAIL}.
+   */
+  @Override
+  public String getType() {
+    return TargetType.EMAIL.name();
+  }
+
+  /**
+   * {@inheritDoc} Currently logs the notification and always reports success.
+   */
+  @Override
+  public void dispatch(Notification notification) {
+    LOG.info("Sending email: {}", notification);
+
+    // the callback is optional; when present it is always told the dispatch succeeded
+    if (null != notification.Callback) {
+      notification.Callback.onSuccess(notification.CallbackIds);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
index f410c39..570f268 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
@@ -26,6 +26,7 @@ import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.controller.RootServiceResponseFactory;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.state.alert.Scope;
 
 import com.google.inject.Inject;
@@ -125,7 +126,7 @@ public class AlertDefinitionDAO {
 
   /**
    * Gets all of the alert definitions for the list of IDs given.
-   * 
+   *
    * @param definitionIds
    *          the IDs of the definitions to retrieve.
    * @return the definition or an empty list (never {@code null}).
@@ -246,7 +247,9 @@ public class AlertDefinitionDAO {
   }
 
   /**
-   * Persists a new alert definition.
+   * Persists a new alert definition, also creating the associated
+   * {@link AlertGroupEntity} relationship for the definition's service default
+   * group.
    *
    * @param alertDefinition
    *          the definition to persist (not {@code null}).
@@ -254,6 +257,13 @@ public class AlertDefinitionDAO {
   @Transactional
   public void create(AlertDefinitionEntity alertDefinition) {
     entityManagerProvider.get().persist(alertDefinition);
+
+    AlertGroupEntity group = 
dispatchDao.findDefaultServiceGroup(alertDefinition.getServiceName());
+
+    if (null != group) {
+      group.addAlertDefinition(alertDefinition);
+      dispatchDao.merge(group);
+    }
   }
 
   /**
@@ -305,7 +315,6 @@ public class AlertDefinitionDAO {
    */
   @Transactional
   public void remove(AlertDefinitionEntity alertDefinition) {
-    alertDefinition = merge(alertDefinition);
     dispatchDao.removeNoticeByDefinitionId(alertDefinition.getDefinitionId());
     alertsDao.removeByDefinitionId(alertDefinition.getDefinitionId());
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
index 6d4d19b..2239c8f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
@@ -26,6 +26,7 @@ import 
org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.NotificationState;
 import org.apache.ambari.server.state.alert.AlertGroup;
 
 import com.google.inject.Inject;
@@ -101,6 +102,37 @@ public class AlertDispatchDAO {
   }
 
   /**
+   * Gets a notification with the specified UUID.
+   *
+   * @param uuid
+   *          the UUID of the notification to retrieve.
+   * @return the notification or {@code null} if none exists.
+   */
+  public AlertNoticeEntity findNoticeByUuid(String uuid) {
+    TypedQuery<AlertNoticeEntity> query = 
entityManagerProvider.get().createNamedQuery(
+        "AlertNoticeEntity.findByUuid", AlertNoticeEntity.class);
+
+    query.setParameter("uuid", uuid);
+
+    return daoUtils.selectOne(query);
+  }
+
+  /**
+   * Gets all {@link AlertNoticeEntity} instances that are
+   * {@link NotificationState#PENDING} and not yet dispatched.
+   *
+   * @return the notices that are waiting to be dispatched, or an empty list
+   *         (never {@code null}).
+   */
+  public List<AlertNoticeEntity> findPendingNotices() {
+    TypedQuery<AlertNoticeEntity> query = 
entityManagerProvider.get().createNamedQuery(
+        "AlertNoticeEntity.findByState", AlertNoticeEntity.class);
+
+    query.setParameter("notifyState", NotificationState.PENDING);
+    return daoUtils.selectList(query);
+  }
+
+  /**
    * Gets an alert group with the specified name across all clusters. Alert
    * group names are unique within a cluster.
    *
@@ -195,12 +227,13 @@ public class AlertDispatchDAO {
 
   /**
    * Gets all of the {@link AlertGroup} instances that include the specified
-   * alert definition.
+   * alert definition. Service default groups will also be returned.
    *
    * @param definitionEntity
    *          the definition that the group must include (not {@code null}).
    * @return all alert groups that have an association with the specified
-   *         definition or empty list if none exist (never {@code null}).
+   *         definition and the definition's service default group or empty 
list
+   *         if none exist (never {@code null}).
    */
   public List<AlertGroupEntity> findGroupsByDefinition(
       AlertDefinitionEntity definitionEntity) {
@@ -214,6 +247,23 @@ public class AlertDispatchDAO {
   }
 
   /**
+   * Gets the default group for the specified service.
+   *
+   * @param serviceName
+   *          the name of the service (not {@code null}).
+   * @return the default group, or {@code null} if the service name is not 
valid
+   *         for an installed service; otherwise {@code null} should not be
+   *         possible.
+   */
+  public AlertGroupEntity findDefaultServiceGroup(String serviceName) {
+    TypedQuery<AlertGroupEntity> query = 
entityManagerProvider.get().createNamedQuery(
+        "AlertGroupEntity.findServiceDefaultGroup", AlertGroupEntity.class);
+
+    query.setParameter("serviceName", serviceName);
+    return daoUtils.selectSingle(query);
+  }
+
+  /**
    * Gets all alert notifications stored in the database.
    *
    * @return all alert notifications or empty list if none exist (never
@@ -289,6 +339,21 @@ public class AlertDispatchDAO {
   }
 
   /**
+   * Removes all {@link AlertGroupEntity} instances that are associated with
+   * the specified cluster ID.
+   *
+   * @param clusterId
+   *          the cluster ID.
+   */
+  @Transactional
+  public void removeAllGroups(long clusterId) {
+    List<AlertGroupEntity> groups = findAllGroups(clusterId);
+    for (AlertGroupEntity group : groups) {
+      remove(group);
+    }
+  }
+
+  /**
    * Persists new alert targets.
    *
    * @param entities

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
index 8db89f7..c24ac17 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
@@ -69,14 +69,15 @@ public class AlertCurrentEntity {
 
   @Column(name = "original_timestamp", nullable = false)
   private Long originalTimestamp;
-  
+
   @Column(name = "latest_text", length = 4000)
   private String latestText = null;
 
   /**
    * Unidirectional one-to-one association to {@link AlertHistoryEntity}
    */
-  @OneToOne(cascade = { CascadeType.PERSIST })
+  @OneToOne(cascade = { CascadeType.PERSIST, CascadeType.MERGE,
+      CascadeType.REFRESH })
   @JoinColumn(name = "history_id", unique = true, nullable = false)
   private AlertHistoryEntity alertHistory;
 
@@ -95,7 +96,7 @@ public class AlertCurrentEntity {
 
   /**
    * Gets the unique ID for this current alert.
-   * 
+   *
    * @return the ID (never {@code null}).
    */
   public Long getAlertId() {
@@ -104,7 +105,7 @@ public class AlertCurrentEntity {
 
   /**
    * Sets the unique ID for this current alert.
-   * 
+   *
    * @param alertId
    *          the ID (not {@code null}).
    */
@@ -115,7 +116,7 @@ public class AlertCurrentEntity {
   /**
    * Gets the time, in millis, that the last instance of this alert state was
    * received.
-   * 
+   *
    * @return the time of the most recently received alert data for this 
instance
    *         (never {@code null}).
    */
@@ -126,7 +127,7 @@ public class AlertCurrentEntity {
   /**
    * Sets the time, in millis, that the last instance of this alert state was
    * received.
-   * 
+   *
    * @param latestTimestamp
    *          the time of the most recently received alert data for this
    *          instance (never {@code null}).
@@ -137,7 +138,7 @@ public class AlertCurrentEntity {
 
   /**
    * Gets the current maintenance state for the alert.
-   * 
+   *
    * @return the current maintenance state (never {@code null}).
    */
   public MaintenanceState getMaintenanceState() {
@@ -146,7 +147,7 @@ public class AlertCurrentEntity {
 
   /**
    * Sets the current maintenance state for the alert.
-   * 
+   *
    * @param maintenanceState
    *          the state to set (not {@code null}).
    */
@@ -157,7 +158,7 @@ public class AlertCurrentEntity {
   /**
    * Gets the time, in milliseconds, when the alert was first received with the
    * current state.
-   * 
+   *
    * @return the time of the first instance of this alert.
    */
   public Long getOriginalTimestamp() {
@@ -167,14 +168,14 @@ public class AlertCurrentEntity {
   /**
    * Sets the time, in milliseconds, when the alert was first received with the
    * current state.
-   * 
+   *
    * @param originalTimestamp
    *          the time of the first instance of this alert (not {@code null}).
    */
   public void setOriginalTimestamp(Long originalTimestamp) {
     this.originalTimestamp = originalTimestamp;
   }
-  
+
   /**
    * Gets the latest text for this alert.  History will not get a new record on
    * update when the state is the same, but the text may be changed.  For 
example,
@@ -183,7 +184,7 @@ public class AlertCurrentEntity {
   public String getLatestText() {
     return latestText;
   }
-  
+
   /**
    * Sets the latest text.  {@link #getLatestText()}
    */
@@ -194,7 +195,7 @@ public class AlertCurrentEntity {
   /**
    * Gets the associated {@link AlertHistoryEntity} entry for this current 
alert
    * instance.
-   * 
+   *
    * @return the most recently received history entry (never {@code null}).
    */
   public AlertHistoryEntity getAlertHistory() {
@@ -204,7 +205,7 @@ public class AlertCurrentEntity {
   /**
    * Gets the associated {@link AlertHistoryEntity} entry for this current 
alert
    * instance.
-   * 
+   *
    * @param alertHistory
    *          the most recently received history entry (not {@code null}).
    */
@@ -218,16 +219,19 @@ public class AlertCurrentEntity {
    */
   @Override
   public boolean equals(Object object) {
-    if (this == object)
+    if (this == object) {
       return true;
+    }
 
-    if (object == null || getClass() != object.getClass())
+    if (object == null || getClass() != object.getClass()) {
       return false;
+    }
 
     AlertCurrentEntity that = (AlertCurrentEntity) object;
 
-    if (alertId != null ? !alertId.equals(that.alertId) : that.alertId != null)
+    if (alertId != null ? !alertId.equals(that.alertId) : that.alertId != 
null) {
       return false;
+    }
 
     return true;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
index e7fa9c6..1cc9bcc 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
@@ -49,7 +49,8 @@ import javax.persistence.UniqueConstraint;
     @NamedQuery(name = "AlertGroupEntity.findAllInCluster", query = "SELECT 
alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.clusterId = 
:clusterId"),
     @NamedQuery(name = "AlertGroupEntity.findByName", query = "SELECT 
alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = 
:groupName"),
     @NamedQuery(name = "AlertGroupEntity.findByNameInCluster", query = "SELECT 
alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = 
:groupName AND alertGroup.clusterId = :clusterId"),
-    @NamedQuery(name = "AlertGroupEntity.findByAssociatedDefinition", query = 
"SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE :alertDefinition 
MEMBER OF alertGroup.alertDefinitions"), })
+    @NamedQuery(name = "AlertGroupEntity.findByAssociatedDefinition", query = 
"SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE :alertDefinition 
MEMBER OF alertGroup.alertDefinitions"),
+    @NamedQuery(name = "AlertGroupEntity.findServiceDefaultGroup", query = 
"SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE 
alertGroup.serviceName = :serviceName AND alertGroup.isDefault = 1") })
 public class AlertGroupEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java
index c9470ce..d0d4c9a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertNoticeEntity.java
@@ -46,6 +46,8 @@ import org.apache.ambari.server.state.NotificationState;
 @TableGenerator(name = "alert_notice_id_generator", table = 
"ambari_sequences", pkColumnName = "sequence_name", valueColumnName = 
"sequence_value", pkColumnValue = "alert_notice_id_seq", initialValue = 0, 
allocationSize = 1)
 @NamedQueries({
     @NamedQuery(name = "AlertNoticeEntity.findAll", query = "SELECT notice 
FROM AlertNoticeEntity notice"),
+    @NamedQuery(name = "AlertNoticeEntity.findByState", query = "SELECT notice 
FROM AlertNoticeEntity notice WHERE notice.notifyState = :notifyState"),
+    @NamedQuery(name = "AlertNoticeEntity.findByUuid", query = "SELECT notice 
FROM AlertNoticeEntity notice WHERE notice.uuid = :uuid"),
     @NamedQuery(name = "AlertNoticeEntity.removeByDefinitionId", query = 
"DELETE FROM AlertNoticeEntity notice WHERE 
notice.alertHistory.alertDefinition.definitionId = :definitionId") })
 public class AlertNoticeEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java
index e2564cc..9f3b5d8 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/TargetType.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.state.alert;
 
-import org.apache.ambari.server.orm.dao.AlertsDAO;
 
 /**
  * The {@link TargetType} enumeration is used to represent the built-in target
@@ -31,7 +30,17 @@ public enum TargetType {
   EMAIL,
 
   /**
-   * {@link AlertsDAO} will be distributed via SNMP.
+   * Alerts will be distributed via SNMP.
    */
-  SNMP;
+  SNMP,
+
+  /**
+   * Alerts will be distributed to Nagios.
+   */
+  NAGIOS,
+
+  /**
+   * Alerts will be distributed to a logger.
+   */
+  LOG;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index d938771..ff973f6 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.persistence.RollbackException;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ConfigGroupNotFoundException;
 import org.apache.ambari.server.ObjectNotFoundException;
 import org.apache.ambari.server.ParentObjectNotFoundException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
@@ -50,6 +51,7 @@ import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.cache.ConfigGroupHostMapping;
 import org.apache.ambari.server.orm.cache.HostConfigMapping;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ClusterStateDAO;
 import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO;
@@ -106,7 +108,6 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.ConfigGroupNotFoundException;
 
 public class ClusterImpl implements Cluster {
 
@@ -191,6 +192,9 @@ public class ClusterImpl implements Cluster {
   @Inject
   private AlertDefinitionDAO alertDefinitionDAO;
 
+  @Inject
+  private AlertDispatchDAO alertDispatchDAO;
+
   private volatile boolean svcHostsLoaded = false;
 
   private volatile Multimap<String, String> serviceConfigTypes;
@@ -1363,6 +1367,7 @@ public class ClusterImpl implements Cluster {
   protected void removeEntities() throws AmbariException {
     long clusterId = getClusterId();
     alertDefinitionDAO.removeAll(clusterId);
+    alertDispatchDAO.removeAllGroups(clusterId);
     clusterDAO.removeByPK(clusterId);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java
new file mode 100644
index 0000000..7025e14
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AlertNoticeDispatchService.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.services;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.notifications.DispatchFactory;
+import org.apache.ambari.server.notifications.DispatchRunnable;
+import org.apache.ambari.server.notifications.DispatchCallback;
+import org.apache.ambari.server.notifications.NotificationDispatcher;
+import org.apache.ambari.server.notifications.Notification;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.NotificationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertNoticeDispatchService} is used to scan the database for
+ * {@link AlertNoticeEntity} that are in the {@link NotificationState#PENDING}.
+ * It will then process them through the dispatch system.
+ */
+@Singleton
+public class AlertNoticeDispatchService extends AbstractScheduledService {
+
+  /**
+   * Logger.
+   */
+  private static final Logger LOG = 
LoggerFactory.getLogger(AlertNoticeDispatchService.class);
+
+  /**
+   * Dispatch DAO to query pending {@link AlertNoticeEntity} instances from.
+   */
+  @Inject
+  private AlertDispatchDAO m_dao;
+
+  /**
+   * The factory used to get an {@link NotificationDispatcher} instance to 
submit to the
+   * {@link #m_executor}.
+   */
+  @Inject
+  private DispatchFactory m_dispatchFactory;
+
+  /**
+   * The executor responsible for dispatching.
+   */
+  private final ThreadPoolExecutor m_executor;
+
+  /**
+   * Constructor.
+   */
+  public AlertNoticeDispatchService() {
+    m_executor = new ThreadPoolExecutor(0, 2, 5L,
+        TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
+        new AlertDispatchThreadFactory(),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected void runOneIteration() throws Exception {
+    List<AlertNoticeEntity> pending = m_dao.findPendingNotices();
+    if (pending.size() == 0) {
+      return;
+    }
+
+    LOG.info("There are {} pending alert notices about to be dispatched...",
+        pending.size());
+
+    // group the pending notices by target so each target gets one notification
+    Map<AlertTargetEntity, List<AlertNoticeEntity>> aggregateMap = new HashMap<AlertTargetEntity, List<AlertNoticeEntity>>(
+        pending.size());
+
+    for (AlertNoticeEntity notice : pending) {
+      AlertTargetEntity target = notice.getAlertTarget();
+
+      List<AlertNoticeEntity> notices = aggregateMap.get(target);
+      if (null == notices) {
+        notices = new ArrayList<AlertNoticeEntity>();
+        aggregateMap.put(target, notices);
+      }
+
+      notices.add(notice);
+    }
+
+    Set<AlertTargetEntity> targets = aggregateMap.keySet();
+    for (AlertTargetEntity target : targets) {
+      List<AlertNoticeEntity> notices = aggregateMap.get(target);
+      if (null == notices || notices.size() == 0) {
+        continue;
+      }
+
+      Notification notification = new Notification();
+      notification.Subject = target.getTargetName();
+      notification.Body = target.getDescription();
+      notification.Callback = new AlertNoticeDispatchCallback();
+      notification.CallbackIds = new ArrayList<String>(notices.size());
+
+      for (AlertNoticeEntity notice : notices) {
+        notification.CallbackIds.add(notice.getUuid());
+      }
+
+      NotificationDispatcher dispatcher = m_dispatchFactory.getDispatcher(target.getNotificationType());
+      DispatchRunnable runnable = new DispatchRunnable(dispatcher, notification);
+
+      m_executor.execute(runnable);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * Returns a schedule that starts after 1 minute and runs every minute after
+   * {@link #runOneIteration()} completes.
+   */
+  @Override
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedDelaySchedule(1, 1, TimeUnit.MINUTES);
+  }
+
+  /**
+   * A custom {@link ThreadFactory} for the threads that will handle 
dispatching
+   * {@link AlertNoticeEntity} instances. Threads created will have slightly
+   * reduced priority since {@link AlertEvent} instances are not critical to 
the
+   * system.
+   */
+  private static final class AlertDispatchThreadFactory implements
+      ThreadFactory {
+
+    private static final AtomicInteger s_threadIdPool = new AtomicInteger(1);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, "alert-dispatch-"
+          + s_threadIdPool.getAndIncrement());
+
+      thread.setDaemon(false);
+      thread.setPriority(Thread.NORM_PRIORITY - 1);
+
+      return thread;
+    }
+  }
+
+  /**
+   * The {@link AlertNoticeDispatchCallback} is used to receive a callback from
+   * the dispatch framework and then update the {@link AlertNoticeEntity}
+   * {@link NotificationState}.
+   */
+  private final class AlertNoticeDispatchCallback implements
+      DispatchCallback {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onSuccess(List<String> callbackIds) {
+      for (String callbackId : callbackIds) {
+        updateAlertNotice(callbackId, NotificationState.DELIVERED);
+      }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onFailure(List<String> callbackIds) {
+      for (String callbackId : callbackIds) {
+        updateAlertNotice(callbackId, NotificationState.FAILED);
+      }
+    }
+
+    /**
+     * Updates the {@link AlertNoticeEntity} matching the given UUID with the
+     * specified state.
+     *
+     * @param uuid the UUID of the notice to update.
+     * @param state the new state to set on the notice.
+     */
+    private void updateAlertNotice(String uuid, NotificationState state) {
+      try {
+        AlertNoticeEntity entity = m_dao.findNoticeByUuid(uuid);
+        if (null == entity) {
+          LOG.warn("Unable to find an alert notice with UUID {}", uuid);
+          return;
+        }
+
+        entity.setNotifyState(state);
+        m_dao.merge(entity);
+      } catch (Exception exception) {
+        LOG.error(
+            "Unable to update the alert notice with UUID {} to {}, 
notifications will continue to be sent",
+            uuid, state, exception);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
index 526104f..a9d126c 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
@@ -18,6 +18,9 @@
 
 package org.apache.ambari.server.orm;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -353,4 +356,38 @@ public class OrmTestHelper {
     alertDispatchDAO.create(group);
     return alertDispatchDAO.findGroupById(group.getGroupId());
   }
+
+  /**
+   * Creates some default alert groups for various services used in the tests.
+   *
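+   * @param clusterId the ID of the cluster that the default groups belong to.
+   * @return all groups found for the cluster after the defaults are created.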
+   * @throws Exception
+   */
+  @Transactional
+  public List<AlertGroupEntity> createDefaultAlertGroups(long clusterId)
+      throws Exception {
+    AlertGroupEntity hdfsGroup = new AlertGroupEntity();
+    hdfsGroup.setDefault(true);
+    hdfsGroup.setClusterId(clusterId);
+    hdfsGroup.setGroupName("HDFS");
+    hdfsGroup.setServiceName("HDFS");
+
+    AlertGroupEntity oozieGroup = new AlertGroupEntity();
+    oozieGroup.setDefault(true);
+    oozieGroup.setClusterId(clusterId);
+    oozieGroup.setGroupName("OOZIE");
+    oozieGroup.setServiceName("OOZIE");
+
+    alertDispatchDAO.create(hdfsGroup);
+    alertDispatchDAO.create(oozieGroup);
+
+    List<AlertGroupEntity> defaultGroups = 
alertDispatchDAO.findAllGroups(clusterId);
+    assertEquals(2, defaultGroups.size());
+    assertNotNull(alertDispatchDAO.findDefaultServiceGroup("HDFS"));
+    assertNotNull(alertDispatchDAO.findDefaultServiceGroup("OOZIE"));
+
+    return defaultGroups;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
index f7deef5..cdca3dd 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
@@ -72,16 +72,19 @@ public class AlertDefinitionDAOTest {
    *
    */
   @Before
-  public void setup() {
+  public void setup() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
 
+    dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     dao = injector.getInstance(AlertDefinitionDAO.class);
     alertsDao = injector.getInstance(AlertsDAO.class);
-    dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     helper = injector.getInstance(OrmTestHelper.class);
     clusterId = helper.createCluster();
 
+    // create required default groups
+    helper.createDefaultAlertGroups(clusterId);
+
     // create 8 HDFS alerts
     int i = 0;
     for (; i < 8; i++) {
@@ -148,7 +151,6 @@ public class AlertDefinitionDAOTest {
       definition.setSourceType("SCRIPT");
       dao.create(definition);
     }
-
   }
 
   @After
@@ -287,7 +289,6 @@ public class AlertDefinitionDAOTest {
     history.setAlertState(AlertState.OK);
     history.setAlertText("Alert Text");
     history.setAlertTimestamp(calendar.getTimeInMillis());
-//    alertsDao.create(history);
 
     AlertCurrentEntity current = new AlertCurrentEntity();
     current.setAlertHistory(history);
@@ -359,5 +360,7 @@ public class AlertDefinitionDAOTest {
 
     assertNull(clusterDAO.findById(clusterId));
     assertNull(dao.findById(definition.getDefinitionId()));
+
+    assertEquals(0, dispatchDao.findAllGroups(clusterId).size());
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba325d08/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
index 1103961..2c3a876 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
@@ -35,7 +35,11 @@ import 
org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.NotificationState;
 import org.apache.ambari.server.state.alert.Scope;
 import org.junit.After;
 import org.junit.Before;
@@ -54,6 +58,7 @@ public class AlertDispatchDAOTest {
   Injector injector;
   AlertDispatchDAO dao;
   AlertDefinitionDAO definitionDao;
+  AlertsDAO alertsDao;
   OrmTestHelper helper;
 
   /**
@@ -64,6 +69,7 @@ public class AlertDispatchDAOTest {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     dao = injector.getInstance(AlertDispatchDAO.class);
+    alertsDao = injector.getInstance(AlertsDAO.class);
     definitionDao = injector.getInstance(AlertDefinitionDAO.class);
     helper = injector.getInstance(OrmTestHelper.class);
 
@@ -352,7 +358,7 @@ public class AlertDispatchDAOTest {
 
   /**
    * Tests finding groups by a definition ID that they are associatd with.
-   * 
+   *
    * @throws Exception
    */
   @Test
@@ -380,6 +386,38 @@ public class AlertDispatchDAOTest {
   }
 
   /**
+   * @throws Exception
+   */
+  @Test
+  public void testFindNoticeByUuid() throws Exception {
+    List<AlertDefinitionEntity> definitions = createDefinitions();
+    AlertDefinitionEntity definition = definitions.get(0);
+
+    AlertHistoryEntity history = new AlertHistoryEntity();
+    history.setServiceName(definition.getServiceName());
+    history.setClusterId(clusterId);
+    history.setAlertDefinition(definition);
+    history.setAlertLabel("Label");
+    history.setAlertState(AlertState.OK);
+    history.setAlertText("Alert Text");
+    history.setAlertTimestamp(System.currentTimeMillis());
+    alertsDao.create(history);
+
+    AlertTargetEntity target = helper.createAlertTarget();
+
+    AlertNoticeEntity notice = new AlertNoticeEntity();
+    notice.setUuid(UUID.randomUUID().toString());
+    notice.setAlertTarget(target);
+    notice.setAlertHistory(history);
+    notice.setNotifyState(NotificationState.PENDING);
+    dao.create(notice);
+
+    AlertNoticeEntity actual = dao.findNoticeByUuid(notice.getUuid());
+    assertEquals(notice.getNotificationId(), actual.getNotificationId());
+    assertNull(dao.findNoticeByUuid("DEADBEEF"));
+  }
+
+  /**
    * @return
    */
   private List<AlertDefinitionEntity> createDefinitions() throws Exception {

Reply via email to