Repository: metron
Updated Branches:
  refs/heads/master 37e8ca7e2 -> 7a3de6737


METRON-1127 Add ability to escalate alerts for external consumption (merrimanr) 
closes apache/metron#711


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7a3de673
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7a3de673
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7a3de673

Branch: refs/heads/master
Commit: 7a3de6737b964abbdcdc876695b50f29bf5d531e
Parents: 37e8ca7
Author: merrimanr <[email protected]>
Authored: Tue Sep 5 16:15:32 2017 -0500
Committer: merrimanr <[email protected]>
Committed: Tue Sep 5 16:15:32 2017 -0500

----------------------------------------------------------------------
 .../CURRENT/configuration/metron-rest-env.xml   |   6 +
 .../package/scripts/params/params_linux.py      |   1 +
 .../CURRENT/package/scripts/rest_commands.py    |   7 +-
 .../CURRENT/package/scripts/rest_master.py      |   1 +
 .../METRON/CURRENT/package/templates/metron.j2  |   1 +
 .../METRON/CURRENT/themes/metron_theme.json     |  10 ++
 metron-interface/metron-rest/README.md          |   8 ++
 .../src/main/config/rest_application.yml        |   2 +
 .../apache/metron/rest/MetronRestConstants.java |   1 +
 .../apache/metron/rest/config/KafkaConfig.java  |  16 +++
 .../rest/controller/AlertsController.java       |  55 +++++++++
 .../metron/rest/controller/KafkaController.java |  14 ++-
 .../metron/rest/service/AlertService.java       |  30 +++++
 .../metron/rest/service/KafkaService.java       |   5 +-
 .../rest/service/impl/AlertServiceImpl.java     |  62 ++++++++++
 .../rest/service/impl/KafkaServiceImpl.java     |  25 +++-
 .../src/main/resources/application.yml          |   7 +-
 .../apache/metron/rest/config/TestConfig.java   |  33 +++--
 .../AlertControllerIntegrationTest.java         |  86 +++++++++++++
 .../KafkaControllerIntegrationTest.java         | 123 ++++++++++---------
 .../rest/service/impl/AlertServiceImplTest.java |  69 +++++++++++
 .../rest/service/impl/KafkaServiceImplTest.java |  55 ++++++---
 22 files changed, 520 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
index c68f5b2..9c11123 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
@@ -109,4 +109,10 @@
             <empty-value-valid>true</empty-value-valid>
         </value-attributes>
     </property>
+    <property>
+        <name>metron_escalation_topic</name>
+        <description>Escalated alerts will be produced to this 
topic</description>
+        <value>escalation</value>
+        <display-name>Metron escalation topic</display-name>
+    </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 7855d6c..78d253c 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -58,6 +58,7 @@ metron_jdbc_client_path = 
config['configurations']['metron-rest-env']['metron_jd
 metron_temp_grok_path = 
config['configurations']['metron-rest-env']['metron_temp_grok_path']
 metron_default_grok_path = 
config['configurations']['metron-rest-env']['metron_default_grok_path']
 metron_spring_options = 
config['configurations']['metron-rest-env']['metron_spring_options']
+metron_escalation_topic = 
config['configurations']['metron-rest-env']['metron_escalation_topic']
 metron_config_path = metron_home + '/config'
 metron_zookeeper_config_dir = status_params.metron_zookeeper_config_dir
 metron_zookeeper_config_path = status_params.metron_zookeeper_config_path

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
index fe5fa6e..cf29a28 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py
@@ -45,10 +45,15 @@ class RestCommands:
              owner=self.__params.metron_user,
              mode=0755)
 
+    def init_kafka_topics(self):
+      Logger.info('Creating Kafka topics for rest')
+      topics = [self.__params.metron_escalation_topic]
+      metron_service.init_kafka_topics(self.__params, topics)
+
     def init_kafka_acls(self):
         Logger.info('Creating Kafka ACLs for rest')
         # The following topics must be permissioned for the rest application 
