METRON-891 KafkaConsumer should not be shared among threads (jjmeyer via merrimanr) closes apache/metron#567
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/47e2b735 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/47e2b735 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/47e2b735 Branch: refs/heads/Metron_0.4.0 Commit: 47e2b735ea682e7d647e3eaafac89a192a783e86 Parents: c0b0825 Author: jjmeyer <[email protected]> Authored: Fri May 19 10:10:25 2017 -0500 Committer: merrimanr <[email protected]> Committed: Fri May 19 10:10:25 2017 -0500 ---------------------------------------------------------------------- dependencies_with_url.csv | 1 + metron-interface/metron-rest/pom.xml | 34 +++- .../apache/metron/rest/config/KafkaConfig.java | 52 ++++++- .../metron/rest/controller/KafkaController.java | 104 +++++++------ .../metron/rest/service/KafkaService.java | 45 +++++- .../rest/service/impl/KafkaServiceImpl.java | 155 +++++++++++-------- .../apache/metron/rest/config/TestConfig.java | 32 ++-- .../rest/service/impl/KafkaServiceImplTest.java | 15 +- 8 files changed, 297 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 93a19b7..3752366 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -296,3 +296,4 @@ org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.or net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos +org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/pom.xml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml index b11e999..1c3ff92 100644 --- a/metron-interface/metron-rest/pom.xml +++ b/metron-interface/metron-rest/pom.xml @@ -33,9 +33,37 @@ <spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version> <swagger.version>2.5.0</swagger.version> <mysql.client.version>5.1.40</mysql.client.version> + <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka</artifactId> + <version>${spring-kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework.retry</groupId> + <artifactId>spring-retry</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-messaging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> @@ -75,7 +103,7 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> - </exclusions> + </exclusions> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> @@ -112,8 +140,8 @@ <artifactId>jackson-databind</artifactId> </exclusion> <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java index 309a549..247264b 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java @@ -27,33 +27,56 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +/** + * Configuration used for connecting to Kafka. + */ @Configuration @Profile("!" + TEST_PROFILE) public class KafkaConfig { - + /** + * The Spring environment. + */ private Environment environment; + /** + * Construvtor used to inject {@link Environment}. + * @param environment Spring environment to inject. + */ @Autowired - public KafkaConfig(Environment environment) { + public KafkaConfig(final Environment environment) { this.environment = environment; } + /** + * The client used for ZooKeeper. + */ @Autowired private ZkClient zkClient; + /** + * Bean for ZooKeeper + */ @Bean public ZkUtils zkUtils() { return ZkUtils.apply(zkClient, false); } - @Bean(destroyMethod = "close") - public KafkaConsumer<String, String> kafkaConsumer() { - Properties props = new Properties(); + /** + * Create properties that will be used by {@link this#createConsumerFactory()} + * + * @return Configurations used by {@link this#createConsumerFactory()}. + */ + @Bean + public Map<String, Object> consumerProperties() { + final Map<String, Object> props = new HashMap<>(); props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)); props.put("group.id", "metron-rest"); props.put("enable.auto.commit", "false"); @@ -64,9 +87,24 @@ public class KafkaConfig { if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { props.put("security.protocol", "SASL_PLAINTEXT"); } - return new KafkaConsumer<>(props); + return props; + } + + /** + * Create a {@link ConsumerFactory} which will be used for certain Kafka interactions within config API. + * + * @return a {@link ConsumerFactory} used to create {@link KafkaConsumer} for interactions with Kafka. + */ + @Bean + public ConsumerFactory<String, String> createConsumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerProperties()); } + /** + * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier. + * + * @return {@link AdminUtils$} is written in scala. We return a reference to this class. + */ @Bean public AdminUtils$ adminUtils() { return AdminUtils$.MODULE$; http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java index 0cd4d54..2787504 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java @@ -35,62 +35,78 @@ import org.springframework.web.bind.annotation.RestController; import java.util.Set; +/** + * The API resource that is use to interact with Kafka. + */ @RestController @RequestMapping("/api/v1/kafka") public class KafkaController { - @Autowired - private KafkaService kafkaService; + /** + * Service used to interact with Kafka. + */ + @Autowired + private KafkaService kafkaService; - @ApiOperation(value = "Creates a new Kafka topic") + @ApiOperation(value = "Creates a new Kafka topic") + @ApiResponses({ @ApiResponse(message = "Returns saved Kafka topic", code = 200) - @RequestMapping(value = "/topic", method = RequestMethod.POST) - ResponseEntity<KafkaTopic> save(@ApiParam(name="topic", value="Kafka topic", required=true)@RequestBody KafkaTopic topic) throws RestException { - return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED); - } + }) + @RequestMapping(value = "/topic", method = RequestMethod.POST) + ResponseEntity<KafkaTopic> save(final @ApiParam(name = "topic", value = "Kafka topic", required = true) @RequestBody KafkaTopic topic) throws RestException { + return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED); + } - @ApiOperation(value = "Retrieves a Kafka topic") - @ApiResponses(value = { @ApiResponse(message = "Returns Kafka topic", code = 200), - @ApiResponse(message = "Kafka topic is missing", code = 404) }) - @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET) - ResponseEntity<KafkaTopic> get(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException { - KafkaTopic kafkaTopic = kafkaService.getTopic(name); - if (kafkaTopic != null) { - return new ResponseEntity<>(kafkaTopic, HttpStatus.OK); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Retrieves a Kafka topic") + @ApiResponses(value = { + @ApiResponse(message = "Returns Kafka topic", code = 200), + @ApiResponse(message = "Kafka topic is missing", code = 404) + }) + @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET) + ResponseEntity<KafkaTopic> get(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + KafkaTopic kafkaTopic = kafkaService.getTopic(name); + if (kafkaTopic != null) { + return new ResponseEntity<>(kafkaTopic, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } - @ApiOperation(value = "Retrieves all Kafka topics") + @ApiOperation(value = "Retrieves all Kafka topics") + @ApiResponses({ @ApiResponse(message = "Returns a list of all Kafka topics", code = 200) - @RequestMapping(value = "/topic", method = RequestMethod.GET) - ResponseEntity<Set<String>> list() throws Exception { - return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK); - } + }) + @RequestMapping(value = "/topic", method = RequestMethod.GET) + ResponseEntity<Set<String>> list() throws Exception { + return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK); + } - @ApiOperation(value = "Deletes a Kafka topic") - @ApiResponses(value = { @ApiResponse(message = "Kafka topic was deleted", code = 200), - @ApiResponse(message = "Kafka topic is missing", code = 404) }) - @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE) - ResponseEntity<Void> delete(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException { - if (kafkaService.deleteTopic(name)) { - return new ResponseEntity<>(HttpStatus.OK); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Deletes a Kafka topic") + @ApiResponses(value = { + @ApiResponse(message = "Kafka topic was deleted", code = 200), + @ApiResponse(message = "Kafka topic is missing", code = 404) + }) + @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE) + ResponseEntity<Void> delete(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + if (kafkaService.deleteTopic(name)) { + return new ResponseEntity<>(HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } - @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset") - @ApiResponses(value = { @ApiResponse(message = "Returns sample message", code = 200), - @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) }) - @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET) - ResponseEntity<String> getSample(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException { - String sampleMessage = kafkaService.getSampleMessage(name); - if (sampleMessage != null) { - return new ResponseEntity<>(sampleMessage, HttpStatus.OK); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset") + @ApiResponses(value = { + @ApiResponse(message = "Returns sample message", code = 200), + @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) + }) + @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET) + ResponseEntity<String> getSample(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + String sampleMessage = kafkaService.getSampleMessage(name); + if (sampleMessage != null) { + return new ResponseEntity<>(sampleMessage, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java index f3cd901..bee00f2 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java @@ -22,16 +22,49 @@ import org.apache.metron.rest.model.KafkaTopic; import java.util.Set; +/** + * This is a set of operations created to interact with Kafka. + */ public interface KafkaService { - String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; + /** + * Please see the following for documentation. + * + * @see <a href="https://kafka.apache.org/documentation/#impl_offsettracking">Kafka offset tracking documentation</a>. + */ + String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; - KafkaTopic createTopic(KafkaTopic topic) throws RestException; + /** + * Create a topic in Kafka for given information. + * @param topic The information used to create a Kafka topic. + * @return The Kafka topic created. + * @throws RestException If exceptions occur when creating a topic they should be wrapped in a {@link RestException}. + */ + KafkaTopic createTopic(KafkaTopic topic) throws RestException; - boolean deleteTopic(String name); + /** + * Delete a topic for a given name. + * @param name The name of the topic to delete. + * @return If topic was deleted true; otherwise false. + */ + boolean deleteTopic(String name); - KafkaTopic getTopic(String name); + /** + * Retrieves the Kafka topic for a given name. + * @param name The name of the Kafka topic to retrieve. + * @return A {@link KafkaTopic} with the name of {@code name}. Null if topic with name, {@code name}, doesn't exist. + */ + KafkaTopic getTopic(String name); - Set<String> listTopics(); + /** + * Returns a set of all topics. + * @return A set of all topics in Kafka. + */ + Set<String> listTopics(); - String getSampleMessage(String topic); + /** + * Return a single sample message from a given topic. + * @param topic The name of the topic to retrieve a sample message from. + * @return A string representation of the sample message retrieved. If topic doesn't exist null will be returned. + */ + String getSampleMessage(String topic); } http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java index 33cb2e3..61e2618 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -20,8 +20,8 @@ package org.apache.metron.rest.service.impl; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; -import kafka.admin.RackAwareMode$; import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -30,6 +30,7 @@ import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.KafkaTopic; import org.apache.metron.rest.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Service; import java.util.HashSet; @@ -38,88 +39,106 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +/** + * The default service layer implementation of {@link KafkaService}. + * + * @see KafkaService + */ @Service public class KafkaServiceImpl implements KafkaService { - private ZkUtils zkUtils; - private KafkaConsumer<String, String> kafkaConsumer; - private AdminUtils$ adminUtils; + /** + * The timeout used when polling Kafka. + */ + private static final int KAFKA_CONSUMER_TIMEOUT = 100; - @Autowired - public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer<String, String> kafkaConsumer, AdminUtils$ adminUtils) { - this.zkUtils = zkUtils; - this.kafkaConsumer = kafkaConsumer; - this.adminUtils = adminUtils; - } + private final ZkUtils zkUtils; + private final ConsumerFactory<String, String> kafkaConsumerFactory; + private final AdminUtils$ adminUtils; - @Override - public KafkaTopic createTopic(KafkaTopic topic) throws RestException { - if (!listTopics().contains(topic.getName())) { - try { - adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ ); - } catch (AdminOperationException e) { - throw new RestException(e); - } - } - return topic; + /** + * @param zkUtils A utility class used to interact with ZooKeeper. + * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to interact with Kafka. + * @param adminUtils A utility class used to do administration operations on Kafka. + */ + @Autowired + public KafkaServiceImpl(final ZkUtils zkUtils, + final ConsumerFactory<String, String> kafkaConsumerFactory, + final AdminUtils$ adminUtils) { + this.zkUtils = zkUtils; + this.kafkaConsumerFactory = kafkaConsumerFactory; + this.adminUtils = adminUtils; + } + + @Override + public KafkaTopic createTopic(final KafkaTopic topic) throws RestException { + if (!listTopics().contains(topic.getName())) { + try { + adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$); + } catch (AdminOperationException e) { + throw new RestException(e); + } } + return topic; + } - @Override - public boolean deleteTopic(String name) { - Set<String> topics = listTopics(); - if (topics != null && topics.contains(name)) { - adminUtils.deleteTopic(zkUtils, name); - return true; - } else { - return false; - } + @Override + public boolean deleteTopic(final String name) { + final Set<String> topics = listTopics(); + if (topics != null && topics.contains(name)) { + adminUtils.deleteTopic(zkUtils, name); + return true; + } else { + return false; } + } - @Override - public KafkaTopic getTopic(String name) { - KafkaTopic kafkaTopic = null; - if (listTopics().contains(name)) { - List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(name); - if (partitionInfos.size() > 0) { - PartitionInfo partitionInfo = partitionInfos.get(0); - kafkaTopic = new KafkaTopic(); - kafkaTopic.setName(name); - kafkaTopic.setNumPartitions(partitionInfos.size()); - kafkaTopic.setReplicationFactor(partitionInfo.replicas().length); - } + @Override + public KafkaTopic getTopic(final String name) { + KafkaTopic kafkaTopic = null; + if (listTopics().contains(name)) { + try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) { + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(name); + if (partitionInfos.size() > 0) { + final PartitionInfo partitionInfo = partitionInfos.get(0); + kafkaTopic = new KafkaTopic(); + kafkaTopic.setName(name); + kafkaTopic.setNumPartitions(partitionInfos.size()); + kafkaTopic.setReplicationFactor(partitionInfo.replicas().length); } - return kafkaTopic; + } } + return kafkaTopic; + } - @Override - public Set<String> listTopics() { - Set<String> topics; - synchronized (this) { - Map<String, List<PartitionInfo>> topicsInfo = kafkaConsumer.listTopics(); - topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet(); - topics.remove(CONSUMER_OFFSETS_TOPIC); - } - return topics; + @Override + public Set<String> listTopics() { + try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) { + final Map<String, List<PartitionInfo>> topicsInfo = consumer.listTopics(); + final Set<String> topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet(); + topics.remove(CONSUMER_OFFSETS_TOPIC); + return topics; } + } - @Override - public String getSampleMessage(String topic) { - String message = null; - if (listTopics().contains(topic)) { - synchronized (this) { - kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream() - .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) - .collect(Collectors.toList())); + @Override + public String getSampleMessage(final String topic) { + String message = null; + if (listTopics().contains(topic)) { + try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) { + kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream() + .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) + .collect(Collectors.toList())); - kafkaConsumer.assignment().stream() - .filter(p -> (kafkaConsumer.position(p) -1) >= 0) - .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1)); + kafkaConsumer.assignment().stream() + .filter(p -> (kafkaConsumer.position(p) - 1) >= 0) + .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1)); - ConsumerRecords<String, String> records = kafkaConsumer.poll(100); - message = records.isEmpty() ? null : records.iterator().next().value(); - kafkaConsumer.unsubscribe(); - } - } - return message; + final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT); + message = records.isEmpty() ? null : records.iterator().next().value(); + kafkaConsumer.unsubscribe(); + } } + return message; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index edfd542..adfe056 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -25,7 +25,6 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.KafkaComponent; @@ -36,8 +35,12 @@ import org.apache.metron.rest.service.impl.StormCLIWrapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.web.client.RestTemplate; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; @@ -54,7 +57,7 @@ public class TestConfig { @Bean public ZKServerComponent zkServerComponent(Properties zkProperties) { return new ZKServerComponent() - .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())); + .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())); } @Bean @@ -66,10 +69,10 @@ public class TestConfig { @Bean public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) { ComponentRunner runner = new ComponentRunner.Builder() - .withComponent("zk", zkServerComponent) - .withComponent("kafka", kafkaWithZKComponent) - .withCustomShutdownOrder(new String[] {"kafka","zk"}) - .build(); + .withComponent("zk", zkServerComponent) + .withComponent("kafka", kafkaWithZKComponent) + .withCustomShutdownOrder(new String[]{"kafka", "zk"}) + .build(); try { runner.start(); } catch (UnableToStartException e) { @@ -78,14 +81,14 @@ public class TestConfig { return runner; } - @Bean(initMethod = "start", destroyMethod="close") + @Bean(initMethod = "start", destroyMethod = "close") public CuratorFramework client(ComponentRunner componentRunner) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class); return CuratorFrameworkFactory.newClient(zkServerComponent.getConnectionString(), retryPolicy); } - @Bean(destroyMethod="close") + @Bean(destroyMethod = "close") public ZkClient zkClient(ComponentRunner componentRunner) { ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class); return new ZkClient(zkServerComponent.getConnectionString(), 10000, 10000, ZKStringSerializer$.MODULE$); @@ -96,9 +99,9 @@ public class TestConfig { return ZkUtils.apply(zkClient, false); } - @Bean(destroyMethod="close") - public KafkaConsumer<String, String> kafkaConsumer(KafkaComponent kafkaWithZKComponent) { - Properties props = new Properties(); + @Bean + public Map<String, Object> kafkaConsumer(KafkaComponent kafkaWithZKComponent) { + Map<String, Object> props = new HashMap<>(); props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList()); props.put("group.id", "metron-config"); props.put("enable.auto.commit", "false"); @@ -106,7 +109,12 @@ public class TestConfig { props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - return new KafkaConsumer<>(props); + return props; + } + + @Bean + public ConsumerFactory<String, String> createConsumerFactory() { + return new DefaultKafkaConsumerFactory<>(kafkaConsumer(kafkaWithZKComponent(zkProperties()))); } @Bean http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java index c7d42b3..c92feab 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java @@ -41,6 +41,7 @@ import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.kafka.core.ConsumerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -71,6 +72,7 @@ public class KafkaServiceImplTest { private ZkUtils zkUtils; private KafkaConsumer<String, String> kafkaConsumer; + private ConsumerFactory<String, String> kafkaConsumerFactory; private AdminUtils$ adminUtils; private KafkaService kafkaService; @@ -86,10 +88,13 @@ public class KafkaServiceImplTest { @Before public void setUp() throws Exception { zkUtils = mock(ZkUtils.class); + kafkaConsumerFactory = mock(ConsumerFactory.class); kafkaConsumer = mock(KafkaConsumer.class); adminUtils = mock(AdminUtils$.class); - kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils); + when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer); + + kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils); } @Test @@ -104,6 +109,7 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils); } @@ -119,6 +125,7 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -137,6 +144,7 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -156,6 +164,7 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -167,6 +176,7 @@ public class KafkaServiceImplTest { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -180,6 +190,7 @@ public class KafkaServiceImplTest { assertTrue(kafkaService.deleteTopic("non_existent_topic")); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic"); verifyNoMoreInteractions(kafkaConsumer); } @@ -193,6 +204,7 @@ public class KafkaServiceImplTest { assertFalse(kafkaService.deleteTopic("non_existent_topic")); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer); } @@ -230,6 +242,7 @@ public class KafkaServiceImplTest { verify(kafkaConsumer).listTopics(); verify(kafkaConsumer, times(0)).partitionsFor("t"); + verify(kafkaConsumer).close(); verifyZeroInteractions(zkUtils); verifyNoMoreInteractions(kafkaConsumer); }
