Repository: metron Updated Branches: refs/heads/master 37e8ca7e2 -> 7a3de6737
METRON-1127 Add ability to escalate alerts for external consumption (merrimanr) closes apache/metron#711 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7a3de673 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7a3de673 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7a3de673 Branch: refs/heads/master Commit: 7a3de6737b964abbdcdc876695b50f29bf5d531e Parents: 37e8ca7 Author: merrimanr <[email protected]> Authored: Tue Sep 5 16:15:32 2017 -0500 Committer: merrimanr <[email protected]> Committed: Tue Sep 5 16:15:32 2017 -0500 ---------------------------------------------------------------------- .../CURRENT/configuration/metron-rest-env.xml | 6 + .../package/scripts/params/params_linux.py | 1 + .../CURRENT/package/scripts/rest_commands.py | 7 +- .../CURRENT/package/scripts/rest_master.py | 1 + .../METRON/CURRENT/package/templates/metron.j2 | 1 + .../METRON/CURRENT/themes/metron_theme.json | 10 ++ metron-interface/metron-rest/README.md | 8 ++ .../src/main/config/rest_application.yml | 2 + .../apache/metron/rest/MetronRestConstants.java | 1 + .../apache/metron/rest/config/KafkaConfig.java | 16 +++ .../rest/controller/AlertsController.java | 55 +++++++++ .../metron/rest/controller/KafkaController.java | 14 ++- .../metron/rest/service/AlertService.java | 30 +++++ .../metron/rest/service/KafkaService.java | 5 +- .../rest/service/impl/AlertServiceImpl.java | 62 ++++++++++ .../rest/service/impl/KafkaServiceImpl.java | 25 +++- .../src/main/resources/application.yml | 7 +- .../apache/metron/rest/config/TestConfig.java | 33 +++-- .../AlertControllerIntegrationTest.java | 86 +++++++++++++ .../KafkaControllerIntegrationTest.java | 123 ++++++++++--------- .../rest/service/impl/AlertServiceImplTest.java | 69 +++++++++++ .../rest/service/impl/KafkaServiceImplTest.java | 55 ++++++--- 22 files changed, 520 insertions(+), 97 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml index c68f5b2..9c11123 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml @@ -109,4 +109,10 @@ <empty-value-valid>true</empty-value-valid> </value-attributes> </property> + <property> + <name>metron_escalation_topic</name> + <description>Escalated alerts will be produced to this topic</description> + <value>escalation</value> + <display-name>Metron escalation topic</display-name> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index 7855d6c..78d253c 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -58,6 +58,7 @@ metron_jdbc_client_path = config['configurations']['metron-rest-env']['metron_jd metron_temp_grok_path = config['configurations']['metron-rest-env']['metron_temp_grok_path'] metron_default_grok_path = config['configurations']['metron-rest-env']['metron_default_grok_path'] metron_spring_options = config['configurations']['metron-rest-env']['metron_spring_options'] +metron_escalation_topic = config['configurations']['metron-rest-env']['metron_escalation_topic'] metron_config_path = metron_home + '/config' metron_zookeeper_config_dir = status_params.metron_zookeeper_config_dir metron_zookeeper_config_path = status_params.metron_zookeeper_config_path http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py index fe5fa6e..cf29a28 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py @@ -45,10 +45,15 @@ class RestCommands: owner=self.__params.metron_user, mode=0755) + def init_kafka_topics(self): + Logger.info('Creating Kafka topics for rest') + topics = [self.__params.metron_escalation_topic] + metron_service.init_kafka_topics(self.__params, topics) + def init_kafka_acls(self): Logger.info('Creating Kafka ACLs for rest') 
# The following topics must be permissioned for the rest application list operation - topics = [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic] + topics = [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic, self.__params.metron_escalation_topic] metron_service.init_kafka_acls(self.__params, topics, ['metron-rest']) def start_rest_application(self): http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py index 9331759..2f419df 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_master.py @@ -44,6 +44,7 @@ class RestMaster(Script): ) commands = RestCommands(params) + commands.init_kafka_topics() if params.security_enabled and not commands.is_acl_configured(): commands.init_kafka_acls() commands.set_acl_configured() http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 index 7233b54..dd37946 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 @@ -40,3 +40,4 @@ SECURITY_ENABLED={{security_enabled|lower}} {% endif %} KAFKA_SECURITY_PROTOCOL="{{kafka_security_protocol}}" PARSER_TOPOLOGY_OPTIONS="/home/{{metron_user}}/.storm/storm.config" +METRON_ESCALATION_TOPIC="{{metron_escalation_topic}}" http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index 748feb8..207051d 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -617,6 +617,10 @@ "subsection-name": "subsection-rest" }, { + "config": "metron-rest-env/metron_escalation_topic", + "subsection-name": "subsection-rest" + }, + { "config": "metron-management-ui-env/metron_management_ui_port", "subsection-name": "subsection-management-ui" } @@ -1041,6 +1045,12 @@ } }, { + "config": "metron-rest-env/metron_escalation_topic", + "widget": { + "type": "text-field" + } + }, + { "config": "metron-management-ui-env/metron_management_ui_port", "widget": { "type": "text-field" 
http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index b76712b..522f549 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -184,6 +184,7 @@ Request and Response objects are JSON formatted. The JSON schemas are available | | | ---------- | +| [ `POST /api/v1/alert/escalate`](#post-apiv1alertescalate)| | [ `GET /api/v1/global/config`](#get-apiv1globalconfig)| | [ `DELETE /api/v1/global/config`](#delete-apiv1globalconfig)| | [ `POST /api/v1/global/config`](#post-apiv1globalconfig)| @@ -247,6 +248,13 @@ Request and Response objects are JSON formatted. The JSON schemas are available | [ `PUT /api/v1/update/replace`](#patch-apiv1updatereplace)| | [ `GET /api/v1/user`](#get-apiv1user)| +### `POST /api/v1/alert/escalate` + * Description: Escalates a list of alerts by producing them to the Kafka escalation topic + * Input: + * alerts - The alerts to be escalated + * Returns: + * 200 - Alerts were escalated + ### `GET /api/v1/global/config` * Description: Retrieves the current Global Config from Zookeeper * Returns: http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/config/rest_application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index 6d12e95..0c17580 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -30,6 +30,8 @@ kafka: url: ${BROKERLIST} security: protocol: ${KAFKA_SECURITY_PROTOCOL} + topics: + escalation: ${METRON_ESCALATION_TOPIC} grok: path: 
http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 11310d4..7c9cdac 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -50,6 +50,7 @@ public class MetronRestConstants { public static final String CURATOR_MAX_RETRIES = "curator.max.retries"; public static final String KAFKA_BROKER_URL_SPRING_PROPERTY = "kafka.broker.url"; + public static final String KAFKA_TOPICS_ESCALATION_PROPERTY = "kafka.topics.escalation"; public static final String KERBEROS_ENABLED_SPRING_PROPERTY = "kerberos.enabled"; public static final String KERBEROS_PRINCIPLE_SPRING_PROPERTY = "kerberos.principal"; http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java index 247264b..a2abbeb 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java @@ -21,6 +21,7 @@ import kafka.admin.AdminUtils$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; import 
org.apache.metron.rest.MetronRestConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -100,6 +101,21 @@ public class KafkaConfig { return new DefaultKafkaConsumerFactory<>(consumerProperties()); } + @Bean + public Map<String, Object> producerProperties() { + Map<String, Object> producerConfig = new HashMap<>(); + producerConfig.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)); + producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerConfig.put("request.required.acks", 1); + return producerConfig; + } + + @Bean + public KafkaProducer kafkaProducer() { + return new KafkaProducer<>(producerProperties()); + } + /** * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier. * http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java new file mode 100644 index 0000000..6f028a1 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/AlertsController.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.controller; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import java.util.List; +import java.util.Map; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.AlertService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +/** + * The API resource that is used for alert-related operations. + */ +@RestController +@RequestMapping("/api/v1/alert") +public class AlertsController { + + /** + * Service used to interact with alerts. 
+ */ + @Autowired + private AlertService alertService; + + @ApiOperation(value = "Escalates a list of alerts by producing it to the Kafka escalate topic") + @ApiResponse(message = "Alerts were escalated", code = 200) + @RequestMapping(value = "/escalate", method = RequestMethod.POST) + ResponseEntity<Void> escalate(final @ApiParam(name = "alerts", value = "The alerts to be escalated", required = true) @RequestBody List<Map<String, Object>> alerts) throws RestException { + alertService.escalateAlerts(alerts); + return new ResponseEntity<>(HttpStatus.OK); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java index 2787504..d057ac4 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java @@ -21,6 +21,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.util.Set; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.KafkaTopic; import org.apache.metron.rest.service.KafkaService; @@ -33,8 +34,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import java.util.Set; - /** * The API resource that is use to interact with Kafka. 
*/ @@ -109,4 +108,15 @@ public class KafkaController { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } } + + @ApiOperation(value = "Produces a message to a Kafka topic") + @ApiResponses(value = { + @ApiResponse(message = "Message produced successfully", code = 200) + }) + @RequestMapping(value = "/topic/{name}/produce", method = RequestMethod.POST) + ResponseEntity<String> produce(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name, + final @ApiParam(name = "message", value = "Message", required = true) @RequestBody String message) throws RestException { + kafkaService.produceMessage(name, message); + return new ResponseEntity<>(HttpStatus.OK); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java new file mode 100644 index 0000000..9668b7c --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/AlertService.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service; + +import java.util.List; +import java.util.Map; +import org.apache.metron.rest.RestException; + +/** + * This is a set of operations created to interact with alerts. + */ +public interface AlertService { + + void escalateAlerts(List<Map<String, Object>> alerts) throws RestException; +} http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java index bee00f2..da3b226 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java @@ -17,11 +17,10 @@ */ package org.apache.metron.rest.service; +import java.util.Set; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.KafkaTopic; -import java.util.Set; - /** * This is a set of operations created to interact with Kafka. */ @@ -67,4 +66,6 @@ public interface KafkaService { * @return A string representation of the sample message retrieved. If topic doesn't exist null will be returned. 
*/ String getSampleMessage(String topic); + + void produceMessage(String topic, String message) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java new file mode 100644 index 0000000..46370eb --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/AlertServiceImpl.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.service.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; +import java.util.Map; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.rest.MetronRestConstants; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.AlertService; +import org.apache.metron.rest.service.KafkaService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +/** + * The default service layer implementation of {@link AlertService}. + * + * @see AlertService + */ +@Service +public class AlertServiceImpl implements AlertService { + + private Environment environment; + private final KafkaService kafkaService; + + @Autowired + public AlertServiceImpl(final KafkaService kafkaService, + final Environment environment) { + this.kafkaService = kafkaService; + this.environment = environment; + } + + @Override + public void escalateAlerts(List<Map<String, Object>> alerts) throws RestException { + try { + for (Map<String, Object> alert : alerts) { + kafkaService.produceMessage( + environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY), + JSONUtils.INSTANCE.toJSON(alert, false)); + } + } catch (JsonProcessingException e) { + throw new RestException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java index 61e2618..4f232fb 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java +++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -17,6 +17,11 @@ */ package org.apache.metron.rest.service.impl; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; @@ -24,21 +29,18 @@ import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.KafkaTopic; import org.apache.metron.rest.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Service; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - /** * The default service layer implementation of {@link KafkaService}. * @@ -53,19 +55,26 @@ public class KafkaServiceImpl implements KafkaService { private final ZkUtils zkUtils; private final ConsumerFactory<String, String> kafkaConsumerFactory; + private final KafkaProducer<String, String> kafkaProducer; private final AdminUtils$ adminUtils; + @Autowired + private Environment environment; + /** * @param zkUtils A utility class used to interact with ZooKeeper. * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to interact with Kafka. + * @param kafkaProducer A class used to produce messages to Kafka. 
* @param adminUtils A utility class used to do administration operations on Kafka. */ @Autowired public KafkaServiceImpl(final ZkUtils zkUtils, final ConsumerFactory<String, String> kafkaConsumerFactory, + final KafkaProducer<String, String> kafkaProducer, final AdminUtils$ adminUtils) { this.zkUtils = zkUtils; this.kafkaConsumerFactory = kafkaConsumerFactory; + this.kafkaProducer = kafkaProducer; this.adminUtils = adminUtils; } @@ -141,4 +150,8 @@ public class KafkaServiceImpl implements KafkaService { return message; } + @Override + public void produceMessage(String topic, String message) throws RestException { + kafkaProducer.send(new ProducerRecord<>(topic, message)); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/main/resources/application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml index 4aff769..cf9218b 100644 --- a/metron-interface/metron-rest/src/main/resources/application.yml +++ b/metron-interface/metron-rest/src/main/resources/application.yml @@ -35,6 +35,10 @@ zookeeper: session: 10000 connection: 10000 +kafka: + topics: + escalation: escalation + curator: sleep: time: 1000 @@ -48,4 +52,5 @@ search: index: dao: # By default, we use the ElasticsearchDao and HBaseDao for backing updates. 
- impl: org.apache.metron.elasticsearch.dao.ElasticsearchDao,org.apache.metron.indexing.dao.HBaseDao \ No newline at end of file + impl: org.apache.metron.elasticsearch.dao.ElasticsearchDao,org.apache.metron.indexing.dao.HBaseDao + http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index e9bb510..ea64fbe 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -17,6 +17,14 @@ */ package org.apache.metron.rest.config; +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import kafka.admin.AdminUtils$; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; @@ -26,6 +34,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.integration.ComponentRunner; @@ -42,15 +51,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.web.client.RestTemplate; -import java.io.BufferedReader; -import java.io.File; -import 
java.io.FileReader; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; - @Configuration @Profile(TEST_PROFILE) public class TestConfig { @@ -133,6 +133,21 @@ public class TestConfig { } @Bean + public Map<String, Object> producerProperties(KafkaComponent kafkaWithZKComponent) { + Map<String, Object> producerConfig = new HashMap<>(); + producerConfig.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList()); + producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerConfig.put("request.required.acks", 1); + return producerConfig; + } + + @Bean + public KafkaProducer kafkaProducer(KafkaComponent kafkaWithZKComponent) { + return new KafkaProducer<>(producerProperties(kafkaWithZKComponent)); + } + + @Bean public StormCLIWrapper stormCLIClientWrapper() { return new MockStormCLIClientWrapper(); } http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java new file mode 100644 index 0000000..be320fc --- /dev/null +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/AlertControllerIntegrationTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.controller; + +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; +import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.adrianwalker.multilinestring.Multiline; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import org.springframework.web.context.WebApplicationContext; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles(TEST_PROFILE) +public class 
AlertControllerIntegrationTest { + + /** + [ + { + "is_alert": true, + "field": "value1" + }, + { + "is_alert": true, + "field": "value2" + } + ] + */ + @Multiline + public static String alerts; + + @Autowired + private WebApplicationContext wac; + + private MockMvc mockMvc; + + private String alertUrl = "/api/v1/alert"; + private String user = "user"; + private String password = "password"; + + @Before + public void setup() throws Exception { + this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); + } + + @Test + public void testSecurity() throws Exception { + this.mockMvc.perform(post(alertUrl + "/escalate").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(alerts)) + .andExpect(status().isUnauthorized()); + } + + @Test + public void test() throws Exception { + this.mockMvc.perform(post(alertUrl + "/escalate").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(alerts)) + .andExpect(status().isOk()); + + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java index 9e6d408..5719a41 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java @@ -17,15 +17,24 @@ */ package org.apache.metron.rest.controller; +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; +import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + import kafka.common.TopicAlreadyMarkedForDeletionException; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.KafkaComponent; -import org.apache.metron.rest.generator.SampleDataGenerator; import org.apache.metron.rest.service.KafkaService; import org.hamcrest.Matchers; -import org.json.simple.parser.ParseException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -41,19 +50,6 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.util.NestedServletException; -import java.io.IOException; - -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; -import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; -import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; -import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT) @ActiveProfiles(TEST_PROFILE) @@ -99,35 +95,6 @@ public class KafkaControllerIntegrationTest { } } - class SampleDataRunner implements Runnable { - - private boolean stop = false; - private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput"; - - @Override - public void run() { - SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator(); - broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList()); - broSampleDataGenerator.setNum(1); - broSampleDataGenerator.setSelectedSensorType("bro"); - broSampleDataGenerator.setDelay(0); - try { - while(!stop) { - broSampleDataGenerator.generateSampleData(path); - } - } catch (ParseException|IOException e) { - throw new IllegalStateException("Caught an error generating sample data", e); - } - } - - public void stop() { - stop = true; - } - } - - private SampleDataRunner sampleDataRunner = new SampleDataRunner(); - private Thread sampleDataThread = new Thread(sampleDataRunner); - /** { "name": "bro", @@ -139,6 +106,30 @@ public class KafkaControllerIntegrationTest { @Multiline public static String broTopic; + /** + * { + * "type":"message1" + * } + */ + @Multiline + public static String message1; + + /** + * { + * 
"type":"message2" + * } + */ + @Multiline + public static String message2; + + /** + * { + * "type":"message3" + * } + */ + @Multiline + public static String message3; + @Autowired private WebApplicationContext wac; @@ -179,6 +170,9 @@ public class KafkaControllerIntegrationTest { this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample")) .andExpect(status().isUnauthorized()); + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/produce")) + .andExpect(status().isUnauthorized()); + this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf())) .andExpect(status().isUnauthorized()); } @@ -200,7 +194,7 @@ public class KafkaControllerIntegrationTest { .andExpect(jsonPath("$.numPartitions").value(1)) .andExpect(jsonPath("$.replicationFactor").value(1)) ); - sampleDataThread.start(); + Thread.sleep(1000); testAndRetry(() -> this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password))) @@ -222,24 +216,42 @@ public class KafkaControllerIntegrationTest { .andExpect(jsonPath("$", Matchers.hasItem("bro"))) ); - for(int i = 0;i < KAFKA_RETRY;++i) { - MvcResult result = this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user, password))) - .andReturn(); - if(result.getResponse().getStatus() == 200) { - break; - } - Thread.sleep(1000); - } + testAndRetry(() -> + this.mockMvc.perform(post(kafkaUrl + "/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message1)) + .andExpect(status().isOk()) + ); + testAndRetry(() -> + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8"))) + .andExpect(jsonPath("$.type").value("message1")) + ); testAndRetry(() -> - this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) + this.mockMvc.perform(post(kafkaUrl + 
"/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message2)) + .andExpect(status().isOk()) + ); + testAndRetry(() -> + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8"))) - .andExpect(jsonPath("$").isNotEmpty()) + .andExpect(jsonPath("$.type").value("message2")) + ); + + testAndRetry(() -> + this.mockMvc.perform(post(kafkaUrl + "/topic/bro/produce").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(message3)) + .andExpect(status().isOk()) + ); + testAndRetry(() -> + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8"))) + .andExpect(jsonPath("$.type").value("message3")) ); this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password))) .andExpect(status().isNotFound()); + boolean deleted = false; for(int i = 0;i < KAFKA_RETRY;++i) { try { @@ -270,7 +282,6 @@ public class KafkaControllerIntegrationTest { @After public void tearDown() { - sampleDataRunner.stop(); runner.stop(); } } http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java new file mode 100644 index 0000000..c55e0a5 --- /dev/null +++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/AlertServiceImplTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.metron.rest.MetronRestConstants; +import org.apache.metron.rest.service.AlertService; +import org.apache.metron.rest.service.KafkaService; +import org.junit.Before; +import org.junit.Test; +import org.springframework.core.env.Environment; + +@SuppressWarnings("unchecked") +public class AlertServiceImplTest { + + private KafkaService kafkaService; + private Environment environment; + private AlertService alertService; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + kafkaService = mock(KafkaService.class); + environment = mock(Environment.class); + alertService = new AlertServiceImpl(kafkaService, environment); + } + + @Test + public void 
produceMessageShouldProperlyProduceMessage() throws Exception { + String escalationTopic = "escalation"; + final Map<String, Object> message1 = new HashMap<>(); + message1.put("field", "value1"); + final Map<String, Object> message2 = new HashMap<>(); + message2.put("field", "value2"); + List<Map<String, Object>> messages = Arrays.asList(message1, message2); + when(environment.getProperty(MetronRestConstants.KAFKA_TOPICS_ESCALATION_PROPERTY)).thenReturn(escalationTopic); + + alertService.escalateAlerts(messages); + + String expectedMessage1 = "{\"field\":\"value1\"}"; + String expectedMessage2 = "{\"field\":\"value2\"}"; + verify(kafkaService).produceMessage("escalation", expectedMessage1); + verify(kafkaService).produceMessage("escalation", expectedMessage2); + verifyZeroInteractions(kafkaService); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/7a3de673/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java index c92feab..1f300ea 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java @@ -17,9 +17,27 @@ */ package org.apache.metron.rest.service.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static 
org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; @@ -27,6 +45,8 @@ import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -43,25 +63,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.kafka.core.ConsumerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mock; - @SuppressWarnings("unchecked") @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") // resolve classloader 
conflict @@ -72,6 +73,7 @@ public class KafkaServiceImplTest { private ZkUtils zkUtils; private KafkaConsumer<String, String> kafkaConsumer; + private KafkaProducer<String, String> kafkaProducer; private ConsumerFactory<String, String> kafkaConsumerFactory; private AdminUtils$ adminUtils; @@ -90,11 +92,12 @@ public class KafkaServiceImplTest { zkUtils = mock(ZkUtils.class); kafkaConsumerFactory = mock(ConsumerFactory.class); kafkaConsumer = mock(KafkaConsumer.class); + kafkaProducer = mock(KafkaProducer.class); adminUtils = mock(AdminUtils$.class); when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer); - kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils); + kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, kafkaProducer, adminUtils); } @Test @@ -304,4 +307,16 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils, adminUtils); } + + @Test + public void produceMessageShouldProperlyProduceMessage() throws Exception { + final String topicName = "t"; + final String message = "{\"field\":\"value\"}"; + + kafkaService.produceMessage(topicName, message); + + String expectedMessage = "{\"field\":\"value\"}"; + verify(kafkaProducer).send(new ProducerRecord<>(topicName, expectedMessage)); + verifyZeroInteractions(kafkaProducer); + } }