list operation
-        topics = [self.__params.ambari_kafka_service_check_topic, 
self.__params.consumer_offsets_topic]
+        topics = [self.__params.ambari_kafka_service_check_topic, 
self.__params.consumer_offsets_topic, self.__params.metron_escalation_topic]
         metron_service.init_kafka_acls(self.__params, topics, ['metron-rest'])
 
     def start_rest_application(self):

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
index 9331759..2f419df 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py
@@ -44,6 +44,7 @@ class RestMaster(Script):
              )
 
         commands = RestCommands(params)
+        commands.init_kafka_topics()
         if params.security_enabled and not commands.is_acl_configured():
             commands.init_kafka_acls()
             commands.set_acl_configured()

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
index 7233b54..dd37946 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
@@ -40,3 +40,4 @@ SECURITY_ENABLED={{security_enabled|lower}}
 {% endif %}
 KAFKA_SECURITY_PROTOCOL="{{kafka_security_protocol}}"
 PARSER_TOPOLOGY_OPTIONS="/home/{{metron_user}}/.storm/storm.config"
+METRON_ESCALATION_TOPIC="{{metron_escalation_topic}}"

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 748feb8..207051d 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -617,6 +617,10 @@
           "subsection-name": "subsection-rest"
         },
         {
+          "config": "metron-rest-env/metron_escalation_topic",
+          "subsection-name": "subsection-rest"
+        },
+        {
           "config": "metron-management-ui-env/metron_management_ui_port",
           "subsection-name": "subsection-management-ui"
         }
