Repository: incubator-metron Updated Branches: refs/heads/master aef84636a -> 7b8d90036
METRON-773: Intermittent unit test errors in the KafkaControllerIntegrationTest this closes apache/incubator-metron#491 Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7b8d9003 Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7b8d9003 Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7b8d9003 Branch: refs/heads/master Commit: 7b8d90036952c2911269632fa8c11bda958cd48d Parents: aef8463 Author: cstella <ceste...@gmail.com> Authored: Thu Mar 30 23:11:02 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Thu Mar 30 23:11:02 2017 -0400 ---------------------------------------------------------------------- .../KafkaControllerIntegrationTest.java | 278 +++++++++++-------- 1 file changed, 155 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7b8d9003/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java index 0299708..745bc56 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java @@ -17,6 +17,7 @@ */ package org.apache.metron.rest.controller; +import kafka.common.TopicAlreadyMarkedForDeletionException; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.rest.generator.SampleDataGenerator; @@ -33,8 +34,11 @@ import org.springframework.http.MediaType; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.test.web.servlet.ResultActions; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; +import org.springframework.web.util.NestedServletException; import java.io.IOException; @@ -54,135 +58,163 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @ActiveProfiles(TEST_PROFILE) public class KafkaControllerIntegrationTest { - @Autowired - private KafkaComponent kafkaWithZKComponent; - - class SampleDataRunner implements Runnable { - - private boolean stop = false; - private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput"; - - @Override - public void run() { - SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator(); - broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList()); - broSampleDataGenerator.setNum(1); - broSampleDataGenerator.setSelectedSensorType("bro"); - broSampleDataGenerator.setDelay(0); - try { - while(!stop) { - broSampleDataGenerator.generateSampleData(path); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - } - - public void stop() { - stop = true; + private static final int KAFKA_RETRY = 10; + @Autowired + private KafkaComponent kafkaWithZKComponent; + + class SampleDataRunner implements Runnable { + + private boolean stop = false; + private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput"; + + @Override + public void run() { + SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator(); + broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList()); + broSampleDataGenerator.setNum(1); + broSampleDataGenerator.setSelectedSensorType("bro"); + broSampleDataGenerator.setDelay(0); + try { + while(!stop) { + broSampleDataGenerator.generateSampleData(path); } + } catch (ParseException|IOException e) { + e.printStackTrace(); + } } - private SampleDataRunner sampleDataRunner = new SampleDataRunner(); - private Thread sampleDataThread = new Thread(sampleDataRunner); - - /** - { - "name": "bro", - "numPartitions": 1, - "properties": {}, - "replicationFactor": 1 - } - */ - @Multiline - public static String broTopic; - - @Autowired - private WebApplicationContext wac; - - @Autowired - private KafkaService kafkaService; - - private MockMvc mockMvc; - - private String kafkaUrl = "/api/v1/kafka"; - private String user = "user"; - private String password = "password"; - - @Before - public void setup() throws Exception { - this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); + public void stop() { + stop = true; } - - @Test - public void testSecurity() throws Exception { - this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic)) - .andExpect(status().isUnauthorized()); - - this.mockMvc.perform(get(kafkaUrl + "/topic/bro")) - .andExpect(status().isUnauthorized()); - - this.mockMvc.perform(get(kafkaUrl + "/topic")) - .andExpect(status().isUnauthorized()); - - this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample")) - .andExpect(status().isUnauthorized()); - - this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf())) - .andExpect(status().isUnauthorized()); + } + + private SampleDataRunner sampleDataRunner = new SampleDataRunner(); + private Thread sampleDataThread = new Thread(sampleDataRunner); + + /** + { + "name": "bro", + "numPartitions": 1, + "properties": {}, + "replicationFactor": 1 + } + */ + @Multiline + public static String broTopic; + + @Autowired + private WebApplicationContext wac; + + @Autowired + private KafkaService kafkaService; + + private MockMvc mockMvc; + + private String kafkaUrl = "/api/v1/kafka"; + private String user = "user"; + private String password = "password"; + + @Before + public void setup() throws Exception { + this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); + } + + @Test + public void testSecurity() throws Exception { + this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic)) + .andExpect(status().isUnauthorized()); + + this.mockMvc.perform(get(kafkaUrl + "/topic/bro")) + .andExpect(status().isUnauthorized()); + + this.mockMvc.perform(get(kafkaUrl + "/topic")) + .andExpect(status().isUnauthorized()); + + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample")) + .andExpect(status().isUnauthorized()); + + this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf())) + .andExpect(status().isUnauthorized()); + } + + @Test + public void test() throws Exception { + this.kafkaService.deleteTopic("bro"); + this.kafkaService.deleteTopic("someTopic"); + Thread.sleep(1000); + + this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf())) + .andExpect(status().isNotFound()); + + this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.name").value("bro")) + .andExpect(jsonPath("$.numPartitions").value(1)) + .andExpect(jsonPath("$.replicationFactor").value(1)); + + sampleDataThread.start(); + Thread.sleep(1000); + + this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.name").value("bro")) + .andExpect(jsonPath("$.numPartitions").value(1)) + .andExpect(jsonPath("$.replicationFactor").value(1)); + + this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password))) + .andExpect(status().isNotFound()); + + this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$", Matchers.hasItem("bro"))); + for(int i = 0;i < KAFKA_RETRY;++i) { + MvcResult result = this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user, password))) + .andReturn(); + if(result.getResponse().getStatus() == 200) { + break; + } + Thread.sleep(1000); } - - @Test - public void test() throws Exception { - this.kafkaService.deleteTopic("bro"); - this.kafkaService.deleteTopic("someTopic"); - Thread.sleep(1000); - - this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf())) - .andExpect(status().isNotFound()); - - this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic)) - .andExpect(status().isCreated()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.name").value("bro")) - .andExpect(jsonPath("$.numPartitions").value(1)) - .andExpect(jsonPath("$.replicationFactor").value(1)); - - sampleDataThread.start(); + this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8"))) + .andExpect(jsonPath("$").isNotEmpty()); + + this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password))) + .andExpect(status().isNotFound()); + boolean deleted = false; + for(int i = 0;i < KAFKA_RETRY;++i) { + try { + MvcResult result = this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user, password)).with(csrf())).andReturn(); + if(result.getResponse().getStatus() == 200) { + deleted = true; + break; + } Thread.sleep(1000); - - this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.name").value("bro")) - .andExpect(jsonPath("$.numPartitions").value(1)) - .andExpect(jsonPath("$.replicationFactor").value(1)); - - this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password))) - .andExpect(status().isNotFound()); - - this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$", Matchers.hasItem("bro"))); - - - this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8"))) - .andExpect(jsonPath("$").isNotEmpty()); - - this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password))) - .andExpect(status().isNotFound()); - - this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf())) - .andExpect(status().isOk()); + } + catch(NestedServletException nse) { + Throwable t = nse.getRootCause(); + if(t instanceof TopicAlreadyMarkedForDeletionException) { + continue; + } + else { + throw nse; + } + } + catch(Throwable t) { + throw t; + } } - - @After - public void tearDown() { - sampleDataRunner.stop(); + if(!deleted) { + throw new IllegalStateException("Unable to delete kafka topic \"bro\""); } + } + + @After + public void tearDown() { + sampleDataRunner.stop(); + } }