METRON-1686 Create stop job endpoint for Pcap queries (mmiklavc via merrimanr) closes apache/metron#1115
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f316d15f Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f316d15f Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f316d15f Branch: refs/heads/master Commit: f316d15f082415f44d22bb124818ec116985ba1f Parents: 39ae9f4 Author: mmiklavc <michael.miklav...@gmail.com> Authored: Thu Jul 19 16:36:12 2018 -0500 Committer: rmerriman <merrim...@gmail.com> Committed: Thu Jul 19 16:36:12 2018 -0500 ---------------------------------------------------------------------- .../metron/rest/controller/PcapController.java | 22 ++++--- .../apache/metron/rest/service/PcapService.java | 3 + .../rest/service/impl/PcapServiceImpl.java | 17 ++++- .../apache/metron/rest/config/TestConfig.java | 20 +++--- .../PcapControllerIntegrationTest.java | 68 +++++++++++++++----- .../rest/service/impl/PcapServiceImplTest.java | 58 ++++++++++++++--- .../metron/job/manager/InMemoryJobManager.java | 3 + .../org/apache/metron/pcap/PcapJobTest.java | 13 ++++ 8 files changed, 157 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java index 38bffb4..6663659 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java @@ -21,11 +21,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import org.apache.hadoop.fs.Path; -import org.apache.metron.job.JobStatus; -import org.apache.metron.job.Statusable; import org.apache.metron.rest.RestException; -import org.apache.metron.rest.model.PcapResponse; import org.apache.metron.rest.model.pcap.FixedPcapRequest; import org.apache.metron.rest.model.pcap.PcapStatus; import org.apache.metron.rest.security.SecurityUtils; @@ -37,12 +33,8 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import java.util.List; -import java.util.Set; - @RestController @RequestMapping("/api/v1/pcap") public class PcapController { @@ -70,6 +62,20 @@ public class PcapController { } else { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } + @ApiOperation(value = "Kills running job.") + @ApiResponses(value = { @ApiResponse(message = "Kills passed job.", code = 200)}) + @RequestMapping(value = "/kill/{jobId}", method = RequestMethod.DELETE) + ResponseEntity<PcapStatus> killJob( + @ApiParam(name = "jobId", value = "Job ID of submitted job", required = true) @PathVariable String jobId) + throws RestException { + PcapStatus jobStatus = pcapQueryService.killJob(SecurityUtils.getCurrentUser(), jobId); + if (jobStatus != null) { + return new ResponseEntity<>(jobStatus, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } } + } http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java index 603e013..8073573 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java @@ -26,4 +26,7 @@ public interface PcapService { PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException; PcapStatus getJobStatus(String username, String jobId) throws RestException; + + PcapStatus killJob(String username, String jobId) throws RestException; + } http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java index 218e9be..6c21e77 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java @@ -17,6 +17,7 @@ */ package org.apache.metron.rest.service.impl; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,7 +28,6 @@ import org.apache.metron.job.Pageable; import org.apache.metron.job.Statusable; import org.apache.metron.job.manager.JobManager; import org.apache.metron.pcap.config.PcapOptions; -import org.apache.metron.pcap.finalizer.PcapRestFinalizer; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.config.PcapJobSupplier; @@ -39,8 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; -import java.io.IOException; - @Service public class PcapServiceImpl implements PcapService { @@ -92,6 +90,19 @@ public class PcapServiceImpl implements PcapService { return pcapStatus; } + @Override + public PcapStatus killJob(String username, String jobId) throws RestException { + try { + jobManager.killJob(username, jobId); + } catch (JobNotFoundException e) { + // do nothing and return null pcapStatus + return null; + } catch (JobException e) { + throw new RestException(e); + } + return getJobStatus(username, jobId); + } + protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException { PcapOptions.JOB_NAME.put(pcapRequest, "jobName"); PcapOptions.USERNAME.put(pcapRequest, username); http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index 486a7dc..a5a0236 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -17,6 +17,15 @@ */ package org.apache.metron.rest.config; +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import kafka.admin.AdminUtils$; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; @@ -39,7 +48,6 @@ import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.job.manager.InMemoryJobManager; import org.apache.metron.job.manager.JobManager; -import org.apache.metron.pcap.mr.PcapJob; import org.apache.metron.rest.RestException; import org.apache.metron.rest.mock.MockPcapJob; import org.apache.metron.rest.mock.MockPcapJobSupplier; @@ -53,16 +61,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.web.client.RestTemplate; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; - @Configuration @Profile(TEST_PROFILE) public class TestConfig { http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java index 462d83d..2363204 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java @@ -17,10 +17,23 @@ */ package org.apache.metron.rest.controller; +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; +import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; -import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.job.JobStatus; import org.apache.metron.job.Pageable; import org.apache.metron.pcap.PcapHelper; @@ -42,21 +55,6 @@ import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; -import static org.hamcrest.Matchers.hasSize; -import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; -import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; -import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @ActiveProfiles(TEST_PROFILE) @@ -229,4 +227,42 @@ public class PcapControllerIntegrationTest { .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) .andExpect(jsonPath("$.jobStatus").value("KILLED")); } + + @Test + public void testKillJob() throws Exception { + MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob"); + + this.mockMvc.perform(get(pcapUrl + "/jobId123").with(httpBasic(user, password))) + .andExpect(status().isNotFound()); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId123").withState(JobStatus.State.RUNNING)); + + this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobId").value("jobId123")) + .andExpect(jsonPath("$.jobStatus").value("RUNNING")); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId123").withState(JobStatus.State.KILLED)); + + this.mockMvc.perform(delete(pcapUrl + "/kill/{id}", "jobId123").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobId").value("jobId123")) + .andExpect(jsonPath("$.jobStatus").value("KILLED")); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.KILLED)); + } + + @Test + public void testKillNonExistentJobReturns404() throws Exception { + MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob"); + + this.mockMvc.perform(get(pcapUrl + "/jobId123").with(httpBasic(user, password))) + .andExpect(status().isNotFound()); + + this.mockMvc.perform(delete(pcapUrl + "/kill/{id}", "jobId123").with(httpBasic(user, password))) + .andExpect(status().isNotFound()); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java index 2b6bea3..8b628b3 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java @@ -17,11 +17,25 @@ */ package org.apache.metron.rest.service.impl; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doReturn; + +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.metron.common.Constants; import org.apache.metron.job.JobException; +import org.apache.metron.job.JobNotFoundException; import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.State; import org.apache.metron.job.Pageable; import org.apache.metron.job.manager.InMemoryJobManager; import org.apache.metron.job.manager.JobManager; @@ -34,6 +48,7 @@ import org.apache.metron.rest.mock.MockPcapJob; import org.apache.metron.rest.mock.MockPcapJobSupplier; import org.apache.metron.rest.model.pcap.FixedPcapRequest; import org.apache.metron.rest.model.pcap.PcapStatus; +import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -41,15 +56,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.springframework.core.env.Environment; -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.doReturn; -import static org.powermock.api.mockito.PowerMockito.whenNew; - @SuppressWarnings("ALL") public class PcapServiceImplTest { @Rule @@ -233,4 +239,38 @@ public class PcapServiceImplTest { pcapService.getJobStatus("user", "jobId"); } + @Test + public void killJobShouldKillJobAndReportStatus() throws Exception { + MockPcapJob mockPcapJob = mock(MockPcapJob.class); + JobManager jobManager = mock(JobManager.class); + JobStatus actualJobStatus = new JobStatus() + .withJobId("jobId") + .withState(State.KILLED) + .withDescription("description") + .withPercentComplete(100.0); + Pageable pageable = mock(Pageable.class); + when(pageable.getSize()).thenReturn(0); + when(mockPcapJob.getStatus()).thenReturn(actualJobStatus); + when(mockPcapJob.isDone()).thenReturn(true); + when(mockPcapJob.get()).thenReturn(pageable); + when(jobManager.getJob("user", "jobId")).thenReturn(mockPcapJob); + + PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager); + PcapStatus status = pcapService.killJob("user", "jobId"); + verify(jobManager, times(1)).killJob("user", "jobId"); + assertThat(status.getJobStatus(), CoreMatchers.equalTo(State.KILLED.toString())); + } + + @Test + public void killNonExistentJobShouldReturnNull() throws Exception { + MockPcapJob mockPcapJob = mock(MockPcapJob.class); + JobManager jobManager = mock(JobManager.class); + doThrow(new JobNotFoundException("Not found test exception.")).when(jobManager).killJob("user", "jobId"); + + PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager); + PcapStatus status = pcapService.killJob("user", "jobId"); + verify(jobManager, times(1)).killJob("user", "jobId"); + assertNull(status); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java index 1340aa5..807af4d 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java @@ -61,6 +61,9 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> { return getJob(username, jobId).isDone(); } + /** + * Note: throws JobNotFoundException for non-existent jobs. + */ @Override public void killJob(String username, String jobId) throws JobException { getJob(username, jobId).kill(); http://git-wip-us.apache.org/repos/asf/metron/blob/f316d15f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 1e389d9..b28c428 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -221,4 +221,17 @@ public class PcapJobTest { Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); } + @Test + public void killing_job_causes_status_to_return_KILLED_state() throws Exception { + when(mrJob.isComplete()).thenReturn(false); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + statusable.kill(); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.KILLED)); + } + }