@@ -1041,6 +1045,12 @@
         }
       },
       {
+        "config": "metron-rest-env/metron_escalation_topic",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
         "config": "metron-management-ui-env/metron_management_ui_port",
         "widget": {
           "type": "text-field"

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md 
b/metron-interface/metron-rest/README.md
index b76712b..522f549 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -184,6 +184,7 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
 
 |            |
 | ---------- |
+| [ `POST /api/v1/alert/escalate`](#post-apiv1alertescalate)|
 | [ `GET /api/v1/global/config`](#get-apiv1globalconfig)|
 | [ `DELETE /api/v1/global/config`](#delete-apiv1globalconfig)|
 | [ `POST /api/v1/global/config`](#post-apiv1globalconfig)|
@@ -247,6 +248,13 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
 | [ `PUT /api/v1/update/replace`](#patch-apiv1updatereplace)|
 | [ `GET /api/v1/user`](#get-apiv1user)|
 
+### `POST /api/v1/alert/escalate`
+  * Description: Escalates a list of alerts by producing them to the Kafka 
escalation topic
+  * Input:
+    * alerts - The alerts to be escalated
+  * Returns:
+    * 200 - Alerts were escalated
+
 ### `GET /api/v1/global/config`
   * Description: Retrieves the current Global Config from Zookeeper
   * Returns:

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml 
b/metron-interface/metron-rest/src/main/config/rest_application.yml
index 6d12e95..0c17580 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -30,6 +30,8 @@ kafka:
     url: ${BROKERLIST}
   security:
     protocol: ${KAFKA_SECURITY_PROTOCOL}
+  topics:
+    escalation: ${METRON_ESCALATION_TOPIC}
 
 grok:
   path:

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 11310d4..7c9cdac 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -50,6 +50,7 @@ public class MetronRestConstants {
   public static final String CURATOR_MAX_RETRIES = "curator.max.retries";
 
   public static final String KAFKA_BROKER_URL_SPRING_PROPERTY = 
"kafka.broker.url";
+  public static final String KAFKA_TOPICS_ESCALATION_PROPERTY = 
"kafka.topics.escalation";
 
   public static final String KERBEROS_ENABLED_SPRING_PROPERTY = 
"kerberos.enabled";
   public static final String KERBEROS_PRINCIPLE_SPRING_PROPERTY = 
"kerberos.principal";

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 247264b..a2abbeb 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -21,6 +21,7 @@ import kafka.admin.AdminUtils$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.rest.MetronRestConstants;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -100,6 +101,21 @@ public class KafkaConfig {
     return new DefaultKafkaConsumerFactory<>(consumerProperties());
   }
 
+  @Bean
+  public Map<String, Object> producerProperties() {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", 
environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
+    producerConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("request.required.acks", 1);
+    return producerConfig;
+  }
+
+  @Bean
+  public KafkaProducer kafkaProducer() {
+    return new KafkaProducer<>(producerProperties());
+  }
+
   /**
    * Create a bean for {@link AdminUtils$}. This is primarily done to make 
testing a bit easier.
    *

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java
new file mode 100644
index 0000000..6f028a1
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.AlertService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * The API resource that is used for alert-related operations.
+ */
+@RestController
+@RequestMapping("/api/v1/alert")
+public class AlertsController {
+
+  /**
+   * Service used to interact with alerts.
+   */
+  @Autowired
+  private AlertService alertService;
+
+  @ApiOperation(value = "Escalates a list of alerts by producing it to the 
Kafka escalate topic")
+  @ApiResponse(message = "Alerts were escalated", code = 200)
+  @RequestMapping(value = "/escalate", method = RequestMethod.POST)
+  ResponseEntity<Void> escalate(final @ApiParam(name = "alerts", value = "The 
alerts to be escalated", required = true) @RequestBody List<Map<String, 
Object>> alerts) throws RestException {
+    alertService.escalateAlerts(alerts);
+    return new ResponseEntity<>(HttpStatus.OK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
index 2787504..d057ac4 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.Set;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.KafkaTopic;
 import org.apache.metron.rest.service.KafkaService;
@@ -33,8 +34,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.Set;
-
 /**
  * The API resource that is use to interact with Kafka.
  */
@@ -109,4 +108,15 @@ public class KafkaController {
       return new ResponseEntity<>(HttpStatus.NOT_FOUND);
     }
   }
+
+  @ApiOperation(value = "Produces a message to a Kafka topic")
+  @ApiResponses(value = {
+      @ApiResponse(message = "Message produced successfully", code = 200)
+  })
+  @RequestMapping(value = "/topic/{name}/produce", method = RequestMethod.POST)
+  ResponseEntity<String> produce(final @ApiParam(name = "name", value = "Kafka 
topic name", required = true) @PathVariable String name,
+      final @ApiParam(name = "message", value = "Message", required = true) 
@RequestBody String message) throws RestException {
+    kafkaService.produceMessage(name, message);
+    return new ResponseEntity<>(HttpStatus.OK);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java
new file mode 100644
index 0000000..9668b7c
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.rest.RestException;
+
+/**
+ * This is a set of operations created to interact with alerts.
+ */
+public interface AlertService {
+
+  void escalateAlerts(List<Map<String, Object>> alerts) throws RestException;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index bee00f2..da3b226 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -17,11 +17,10 @@
  */
 package org.apache.metron.rest.service;
 
+import java.util.Set;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.KafkaTopic;
 
-import java.util.Set;
-
 /**
  * This is a set of operations created to interact with Kafka.
  */
@@ -67,4 +66,6 @@ public interface KafkaService {
    * @return A string representation of the sample message retrieved. If topic 
doesn't exist null will be returned.
    */
   String getSampleMessage(String topic);
+
+  void produceMessage(String topic, String message) throws RestException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java
new file mode 100644
index 0000000..46370eb
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.AlertService;
+import org.apache.metron.rest.service.KafkaService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+/**
+ * The default service layer implementation of {@link AlertService}.
+ *
+ * @see AlertService
+ */
+@Service
+public class AlertServiceImpl implements AlertService {
+
+  private Environment environment;
+  private final KafkaService kafkaService;
+
+  @Autowired
+  public AlertServiceImpl(final KafkaService kafkaService,
+                          final Environment environment) {
+    this.kafkaService = kafkaService;
+    this.environment = environment;
+  }
+
+  @Override
+  public void escalateAlerts(List<Map<String, Object>> alerts) throws 
RestException {
+    try {
+      for (Map<String, Object> alert : alerts) {
+        kafkaService.produceMessage(
+            
environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY),
+            JSONUtils.INSTANCE.toJSON(alert, false));
+      }
+    } catch (JsonProcessingException e) {
+      throw new RestException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 61e2618..4f232fb 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -17,6 +17,11 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
@@ -24,21 +29,18 @@ import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.KafkaTopic;
 import org.apache.metron.rest.service.KafkaService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.stereotype.Service;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * The default service layer implementation of {@link KafkaService}.
  *
@@ -53,19 +55,26 @@ public class KafkaServiceImpl implements KafkaService {
 
   private final ZkUtils zkUtils;
   private final ConsumerFactory<String, String> kafkaConsumerFactory;
+  private final KafkaProducer<String, String> kafkaProducer;
   private final AdminUtils$ adminUtils;
 
+  @Autowired
+  private Environment environment;
+
   /**
    * @param zkUtils              A utility class used to interact with 
ZooKeeper.
    * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} 
in order to interact with Kafka.
+   * @param kafkaProducer        A class used to produce messages to Kafka.
    * @param adminUtils           A utility class used to do administration 
operations on Kafka.
    */
   @Autowired
   public KafkaServiceImpl(final ZkUtils zkUtils,
                           final ConsumerFactory<String, String> 
kafkaConsumerFactory,
+                          final KafkaProducer<String, String> kafkaProducer,
                           final AdminUtils$ adminUtils) {
     this.zkUtils = zkUtils;
     this.kafkaConsumerFactory = kafkaConsumerFactory;
+    this.kafkaProducer = kafkaProducer;
     this.adminUtils = adminUtils;
   }
 
@@ -141,4 +150,8 @@ public class KafkaServiceImpl implements KafkaService {
     return message;
   }
 
+  @Override
+  public void produceMessage(String topic, String message) throws 
RestException {
+    kafkaProducer.send(new ProducerRecord<>(topic, message));
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml 
b/metron-interface/metron-rest/src/main/resources/application.yml
index 4aff769..cf9218b 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -35,6 +35,10 @@ zookeeper:
       session: 10000
       connection: 10000
 
+kafka:
+  topics:
+    escalation: escalation
+
 curator:
   sleep:
     time: 1000
@@ -48,4 +52,5 @@ search:
 index:
   dao:
   # By default, we use the ElasticsearchDao and HBaseDao for backing updates.
-     impl: 
org.apache.metron.elasticsearch.dao.ElasticsearchDao,org.apache.metron.indexing.dao.HBaseDao
\ No newline at end of file
+     impl: 
org.apache.metron.elasticsearch.dao.ElasticsearchDao,org.apache.metron.indexing.dao.HBaseDao
+

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index e9bb510..ea64fbe 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -17,6 +17,14 @@
  */
 package org.apache.metron.rest.config;
 
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 import kafka.admin.AdminUtils$;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
@@ -26,6 +34,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.ComponentRunner;
@@ -42,15 +51,6 @@ import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.web.client.RestTemplate;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-
 @Configuration
 @Profile(TEST_PROFILE)
 public class TestConfig {
@@ -133,6 +133,21 @@ public class TestConfig {
   }
 
   @Bean
+  public Map<String, Object> producerProperties(KafkaComponent 
kafkaWithZKComponent) {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", 
kafkaWithZKComponent.getBrokerList());
+    producerConfig.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("request.required.acks", 1);
+    return producerConfig;
+  }
+
+  /**
+   * Exposes a Kafka producer built from {@link #producerProperties(KafkaComponent)}.
+   *
+   * @param kafkaWithZKComponent Kafka test component passed through to the producer settings
+   * @return a producer configured with those settings
+   */
+  @Bean
+  public KafkaProducer kafkaProducer(KafkaComponent kafkaWithZKComponent) {
+    Map<String, Object> producerSettings = producerProperties(kafkaWithZKComponent);
+    return new KafkaProducer<>(producerSettings);
+  }
+
+  @Bean
   public StormCLIWrapper stormCLIClientWrapper() {
     return new MockStormCLIClientWrapper();
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java
new file mode 100644
index 0000000..be320fc
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.web.context.WebApplicationContext;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(TEST_PROFILE)
+public class AlertControllerIntegrationTest {
+
+    // NOTE(review): with @Multiline the Javadoc body below presumably becomes the value of
+    // the field, so treat it as test data rather than documentation -- confirm before editing.
+    /**
+     [
+     {
+     "is_alert": true,
+     "field": "value1"
+     },
+     {
+     "is_alert": true,
+     "field": "value2"
+     }
+     ]
+     */
+    @Multiline
+    public static String alerts;
+
+    // Injected web application context; setup() builds the MockMvc instance from it.
+    @Autowired
+    private WebApplicationContext wac;
+
+    private MockMvc mockMvc;
+
+    private String alertUrl = "/api/v1/alert";
+    private String user = "user";
+    private String password = "password";
+
+    /** Rebuilds the MockMvc instance, with Spring Security applied, before every test. */
+    @Before
+    public void setup() throws Exception {
+        mockMvc = MockMvcBuilders.webAppContextSetup(wac)
+                .apply(springSecurity())
+                .build();
+    }
+
+    /** An escalate request sent without credentials is expected to yield 401 Unauthorized. */
+    @Test
+    public void testSecurity() throws Exception {
+        String escalateUrl = alertUrl + "/escalate";
+        MediaType jsonUtf8 = MediaType.parseMediaType("application/json;charset=UTF-8");
+
+        mockMvc.perform(
+                post(escalateUrl)
+                        .with(csrf())
+                        .contentType(jsonUtf8)
+                        .content(alerts))
+                .andExpect(status().isUnauthorized());
+    }
+
+    /** An escalate request sent with basic-auth credentials is expected to yield 200 OK. */
+    @Test
+    public void test() throws Exception {
+        String escalateUrl = alertUrl + "/escalate";
+        MediaType jsonUtf8 = MediaType.parseMediaType("application/json;charset=UTF-8");
+
+        mockMvc.perform(
+                post(escalateUrl)
+                        .with(httpBasic(user, password))
+                        .with(csrf())
+                        .contentType(jsonUtf8)
+                        .content(alerts))
+                .andExpect(status().isOk());
+    }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
index 9e6d408..5719a41 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
@@ -17,15 +17,24 @@
  */
 package org.apache.metron.rest.controller;
 
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
 import kafka.common.TopicAlreadyMarkedForDeletionException;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaComponent;
-import org.apache.metron.rest.generator.SampleDataGenerator;
 import org.apache.metron.rest.service.KafkaService;
 import org.hamcrest.Matchers;
-import org.json.simple.parser.ParseException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,19 +50,6 @@ import 
org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 import org.springframework.web.util.NestedServletException;
 
-import java.io.IOException;
-
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
-import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
-import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
-import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
-import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
-import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
 @ActiveProfiles(TEST_PROFILE)
@@ -99,35 +95,6 @@ public class KafkaControllerIntegrationTest {
     }
   }
 
-  class SampleDataRunner implements Runnable {
-
-    private boolean stop = false;
-    private String path = 
"../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput";
-
-    @Override
-    public void run() {
-      SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator();
-      
broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList());
-      broSampleDataGenerator.setNum(1);
-      broSampleDataGenerator.setSelectedSensorType("bro");
-      broSampleDataGenerator.setDelay(0);
-      try {
-        while(!stop) {
-          broSampleDataGenerator.generateSampleData(path);
-        }
-      } catch (ParseException|IOException e) {
-        throw new IllegalStateException("Caught an error generating sample 
data", e);
-      }
-    }
-
-    public void stop() {
-      stop = true;
-    }
-  }
-
-  private SampleDataRunner sampleDataRunner =  new SampleDataRunner();
-  private Thread sampleDataThread = new Thread(sampleDataRunner);
-
   /**
    {
    "name": "bro",
@@ -139,6 +106,30 @@ public class KafkaControllerIntegrationTest {
   @Multiline
   public static String broTopic;
 
+  /**
+   * {
+   *   "type":"message1"
+   * }
+   */
+  @Multiline
+  public static String message1;
+
+  /**
+   * {
+   *   "type":"message2"
+   * }
+   */
+  @Multiline
+  public static String message2;
+
+  /**
+   * {
+   *   "type":"message3"
+   * }
+   */
+  @Multiline
+  public static String message3;
+
   @Autowired
   private WebApplicationContext wac;
 
@@ -179,6 +170,9 @@ public class KafkaControllerIntegrationTest {
     this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample"))
             .andExpect(status().isUnauthorized());
 
+    this.mockMvc.perform(get(kafkaUrl + "/topic/bro/produce"))
+        .andExpect(status().isUnauthorized());
+
     this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf()))
             .andExpect(status().isUnauthorized());
   }
@@ -200,7 +194,7 @@ public class KafkaControllerIntegrationTest {
             .andExpect(jsonPath("$.numPartitions").value(1))
             .andExpect(jsonPath("$.replicationFactor").value(1))
     );
-    sampleDataThread.start();
+
     Thread.sleep(1000);
     testAndRetry(() ->
     this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro").with(httpBasic(user,password)))
@@ -222,24 +216,42 @@ public class KafkaControllerIntegrationTest {
             .andExpect(jsonPath("$", Matchers.hasItem("bro")))
     );
 
-    for(int i = 0;i < KAFKA_RETRY;++i) {
-      MvcResult result = this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro/sample").with(httpBasic(user, password)))
-              .andReturn();
-      if(result.getResponse().getStatus() == 200) {
-        break;
-      }
-      Thread.sleep(1000);
-    }
+    testAndRetry(() ->
+        this.mockMvc.perform(post(kafkaUrl + 
"/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message1))
+            .andExpect(status().isOk())
+    );
+    testAndRetry(() ->
+        this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro/sample").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
+            .andExpect(jsonPath("$.type").value("message1"))
+    );
 
     testAndRetry(() ->
-    this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro/sample").with(httpBasic(user,password)))
+        this.mockMvc.perform(post(kafkaUrl + 
"/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message2))
+            .andExpect(status().isOk())
+    );
+    testAndRetry(() ->
+        this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro/sample").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             
.andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
-            .andExpect(jsonPath("$").isNotEmpty())
+            .andExpect(jsonPath("$.type").value("message2"))
+    );
+
+    testAndRetry(() ->
+        this.mockMvc.perform(post(kafkaUrl + 
"/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message3))
+            .andExpect(status().isOk())
+    );
+    testAndRetry(() ->
+        this.mockMvc.perform(get(kafkaUrl + 
"/topic/bro/sample").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
+            .andExpect(jsonPath("$.type").value("message3"))
     );
 
     this.mockMvc.perform(get(kafkaUrl + 
"/topic/someTopic/sample").with(httpBasic(user,password)))
             .andExpect(status().isNotFound());
+
     boolean deleted = false;
     for(int i = 0;i < KAFKA_RETRY;++i) {
       try {
@@ -270,7 +282,6 @@ public class KafkaControllerIntegrationTest {
 
   @After
   public void tearDown() {
-    sampleDataRunner.stop();
     runner.stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java
new file mode 100644
index 0000000..c55e0a5
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.service.AlertService;
+import org.apache.metron.rest.service.KafkaService;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.core.env.Environment;
+
+/**
+ * Unit tests for {@link AlertServiceImpl}, run against a mocked {@link KafkaService}
+ * and a mocked Spring {@link Environment}.
+ */
+@SuppressWarnings("unchecked")
+public class AlertServiceImplTest {
+
+  private KafkaService kafkaService;
+  private Environment environment;
+  private AlertService alertService;
+
+  /** Creates fresh mocks and a new service under test before every test. */
+  @Before
+  public void setUp() throws Exception {
+    kafkaService = mock(KafkaService.class);
+    environment = mock(Environment.class);
+    alertService = new AlertServiceImpl(kafkaService, environment);
+  }
+
+  /**
+   * Escalating two alerts should produce one JSON message per alert to the topic configured
+   * under {@link MetronRestConstants#KAFKA_TOPICS_ESCALATION_PROPERTY}, and nothing else.
+   */
+  @Test
+  public void produceMessageShouldProperlyProduceMessage() throws Exception {
+    String escalationTopic = "escalation";
+    final Map<String, Object> message1 = new HashMap<>();
+    message1.put("field", "value1");
+    final Map<String, Object> message2 = new HashMap<>();
+    message2.put("field", "value2");
+    List<Map<String, Object>> messages = Arrays.asList(message1, message2);
+    when(environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY)).thenReturn(escalationTopic);
+
+    alertService.escalateAlerts(messages);
+
+    String expectedMessage1 = "{\"field\":\"value1\"}";
+    String expectedMessage2 = "{\"field\":\"value2\"}";
+    verify(kafkaService).produceMessage("escalation", expectedMessage1);
+    verify(kafkaService).produceMessage("escalation", expectedMessage2);
+    // Both produceMessage calls above are expected, so the assertion that fits is
+    // "nothing beyond those calls happened", not "no interactions at all".
+    verifyNoMoreInteractions(kafkaService);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index c92feab..1f300ea 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -17,9 +17,27 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
@@ -27,6 +45,8 @@ import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -43,25 +63,6 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.kafka.core.ConsumerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
 @SuppressWarnings("unchecked")
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*") // resolve classloader conflict
@@ -72,6 +73,7 @@ public class KafkaServiceImplTest {
 
   private ZkUtils zkUtils;
   private KafkaConsumer<String, String> kafkaConsumer;
+  private KafkaProducer<String, String> kafkaProducer;
   private ConsumerFactory<String, String> kafkaConsumerFactory;
   private AdminUtils$ adminUtils;
 
@@ -90,11 +92,12 @@ public class KafkaServiceImplTest {
     zkUtils = mock(ZkUtils.class);
     kafkaConsumerFactory = mock(ConsumerFactory.class);
     kafkaConsumer = mock(KafkaConsumer.class);
+    kafkaProducer = mock(KafkaProducer.class);
     adminUtils = mock(AdminUtils$.class);
 
     when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer);
 
-    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, 
adminUtils);
+    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, 
kafkaProducer, adminUtils);
   }
 
   @Test
@@ -304,4 +307,16 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils, adminUtils);
   }
+
+  /**
+   * Producing a message should hand exactly one record, carrying the given topic and
+   * payload, to the Kafka producer and make no other calls on it.
+   */
+  @Test
+  public void produceMessageShouldProperlyProduceMessage() throws Exception {
+    final String topicName = "t";
+    final String message = "{\"field\":\"value\"}";
+
+    kafkaService.produceMessage(topicName, message);
+
+    String expectedMessage = "{\"field\":\"value\"}";
+    verify(kafkaProducer).send(new ProducerRecord<>(topicName, expectedMessage));
+    // send(...) above is an expected interaction, so assert that nothing further was
+    // invoked on the producer rather than that it was never touched.
+    verifyNoMoreInteractions(kafkaProducer);
+  }
 }

Reply via email to