METRON-1674 Create REST endpoint for job status abstraction (merrimanr) closes apache/metron#1109
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/39ae9f46 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/39ae9f46 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/39ae9f46 Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: 39ae9f4642073d3d4f0fa423339dd97f85974588 Parents: dbbf624 Author: merrimanr <[email protected]> Authored: Thu Jul 19 11:01:49 2018 -0500 Committer: rmerriman <[email protected]> Committed: Thu Jul 19 11:01:49 2018 -0500 ---------------------------------------------------------------------- .../rest/model/pcap/FixedPcapOptions.java | 42 ++++ .../rest/model/pcap/FixedPcapRequest.java | 72 ++++-- .../metron/rest/model/pcap/PcapRequest.java | 65 +++--- .../metron/rest/model/pcap/PcapStatus.java | 91 ++++++++ .../apache/metron/rest/MetronRestConstants.java | 6 +- .../apache/metron/rest/config/PcapConfig.java | 14 +- .../metron/rest/config/PcapJobSupplier.java | 54 +++++ .../metron/rest/controller/PcapController.java | 34 +-- .../apache/metron/rest/service/PcapService.java | 6 +- .../rest/service/impl/PcapServiceImpl.java | 126 ++++++----- .../src/main/resources/application.yml | 6 +- .../apache/metron/rest/config/TestConfig.java | 17 +- .../PcapControllerIntegrationTest.java | 127 ++++++++++- .../apache/metron/rest/mock/MockPcapJob.java | 106 ++++++--- .../metron/rest/mock/MockPcapJobSupplier.java | 36 +++ .../rest/service/impl/PcapServiceImplTest.java | 217 +++++++++++++------ .../common/configuration/ConfigOption.java | 12 +- .../apache/metron/job/JobNotFoundException.java | 30 +++ .../apache/metron/job/RuntimeJobException.java | 30 +++ .../metron/job/manager/InMemoryJobManager.java | 11 +- .../org/apache/metron/pcap/query/PcapCli.java | 12 - .../PcapTopologyIntegrationTest.java | 6 +- .../apache/metron/pcap/query/PcapCliTest.java | 2 - .../apache/metron/pcap/config/PcapOptions.java | 2 + .../metron/pcap/finalizer/PcapCliFinalizer.java | 4 +- .../metron/pcap/finalizer/PcapFinalizer.java | 8 +- .../pcap/finalizer/PcapRestFinalizer.java | 22 +- .../java/org/apache/metron/pcap/mr/PcapJob.java | 18 +- 28 files changed, 882 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java new file mode 100644 index 0000000..5e77005 --- /dev/null +++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.model.pcap; + +import org.apache.metron.common.configuration.ConfigOption; + +public enum FixedPcapOptions implements ConfigOption { + IP_SRC_ADDR("ipSrcAddr"), + IP_DST_ADDR("ipDstAddr"), + IP_SRC_PORT("ipSrcPort"), + IP_DST_PORT("ipDstPort"), + PROTOCOL("protocol"), + PACKET_FILTER("packetFilter"), + INCLUDE_REVERSE("includeReverse") + ; + + String key; + + FixedPcapOptions(String key) { + this.key = key; + } + + @Override + public String getKey() { + return key; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java index 758340b..a2d345b 100644 --- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java +++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java @@ -17,70 +17,100 @@ */ package org.apache.metron.rest.model.pcap; +import org.apache.metron.common.Constants; +import org.apache.metron.pcap.config.PcapOptions; +import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; + +import java.util.HashMap; +import java.util.Map; + public class FixedPcapRequest extends PcapRequest { - private String ipSrcAddr; - private String ipDstAddr; - private Integer ipSrcPort; - private Integer ipDstPort; - private String protocol; - private String packetFilter; - private Boolean includeReverse = false; + public FixedPcapRequest() { + PcapOptions.FILTER_IMPL.put(this, new FixedPcapFilter.Configurator()); + } public String getIpSrcAddr() { - return ipSrcAddr; + return FixedPcapOptions.IP_SRC_ADDR.get(this, String.class); } public void setIpSrcAddr(String ipSrcAddr) { - this.ipSrcAddr = ipSrcAddr; + FixedPcapOptions.IP_SRC_ADDR.put(this, ipSrcAddr); } public String getIpDstAddr() { - return ipDstAddr; + return FixedPcapOptions.IP_DST_ADDR.get(this, String.class); } public void setIpDstAddr(String ipDstAddr) { - this.ipDstAddr = ipDstAddr; + FixedPcapOptions.IP_DST_ADDR.put(this, ipDstAddr); } public Integer getIpSrcPort() { - return ipSrcPort; + return FixedPcapOptions.IP_SRC_PORT.get(this, Integer.class); } public void setIpSrcPort(Integer ipSrcPort) { - this.ipSrcPort = ipSrcPort; + FixedPcapOptions.IP_SRC_PORT.put(this, ipSrcPort); } public Integer getIpDstPort() { - return ipDstPort; + return FixedPcapOptions.IP_DST_PORT.get(this, Integer.class); } public void setIpDstPort(Integer ipDstPort) { - this.ipDstPort = ipDstPort; + FixedPcapOptions.IP_DST_PORT.put(this, ipDstPort); } public String getProtocol() { - return protocol; + return FixedPcapOptions.PROTOCOL.get(this, String.class); } public void setProtocol(String protocol) { - this.protocol = protocol; + FixedPcapOptions.PROTOCOL.put(this, protocol); } public String getPacketFilter() { - return packetFilter; + return FixedPcapOptions.PACKET_FILTER.get(this, String.class); } public void setPacketFilter(String packetFilter) { - this.packetFilter = packetFilter; + FixedPcapOptions.PACKET_FILTER.put(this, packetFilter); } public Boolean getIncludeReverse() { - return includeReverse; + return FixedPcapOptions.INCLUDE_REVERSE.get(this, Boolean.class); } public void setIncludeReverse(Boolean includeReverse) { - this.includeReverse = includeReverse; + FixedPcapOptions.INCLUDE_REVERSE.put(this, includeReverse); + } + + public void setFields() { + Map<String, String> fields = new HashMap<>(); + if (getIpSrcAddr() != null) { + fields.put(Constants.Fields.SRC_ADDR.getName(), getIpSrcAddr()); + } + if (getIpDstAddr() != null) { + fields.put(Constants.Fields.DST_ADDR.getName(), getIpDstAddr()); + } + if (getIpSrcPort() != null) { + fields.put(Constants.Fields.SRC_PORT.getName(), getIpSrcPort().toString()); + } + if (getIpDstPort() != null) { + fields.put(Constants.Fields.DST_PORT.getName(), getIpDstPort().toString()); + } + if (getProtocol() != null) { + fields.put(Constants.Fields.PROTOCOL.getName(), getProtocol()); + } + if (getIncludeReverse() != null) { + fields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), getIncludeReverse().toString()); + } + if (getPacketFilter() != null) { + fields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), getPacketFilter()); + } + PcapOptions.FIELDS.put(this, fields); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java index 5941d17..64ed932 100644 --- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java +++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java @@ -17,48 +17,57 @@ */ package org.apache.metron.rest.model.pcap; -// TODO reconcile with pcapmrjob - import org.apache.commons.collections4.map.AbstractMapDecorator; import org.apache.metron.pcap.config.PcapOptions; +import java.util.HashMap; + public class PcapRequest extends AbstractMapDecorator<String, Object> { public PcapRequest() { - setStartTime(0L); - setEndTime(System.currentTimeMillis()); - setNumReducers(1); + super(new HashMap<>()); + setStartTimeMs(0L); + setEndTimeMs(System.currentTimeMillis()); + setNumReducers(10); + } + + public String getBasePath() { + return PcapOptions.BASE_PATH.get(this, String.class); + } + + public void setBasePath(String basePath) { + PcapOptions.BASE_PATH.put(this, basePath); } - public String getBaseOutputPath() { + public String getBaseInterimResultPath() { return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class); } - public void setBaseOutputPath(String baseOutputPath) { - PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath); + public void setBaseInterimResultPath(String baseInterimResultPath) { + PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseInterimResultPath); } - public String getBasePath() { - return PcapOptions.BASE_PATH.get(this, String.class); + public String getFinalOutputPath() { + return PcapOptions.FINAL_OUTPUT_PATH.get(this, String.class); } - public void setBasePath(String basePath) { - PcapOptions.BASE_PATH.put(this, basePath); + public void setFinalOutputPath(String finalOutputPath) { + PcapOptions.FINAL_OUTPUT_PATH.put(this, finalOutputPath); } - public Long getStartTime() { + public Long getStartTimeMs() { return PcapOptions.START_TIME_MS.get(this, Long.class); } - public void setStartTime(Long startTime) { + public void setStartTimeMs(Long startTime) { PcapOptions.START_TIME_MS.put(this, startTime); } - public Long getEndTime() { + public Long getEndTimeMs() { return PcapOptions.END_TIME_MS.get(this, Long.class); } - public void setEndTime(Long endTime) { + public void setEndTimeMs(Long endTime) { PcapOptions.END_TIME_MS.put(this, endTime); } @@ -69,28 +78,4 @@ public class PcapRequest extends AbstractMapDecorator<String, Object> { public void setNumReducers(Integer numReducers) { PcapOptions.NUM_REDUCERS.put(this, numReducers); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - PcapRequest pcapRequest = (PcapRequest) o; - - return (getBaseOutputPath() != null ? getBaseOutputPath().equals(pcapRequest.getBaseOutputPath()) : pcapRequest.getBaseOutputPath() != null) && - (getBasePath() != null ? getBasePath().equals(pcapRequest.getBasePath()) : pcapRequest.getBasePath() == null) && - (getStartTime() != null ? getStartTime().equals(pcapRequest.getStartTime()) : pcapRequest.getStartTime() == null) && - (getEndTime() != null ? getEndTime().equals(pcapRequest.getEndTime()) : pcapRequest.getEndTime() == null) && - (getNumReducers() != null ? getNumReducers().equals(pcapRequest.getNumReducers()) : pcapRequest.getNumReducers() == null); - } - - @Override - public int hashCode() { - int result = getBaseOutputPath() != null ? getBaseOutputPath().hashCode() : 0; - result = 31 * result + (getBasePath() != null ? getBasePath().hashCode() : 0); - result = 31 * result + (getStartTime() != null ? getStartTime().hashCode() : 0); - result = 31 * result + (getEndTime() != null ? getEndTime().hashCode() : 0); - result = 31 * result + (getNumReducers() != null ? getNumReducers().hashCode() : 0); - return result; - } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java new file mode 100644 index 0000000..f004eb5 --- /dev/null +++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.model.pcap; + +public class PcapStatus { + + private String jobId; + private String jobStatus; + private String description; + private Double percentComplete = 0.0; + private Integer pageTotal = 0; + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getJobStatus() { + return jobStatus; + } + + public void setJobStatus(String jobStatus) { + this.jobStatus = jobStatus; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Double getPercentComplete() { + return percentComplete; + } + + public void setPercentComplete(Double percentComplete) { + this.percentComplete = percentComplete; + } + + public Integer getPageTotal() { + return pageTotal; + } + + public void setPageTotal(Integer size) { + this.pageTotal = size; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PcapStatus pcapStatus = (PcapStatus) o; + + return (getJobId() != null ? getJobId().equals(pcapStatus.getJobId()) : pcapStatus.getJobId() != null) && + (getJobStatus() != null ? getJobStatus().equals(pcapStatus.getJobStatus()) : pcapStatus.getJobStatus() != null) && + (getDescription() != null ? getDescription().equals(pcapStatus.getDescription()) : pcapStatus.getDescription() != null) && + (getPercentComplete() != null ? getPercentComplete().equals(pcapStatus.getPercentComplete()) : pcapStatus.getPercentComplete() != null) && + (getPageTotal() != null ? getPageTotal().equals(pcapStatus.getPageTotal()) : pcapStatus.getPageTotal() != null); + } + + @Override + public int hashCode() { + int result = (getJobId() != null ? getJobId().hashCode() : 0); + result = 31 * result + (getJobStatus() != null ? getJobStatus().hashCode() : 0); + result = 31 * result + (getDescription() != null ? getDescription().hashCode() : 0); + result = 31 * result + (getPercentComplete() != null ? getPercentComplete().hashCode() : 0); + result = 31 * result + (getPageTotal() != null ? getPageTotal().hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 0989d12..8e14e38 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -75,6 +75,8 @@ public class MetronRestConstants { public static final String LOGGING_SYSTEM_PROPERTY = "org.springframework.boot.logging.LoggingSystem"; - public static final String PCAP_INPUT_PATH_SPRING_PROPERTY = "pcap.input.path"; - public static final String PCAP_OUTPUT_PATH_SPRING_PROPERTY = "pcap.output.path"; + public static final String PCAP_BASE_PATH_SPRING_PROPERTY = "pcap.base.path"; + public static final String PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY = "pcap.base.interim.result.path"; + public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path"; + public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size"; } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java index 8da5f96..a0b7f18 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java @@ -17,7 +17,8 @@ */ package org.apache.metron.rest.config; -import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.job.manager.InMemoryJobManager; +import org.apache.metron.job.manager.JobManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; @@ -29,7 +30,14 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; public class PcapConfig { @Bean - public PcapJob pcapJob() { - return new PcapJob(); + public JobManager jobManager() { + return new InMemoryJobManager(); } + + @Bean + public PcapJobSupplier pcapJobSupplier() { + return new PcapJobSupplier(); + } + + } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java new file mode 100644 index 0000000..1e79f6a --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.config; + +import org.apache.hadoop.fs.Path; +import org.apache.metron.job.Finalizer; +import org.apache.metron.job.JobException; +import org.apache.metron.job.RuntimeJobException; +import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies; +import org.apache.metron.pcap.finalizer.PcapRestFinalizer; +import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.rest.model.pcap.PcapRequest; + +import java.util.function.Supplier; + +public class PcapJobSupplier implements Supplier<Statusable<Path>> { + + private PcapRequest pcapRequest; + + @Override + public Statusable<Path> get() { + try { + PcapJob<Path> pcapJob = createPcapJob(); + return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest); + } catch (JobException e) { + throw new RuntimeJobException(e.getMessage()); + } + } + + public void setPcapRequest(PcapRequest pcapRequest) { + this.pcapRequest = pcapRequest; + } + + protected PcapJob createPcapJob() { + return new PcapJob(); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java index 3524a8c..38bffb4 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java @@ -21,10 +21,14 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import org.apache.hadoop.fs.Path; import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Statusable; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.PcapResponse; import org.apache.metron.rest.model.pcap.FixedPcapRequest; +import org.apache.metron.rest.model.pcap.PcapStatus; +import org.apache.metron.rest.security.SecurityUtils; import org.apache.metron.rest.service.PcapService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; @@ -33,8 +37,12 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.List; +import java.util.Set; + @RestController @RequestMapping("/api/v1/pcap") public class PcapController { @@ -45,27 +53,23 @@ public class PcapController { @ApiOperation(value = "Executes a Fixed Pcap Query.") @ApiResponses(value = { @ApiResponse(message = "Returns a job status with job ID.", code = 200)}) @RequestMapping(value = "/fixed", method = RequestMethod.POST) - ResponseEntity<JobStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request" + ResponseEntity<PcapStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request" + " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody FixedPcapRequest fixedPcapRequest) throws RestException { - JobStatus jobStatus = pcapQueryService.fixed(fixedPcapRequest); - return new ResponseEntity<>(jobStatus, HttpStatus.OK); + PcapStatus pcapStatus = pcapQueryService.fixed(SecurityUtils.getCurrentUser(), fixedPcapRequest); + return new ResponseEntity<>(pcapStatus, HttpStatus.OK); } @ApiOperation(value = "Gets job status for running job.") @ApiResponses(value = { @ApiResponse(message = "Returns a job status for the passed job.", code = 200)}) - @RequestMapping(value = "/getStatus", method = RequestMethod.GET) - ResponseEntity<JobStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job" + @RequestMapping(value = "/{jobId}", method = RequestMethod.GET) + ResponseEntity<PcapStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job" + " which includes fixed filter fields like ip source address and protocol.", required=true)@PathVariable String jobId) throws RestException { - JobStatus jobStatus = pcapQueryService.getJobStatus("metron", jobId); - return new ResponseEntity<>(jobStatus, HttpStatus.OK); - } + PcapStatus jobStatus = pcapQueryService.getJobStatus(SecurityUtils.getCurrentUser(), jobId); + if (jobStatus != null) { + return new ResponseEntity<>(jobStatus, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } - @ApiOperation(value = "Gets results of a pcap job.") - @ApiResponses(value = { @ApiResponse(message = "Returns a PcapResponse containing an array of pcaps.", code = 200)}) - @RequestMapping(value = "/getPage", method = RequestMethod.GET) - ResponseEntity<PcapResponse> getPage(@ApiParam(name="fixedPcapRequest", value="Job ID of submitted job" - + " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody String jobId, int pageNum) throws RestException { - PcapResponse pcapResponse = pcapQueryService.getPage("metron", jobId, pageNum); - return new ResponseEntity<>(pcapResponse, HttpStatus.OK); } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java index ce8372c..603e013 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java @@ -18,10 +18,12 @@ package org.apache.metron.rest.service; import org.apache.metron.rest.RestException; -import org.apache.metron.rest.model.PcapResponse; import org.apache.metron.rest.model.pcap.FixedPcapRequest; +import org.apache.metron.rest.model.pcap.PcapStatus; public interface PcapService { - PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException; + PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException; + + PcapStatus getJobStatus(String username, String jobId) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java index 4dae1e5..218e9be 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java @@ -20,101 +20,107 @@ package org.apache.metron.rest.service.impl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.metron.common.Constants; -import org.apache.metron.common.hadoop.SequenceFileIterable; -import org.apache.metron.common.utils.timestamp.TimestampConverters; -import org.apache.metron.pcap.PcapHelper; -import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; -import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobNotFoundException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.Statusable; +import org.apache.metron.job.manager.JobManager; +import org.apache.metron.pcap.config.PcapOptions; +import org.apache.metron.pcap.finalizer.PcapRestFinalizer; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; -import org.apache.metron.rest.model.PcapResponse; +import org.apache.metron.rest.config.PcapJobSupplier; import org.apache.metron.rest.model.pcap.FixedPcapRequest; +import org.apache.metron.rest.model.pcap.PcapRequest; +import org.apache.metron.rest.model.pcap.PcapStatus; import org.apache.metron.rest.service.PcapService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; @Service public class PcapServiceImpl implements PcapService { private Environment environment; private Configuration configuration; - private PcapJob pcapJob; + private PcapJobSupplier pcapJobSupplier; + private JobManager<Path> jobManager; @Autowired - public PcapServiceImpl(Environment environment, Configuration configuration, PcapJob pcapJob) { + public PcapServiceImpl(Environment environment, Configuration configuration, PcapJobSupplier pcapJobSupplier, JobManager<Path> jobManager) { this.environment = environment; this.configuration = configuration; - this.pcapJob = pcapJob; + this.pcapJobSupplier = pcapJobSupplier; + this.jobManager = jobManager; } @Override - public PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException { - if (fixedPcapRequest.getBasePath() == null) { - fixedPcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY)); - } - if (fixedPcapRequest.getBaseOutputPath() == null) { - fixedPcapRequest.setBaseOutputPath(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY)); + public PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException { + try { + setPcapOptions(username, fixedPcapRequest); + fixedPcapRequest.setFields(); + pcapJobSupplier.setPcapRequest(fixedPcapRequest); + JobStatus jobStatus = jobManager.submit(pcapJobSupplier, username); + return jobStatusToPcapStatus(jobStatus); + } catch (IOException | JobException e) { + throw new RestException(e); } - PcapResponse response = new PcapResponse(); - SequenceFileIterable results; + } + + @Override + public PcapStatus getJobStatus(String username, String jobId) throws RestException { + PcapStatus pcapStatus = null; try { - results = pcapJob.query( - new Path(fixedPcapRequest.getBasePath()), - new Path(fixedPcapRequest.getBaseOutputPath()), - TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getStartTime()), - TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getEndTime()), - fixedPcapRequest.getNumReducers(), - getFixedFields(fixedPcapRequest), - configuration, - getFileSystem(), - new FixedPcapFilter.Configurator() - ); - if (results != null) { - List<byte[]> pcaps = new ArrayList<>(); - results.iterator().forEachRemaining(pcaps::add); - response.setPcaps(pcaps); + Statusable<Path> statusable = jobManager.getJob(username, jobId); + if (statusable != null) { + pcapStatus = jobStatusToPcapStatus(statusable.getStatus()); + if (statusable.isDone()) { + Pageable<Path> pageable = statusable.get(); + if (pageable != null) { + pcapStatus.setPageTotal(pageable.getSize()); + } + } } - } catch (IOException | ClassNotFoundException | InterruptedException e) { + } catch (JobNotFoundException | InterruptedException e) { + // do nothing and return null pcapStatus + } catch (JobException e) { throw new RestException(e); } - return response; + return pcapStatus; } - protected Map<String, String> getFixedFields(FixedPcapRequest fixedPcapRequest) { - Map<String, String> fixedFields = new HashMap<>(); - if (fixedPcapRequest.getIpSrcAddr() != null) { - fixedFields.put(Constants.Fields.SRC_ADDR.getName(), fixedPcapRequest.getIpSrcAddr()); - } - if (fixedPcapRequest.getIpDstAddr() != null) { - fixedFields.put(Constants.Fields.DST_ADDR.getName(), fixedPcapRequest.getIpDstAddr()); - } - if (fixedPcapRequest.getIpSrcPort() != null) { - fixedFields.put(Constants.Fields.SRC_PORT.getName(), fixedPcapRequest.getIpSrcPort().toString()); - } - if (fixedPcapRequest.getIpDstPort() != null) { - fixedFields.put(Constants.Fields.DST_PORT.getName(), fixedPcapRequest.getIpDstPort().toString()); - } - if (fixedPcapRequest.getProtocol() != null) { - fixedFields.put(Constants.Fields.PROTOCOL.getName(), fixedPcapRequest.getProtocol()); + protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException { + PcapOptions.JOB_NAME.put(pcapRequest, "jobName"); + PcapOptions.USERNAME.put(pcapRequest, username); + PcapOptions.HADOOP_CONF.put(pcapRequest, configuration); + PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem()); + + if (pcapRequest.getBasePath() == null) { + pcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY)); } - if (fixedPcapRequest.getIncludeReverse() != null) { - fixedFields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), fixedPcapRequest.getIncludeReverse().toString()); + if (pcapRequest.getBaseInterimResultPath() == null) { + pcapRequest.setBaseInterimResultPath(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY)); } - if (fixedPcapRequest.getPacketFilter() != null) { - fixedFields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), fixedPcapRequest.getPacketFilter()); + if (pcapRequest.getFinalOutputPath() == null) { + pcapRequest.setFinalOutputPath(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY)); } - return fixedFields; + + PcapOptions.NUM_RECORDS_PER_FILE.put(pcapRequest, Integer.parseInt(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY))); } protected FileSystem getFileSystem() throws IOException { return FileSystem.get(configuration); } + + protected PcapStatus jobStatusToPcapStatus(JobStatus jobStatus) { + PcapStatus pcapStatus = new PcapStatus(); + pcapStatus.setJobId(jobStatus.getJobId()); + pcapStatus.setJobStatus(jobStatus.getState().toString()); + pcapStatus.setDescription(jobStatus.getDescription()); + pcapStatus.setPercentComplete(jobStatus.getPercentComplete()); + return pcapStatus; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/resources/application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml index 10c2f50..5fd9d72 100644 --- a/metron-interface/metron-rest/src/main/resources/application.yml +++ b/metron-interface/metron-rest/src/main/resources/application.yml @@ -74,5 +74,7 @@ user: cf: cf pcap: - input.path: /apps/metron/pcap - output.path: /tmp \ No newline at end of file + base.path: /apps/metron/pcap/input + base.interim.result.path: /apps/metron/pcap/interim + final.output.path: /apps/metron/pcap/output + page.size: 10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index a9e70d2..486a7dc 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -37,9 +37,12 @@ import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.metron.job.manager.InMemoryJobManager; +import org.apache.metron.job.manager.JobManager; import org.apache.metron.pcap.mr.PcapJob; import org.apache.metron.rest.RestException; import org.apache.metron.rest.mock.MockPcapJob; +import org.apache.metron.rest.mock.MockPcapJobSupplier; import org.apache.metron.rest.mock.MockStormCLIClientWrapper; import org.apache.metron.rest.mock.MockStormRestTemplate; import org.apache.metron.rest.service.impl.StormCLIWrapper; @@ -189,7 +192,19 @@ public class TestConfig { } @Bean - public PcapJob mockPcapJob() { + public JobManager jobManager() { + return new InMemoryJobManager(); + } + + @Bean + public MockPcapJob mockPcapJob() { return new MockPcapJob(); } + + @Bean + public PcapJobSupplier pcapJobSupplier(MockPcapJob mockPcapJob) { + MockPcapJobSupplier mockPcapJobSupplier = new MockPcapJobSupplier(); + mockPcapJobSupplier.setMockPcapJob(mockPcapJob); + return mockPcapJobSupplier; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java index 5e4875a..462d83d 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java @@ -18,9 +18,13 @@ package org.apache.metron.rest.controller; import org.adrianwalker.multilinestring.Multiline; +import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Pageable; import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.PcapPages; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.rest.mock.MockPcapJob; import org.apache.metron.rest.model.PcapResponse; @@ -43,11 +47,14 @@ import java.util.List; import java.util.Map; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static org.hamcrest.Matchers.hasSize; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @RunWith(SpringRunner.class) @@ -57,23 +64,38 @@ public class PcapControllerIntegrationTest { /** { - "basePath": "/apps/metron/pcap", - "baseOutputPath": "/tmp", - "endTime": 10, + "basePath": "/base/path", + "baseInterimResultPath": "/base/interim/result/path", + "finalOutputPath": "/final/output/path", + "startTimeMs": 10, + "endTimeMs": 20, + "numReducers": 2, "includeReverse": "true", "ipDstAddr": "192.168.1.1", "ipDstPort": "1000", "ipSrcAddr": "192.168.1.2", "ipSrcPort": "2000", - "numReducers": 2, "packetFilter": "filter", - "protocol": "TCP", - "startTime": 1 + "protocol": "TCP" } */ @Multiline public static String fixedJson; + /** + { + "includeReverse": "true", + "ipDstAddr": "192.168.1.1", + "ipDstPort": "1000", + "ipSrcAddr": "192.168.1.2", + "ipSrcPort": "2000", + "packetFilter": "filter", + "protocol": "TCP" + } + */ + @Multiline + public static String fixedWithDefaultsJson; + @Autowired private PcapService pcapService; @@ -84,6 +106,7 @@ public class PcapControllerIntegrationTest { private String pcapUrl = "/api/v1/pcap"; private String user = "user"; + private String user2 = "user2"; private String password = "password"; @Before @@ -98,22 +121,24 @@ public class PcapControllerIntegrationTest { } @Test - public void testFixed() throws Exception { + public void testFixedRequest() throws Exception { MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob"); List<byte[]> results = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes()); mockPcapJob.setResults(results); + mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING)); PcapResponse expectedReponse = new PcapResponse(); expectedReponse.setPcaps(results); this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson)) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(content().json(JSONUtils.INSTANCE.toJSON(expectedReponse, false))); + .andExpect(jsonPath("$.jobStatus").value("RUNNING")); - Assert.assertEquals("/apps/metron/pcap", mockPcapJob.getBasePath()); - Assert.assertEquals("/tmp", mockPcapJob.getBaseOutputPath()); - Assert.assertEquals(1, mockPcapJob.getStartTime()); - Assert.assertEquals(10, mockPcapJob.getEndTime()); + Assert.assertEquals("/base/path", mockPcapJob.getBasePath()); + Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath()); + Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath()); + Assert.assertEquals(10000000, mockPcapJob.getStartTimeNs()); + Assert.assertEquals(20000000, mockPcapJob.getEndTimeNs()); Assert.assertEquals(2, mockPcapJob.getNumReducers()); Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator); Map<String, String> actualFixedFields = mockPcapJob.getFixedFields(); @@ -124,6 +149,84 @@ public class PcapControllerIntegrationTest { Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())); Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName())); Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); + } + + + @Test + public void testFixedRequestDefaults() throws Exception { + MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob"); + mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING)); + long beforeJobTime = System.currentTimeMillis(); + + this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedWithDefaultsJson)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobStatus").value("RUNNING")); + + Assert.assertEquals("/apps/metron/pcap/input", mockPcapJob.getBasePath()); + Assert.assertEquals("/apps/metron/pcap/interim", mockPcapJob.getBaseInterrimResultPath()); + Assert.assertEquals("/apps/metron/pcap/output", mockPcapJob.getFinalOutputPath()); + Assert.assertEquals(0, mockPcapJob.getStartTimeNs()); + Assert.assertTrue(beforeJobTime < mockPcapJob.getEndTimeNs() / 1000000); + Assert.assertTrue(System.currentTimeMillis() > mockPcapJob.getEndTimeNs() / 1000000); + Assert.assertEquals(10, mockPcapJob.getNumReducers()); + Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator); + Map<String, String> actualFixedFields = mockPcapJob.getFixedFields(); + Assert.assertEquals("192.168.1.2", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals("192.168.1.1", actualFixedFields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.DST_PORT.getName())); + Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())); + Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName())); + Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); + } + + @Test + public void testGetStatus() throws Exception { + MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob"); + + this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) + .andExpect(status().isNotFound()); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.RUNNING)); + + this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobId").value("jobId")) + .andExpect(jsonPath("$.jobStatus").value("RUNNING")); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.SUCCEEDED)); + + Pageable<Path> pageable = new PcapPages(Arrays.asList(new Path("path1"), new Path("path1"))); + mockPcapJob.setIsDone(true); + mockPcapJob.setPageable(pageable); + this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobStatus").value("SUCCEEDED")) + .andExpect(jsonPath("$.pageTotal").value(2)); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FINALIZING)); + + this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobStatus").value("FINALIZING")); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FAILED)); + + this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobStatus").value("FAILED")); + + mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.KILLED)); + + this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.jobStatus").value("KILLED")); } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java index a7eca31..df65635 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java @@ -17,47 +17,79 @@ */ package org.apache.metron.rest.mock; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.common.hadoop.SequenceFileIterable; +import org.apache.metron.job.Finalizer; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.config.PcapOptions; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; -public class MockPcapJob extends PcapJob { +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MockPcapJob extends PcapJob<Path> { private String basePath; - private String baseOutputPath; - private long beginNS; - private long endNS; + private String baseInterrimResultPath; + private String finalOutputPath; + private long startTimeNs; + private long endTimeNs; private int numReducers; private Map<String, String> fixedFields; private PcapFilterConfigurator filterImpl; + private int recPerFile; private SequenceFileIterable sequenceFileIterable; + private Statusable<Path> statusable; public MockPcapJob() { sequenceFileIterable = mock(SequenceFileIterable.class); + statusable = mock(Statusable.class); } - @SuppressWarnings(value = "unchecked") @Override - public <T> SequenceFileIterable query(Path basePath, Path baseOutputPath, long beginNS, long endNS, int numReducers, T fields, Configuration conf, FileSystem fs, PcapFilterConfigurator<T> filterImpl) throws IOException, ClassNotFoundException, InterruptedException { - this.basePath = basePath.toString(); - this.baseOutputPath = baseOutputPath.toString(); - this.beginNS = beginNS; - this.endNS = endNS; - this.numReducers = numReducers; + public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> configuration) throws JobException { + this.basePath = PcapOptions.BASE_PATH.get(configuration, String.class); + this.baseInterrimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.get(configuration, String.class); + this.finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(configuration, String.class); + this.startTimeNs = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000; + this.endTimeNs = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000; + this.numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class); + Object fields = PcapOptions.FIELDS.get(configuration, Object.class); if (fields instanceof Map) { this.fixedFields = (Map<String, String>) fields; } - this.filterImpl = filterImpl; - return sequenceFileIterable; + this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class); + this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class); + return statusable; + } + + @Override + public JobStatus getStatus() throws JobException { + return statusable.getStatus(); + } + + @Override + public Pageable<Path> get() throws JobException, InterruptedException { + return statusable.get(); + } + + public void setStatus(JobStatus jobStatus) throws JobException { + when(statusable.getStatus()).thenReturn(jobStatus); + } + + public void setPageable(Pageable<Path> pageable) throws JobException, InterruptedException { + when(statusable.get()).thenReturn(pageable); + } + + public void setIsDone(boolean isDone) { + when(statusable.isDone()).thenReturn(isDone); } public void setResults(List<byte[]> pcaps) { @@ -68,16 +100,32 @@ public class MockPcapJob extends PcapJob { return basePath; } - public String getBaseOutputPath() { - return baseOutputPath; + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + public String getBaseInterrimResultPath() { + return baseInterrimResultPath; } - public long getStartTime() { - return beginNS / 1000000; + public void setBaseInterrimResultPath(String baseInterrimResultPath) { + this.baseInterrimResultPath = baseInterrimResultPath; } - public long getEndTime() { - return endNS / 1000000; + public String getFinalOutputPath() { + return finalOutputPath; + } + + public void setFinalOutputPath(String finalOutputPath) { + this.finalOutputPath = finalOutputPath; + } + + public long getStartTimeNs() { + return startTimeNs; + } + + public long getEndTimeNs() { + return endTimeNs; } public int getNumReducers() { @@ -91,4 +139,8 @@ public class MockPcapJob extends PcapJob { public PcapFilterConfigurator getFilterImpl() { return filterImpl; } + + public int getRecPerFile() { + return recPerFile; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java new file mode 100644 index 0000000..9a1ac7f --- /dev/null +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.mock; + +import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.rest.config.PcapJobSupplier; + +public class MockPcapJobSupplier extends PcapJobSupplier { + + private MockPcapJob mockPcapJob = new MockPcapJob(); + + @Override + protected PcapJob createPcapJob() { + return mockPcapJob; + } + + public void setMockPcapJob(MockPcapJob mockPcapJob) { + this.mockPcapJob = mockPcapJob; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java index 1a11c79..2b6bea3 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java @@ -17,12 +17,23 @@ */ package org.apache.metron.rest.service.impl; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import org.apache.hadoop.conf.Configuration; -import org.apache.metron.pcap.mr.PcapJob; +import org.apache.hadoop.fs.FileSystem; +import org.apache.metron.common.Constants; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.manager.InMemoryJobManager; +import org.apache.metron.job.manager.JobManager; +import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.rest.MetronRestConstants; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.config.PcapJobSupplier; +import org.apache.metron.rest.mock.MockPcapJob; +import org.apache.metron.rest.mock.MockPcapJobSupplier; +import org.apache.metron.rest.model.pcap.FixedPcapRequest; +import org.apache.metron.rest.model.pcap.PcapStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -30,6 +41,15 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.springframework.core.env.Environment; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.whenNew; + @SuppressWarnings("ALL") public class PcapServiceImplTest { @Rule @@ -37,32 +57,28 @@ public class PcapServiceImplTest { Environment environment; Configuration configuration; - PcapJob pcapJob; + MockPcapJobSupplier mockPcapJobSupplier; @Before public void setUp() throws Exception { environment = mock(Environment.class); - pcapJob = mock(PcapJob.class); configuration = mock(Configuration.class); + mockPcapJobSupplier = new MockPcapJobSupplier(); - when(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY)).thenReturn("/input/path"); - when(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/output/path"); + when(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY)).thenReturn("/base/path"); + when(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY)).thenReturn("/base/interim/result/path"); + when(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/final/output/path"); + when(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)).thenReturn("100"); } - // TODO - - @Test - public void placeholder() { - Assert.assertTrue(true); - } -/* @Test public void fixedShouldProperlyCallPcapJobQuery() throws Exception { FixedPcapRequest fixedPcapRequest = new FixedPcapRequest(); - fixedPcapRequest.setBaseOutputPath("baseOutputPath"); fixedPcapRequest.setBasePath("basePath"); - fixedPcapRequest.setStartTime(1L); - fixedPcapRequest.setEndTime(2L); + fixedPcapRequest.setBaseInterimResultPath("baseOutputPath"); + fixedPcapRequest.setFinalOutputPath("finalOutputPath"); + fixedPcapRequest.setStartTimeMs(1L); + fixedPcapRequest.setEndTimeMs(2L); fixedPcapRequest.setNumReducers(2); fixedPcapRequest.setIpSrcAddr("ip_src_addr"); fixedPcapRequest.setIpDstAddr("ip_dst_addr"); @@ -71,10 +87,19 @@ public class PcapServiceImplTest { fixedPcapRequest.setProtocol("tcp"); fixedPcapRequest.setPacketFilter("filter"); fixedPcapRequest.setIncludeReverse(true); + MockPcapJob mockPcapJob = new MockPcapJob(); + mockPcapJobSupplier.setMockPcapJob(mockPcapJob); + JobManager jobManager = new InMemoryJobManager<>(); - PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob)); + PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager)); FileSystem fileSystem = mock(FileSystem.class); doReturn(fileSystem).when(pcapService).getFileSystem(); + mockPcapJob.setStatus(new JobStatus() + .withJobId("jobId") + .withDescription("description") + .withPercentComplete(0L) + .withState(JobStatus.State.RUNNING)); + Map<String, String> expectedFields = new HashMap<String, String>() {{ put(Constants.Fields.SRC_ADDR.getName(), "ip_src_addr"); put(Constants.Fields.DST_ADDR.getName(), "ip_dst_addr"); @@ -84,72 +109,128 @@ public class PcapServiceImplTest { put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "filter"); }}; - List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes()); - SequenceFileIterable results = mock(SequenceFileIterable.class); - when(results.iterator()).thenReturn(expectedPcaps.iterator()); - when(pcapJob.query(eq(new Path("basePath")), - eq(new Path("baseOutputPath")), - eq(1000000L), - eq(2000000L), - eq(2), - eq(expectedFields), - eq(configuration), - any(FileSystem.class), - any(FixedPcapFilter.Configurator.class))).thenReturn(results); - - PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest); - Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps()); + PcapStatus expectedPcapStatus = new PcapStatus(); + expectedPcapStatus.setJobId("jobId"); + expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name()); + expectedPcapStatus.setDescription("description"); + + Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest)); + Assert.assertEquals(expectedPcapStatus, pcapService.jobStatusToPcapStatus(jobManager.getJob("user", "jobId").getStatus())); + Assert.assertEquals("basePath", mockPcapJob.getBasePath()); + Assert.assertEquals("baseOutputPath", mockPcapJob.getBaseInterrimResultPath()); + Assert.assertEquals("finalOutputPath", mockPcapJob.getFinalOutputPath()); + Assert.assertEquals(1000000, mockPcapJob.getStartTimeNs()); + Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs()); + Assert.assertEquals(2, mockPcapJob.getNumReducers()); + Assert.assertEquals(100, mockPcapJob.getRecPerFile()); + Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator); + Map<String, String> actualFixedFields = mockPcapJob.getFixedFields(); + Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals("ip_dst_addr", actualFixedFields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.DST_PORT.getName())); + Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())); + Assert.assertEquals("tcp", actualFixedFields.get(Constants.Fields.PROTOCOL.getName())); + Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } @Test public void fixedShouldProperlyCallPcapJobQueryWithDefaults() throws Exception { + long beforeJobTime = System.currentTimeMillis(); + FixedPcapRequest fixedPcapRequest = new FixedPcapRequest(); + MockPcapJob mockPcapJob = new MockPcapJob(); + mockPcapJobSupplier.setMockPcapJob(mockPcapJob); + JobManager jobManager = new InMemoryJobManager<>(); - PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob)); + PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager)); FileSystem fileSystem = mock(FileSystem.class); doReturn(fileSystem).when(pcapService).getFileSystem(); - Map<String, String> expectedFields = new HashMap<String, String>() {{ - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes()); - SequenceFileIterable results = mock(SequenceFileIterable.class); - when(results.iterator()).thenReturn(expectedPcaps.iterator()); - when(pcapJob.query(eq(new Path("/input/path")), - eq(new Path("/output/path")), - eq(0L), - eq(fixedPcapRequest.getEndTime() * 1000000), - eq(1), - eq(expectedFields), - eq(configuration), - any(FileSystem.class), - any(FixedPcapFilter.Configurator.class))).thenReturn(results); - - PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest); - Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps()); + mockPcapJob.setStatus(new JobStatus() + .withJobId("jobId") + .withDescription("description") + .withPercentComplete(0L) + .withState(JobStatus.State.RUNNING)); + + PcapStatus expectedPcapStatus = new PcapStatus(); + expectedPcapStatus.setJobId("jobId"); + expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name()); + expectedPcapStatus.setDescription("description"); + + Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest)); + Assert.assertEquals("/base/path", mockPcapJob.getBasePath()); + Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath()); + Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath()); + Assert.assertEquals(0, mockPcapJob.getStartTimeNs()); + Assert.assertTrue(beforeJobTime <= mockPcapJob.getEndTimeNs() / 1000000); + Assert.assertTrue(System.currentTimeMillis() >= mockPcapJob.getEndTimeNs() / 1000000); + Assert.assertEquals(10, mockPcapJob.getNumReducers()); + Assert.assertEquals(100, mockPcapJob.getRecPerFile()); + Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator); + Assert.assertEquals(new HashMap<>(), mockPcapJob.getFixedFields()); } @Test public void fixedShouldThrowRestException() throws Exception { exception.expect(RestException.class); - exception.expectMessage("some exception"); + exception.expectMessage("some job exception"); FixedPcapRequest fixedPcapRequest = new FixedPcapRequest(); - - PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob)); + JobManager jobManager = mock(JobManager.class); + PcapJobSupplier pcapJobSupplier = new PcapJobSupplier(); + PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJobSupplier, jobManager)); FileSystem fileSystem = mock(FileSystem.class); doReturn(fileSystem).when(pcapService).getFileSystem(); + when(jobManager.submit(pcapJobSupplier, "user")).thenThrow(new JobException("some job exception")); + + pcapService.fixed("user", fixedPcapRequest); + } + + @Test + public void getStatusShouldProperlyReturnStatus() throws Exception { + MockPcapJob mockPcapJob = mock(MockPcapJob.class); + JobManager jobManager = mock(JobManager.class); + JobStatus actualJobStatus = new JobStatus() + .withJobId("jobId") + .withState(JobStatus.State.SUCCEEDED) + .withDescription("description") + .withPercentComplete(100.0); + Pageable pageable = mock(Pageable.class); + when(pageable.getSize()).thenReturn(2); + when(mockPcapJob.getStatus()).thenReturn(actualJobStatus); + when(mockPcapJob.isDone()).thenReturn(true); + when(mockPcapJob.get()).thenReturn(pageable); + when(jobManager.getJob("user", "jobId")).thenReturn(mockPcapJob); + + PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager); + PcapStatus expectedPcapStatus = new PcapStatus(); + expectedPcapStatus.setJobId("jobId"); + expectedPcapStatus.setJobStatus(JobStatus.State.SUCCEEDED.name()); + expectedPcapStatus.setDescription("description"); + expectedPcapStatus.setPercentComplete(100.0); + expectedPcapStatus.setPageTotal(2); + + Assert.assertEquals(expectedPcapStatus, pcapService.getJobStatus("user", "jobId")); + } + + @Test + public void getStatusShouldReturnNullOnMissingStatus() throws Exception { + JobManager jobManager = new InMemoryJobManager(); + PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager); - when(pcapJob.query(any(), - any(), - eq(0L), - eq(fixedPcapRequest.getEndTime() * 1000000), - eq(1), - any(), - any(), - any(FileSystem.class), - any(FixedPcapFilter.Configurator.class))).thenThrow(new IOException("some exception")); - - pcapService.fixed(fixedPcapRequest); + Assert.assertNull(pcapService.getJobStatus("user", "jobId")); } - */ + + @Test + public void getStatusShouldThrowRestException() throws Exception { + exception.expect(RestException.class); + exception.expectMessage("some job exception"); + + JobManager jobManager = mock(JobManager.class); + when(jobManager.getJob("user", "jobId")).thenThrow(new JobException("some job exception")); + + PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager); + pcapService.getJobStatus("user", "jobId"); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java index 473664c..8e4211b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java @@ -18,6 +18,8 @@ package org.apache.metron.common.configuration; +import org.apache.metron.stellar.common.utils.ConversionUtils; + import java.util.Map; import java.util.function.BiFunction; @@ -32,11 +34,17 @@ public interface ConfigOption { } default <T> T get(Map<String, Object> map, Class<T> clazz) { - return clazz.cast(map.get(getKey())); + Object obj = map.get(getKey()); + if(clazz.isInstance(obj)) { + return clazz.cast(obj); + } + else { + return ConversionUtils.convert(obj, clazz); + } } default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) { - return clazz.cast(map.get(getKey())); + return clazz.cast(transform.apply(getKey(), map.get(getKey()))); } default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java new file mode 100644 index 0000000..6a677bf --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public class JobNotFoundException extends JobException { + + public JobNotFoundException(String message) { + super(message); + } + + public JobNotFoundException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java new file mode 100644 index 0000000..9013ef8 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public class RuntimeJobException extends RuntimeException { + + public RuntimeJobException(String message) { + super(message); + } + + public RuntimeJobException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java index bf0baa7..1340aa5 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; import org.apache.metron.job.JobException; +import org.apache.metron.job.JobNotFoundException; import org.apache.metron.job.JobStatus; import org.apache.metron.job.Statusable; import org.slf4j.Logger; @@ -52,7 +53,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> { @Override public JobStatus getStatus(String username, String jobId) throws JobException { - return jobs.get(username).get(jobId).getStatus(); + return getJob(username, jobId).getStatus(); } @Override @@ -67,7 +68,11 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> { @Override public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException { - return getUserJobs(username).get(jobId); + Map<String, Statusable<PAGE_T>> jobStatusables = getUserJobs(username); + if (jobStatusables.size() > 0 && jobStatusables.containsKey(jobId)) { + return jobStatusables.get(jobId); + } + throw new JobNotFoundException("Could not find job " + jobId + " for user " + username); } private Map<String, Statusable<PAGE_T>> getUserJobs(String username) { @@ -76,7 +81,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> { @Override public List<Statusable<PAGE_T>> getJobs(String username) throws JobException { - return new ArrayList<Statusable<PAGE_T>>(getUserJobs(username).values()); + return new ArrayList<>(getUserJobs(username).values()); } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 3462921..1a23740 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -98,12 +98,6 @@ public class PcapCli { fixedParser.printHelp(); return 0; } - Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs()); - long startTime = time.getLeft(); - long endTime = time.getRight(); - - PcapOptions.START_TIME_NS.put(commonConfig, startTime); - PcapOptions.END_TIME_NS.put(commonConfig, endTime); PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { @@ -128,12 +122,6 @@ public class PcapCli { queryParser.printHelp(); return 0; } - Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs()); - long startTime = time.getLeft(); - long endTime = time.getRight(); - - PcapOptions.START_TIME_NS.put(commonConfig, startTime); - PcapOptions.END_TIME_NS.put(commonConfig, endTime); PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 9ea7912..0be33d6 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -615,8 +615,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { private void waitForJob(Statusable statusable) throws Exception { for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { - if (statusable.isDone()) { - return; + if (!statusable.getStatus().getState().equals(JobStatus.State.RUNNING)) { + if (statusable.isDone()) { + return; + } } } throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " seconds"); http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 763f0c6..c7d6fdf 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -208,8 +208,6 @@ public class PcapCliTest { PcapOptions.NUM_REDUCERS.put(config, 10); PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed bc defaults in config PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config - PcapOptions.START_TIME_NS.put(config, startAsNanos); - PcapOptions.END_TIME_NS.put(config, endAsNanos); PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000); when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner); http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java index 09effd4..3d7c4f6 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java @@ -24,6 +24,8 @@ import org.apache.metron.common.configuration.ConfigOption; public enum PcapOptions implements ConfigOption { JOB_NAME("jobName"), + JOB_ID("jobId"), + USERNAME("username"), FINAL_FILENAME_PREFIX("finalFilenamePrefix"), BASE_PATH("basePath", (s, o) -> o == null ? null : new Path(o.toString())), INTERIM_RESULT_PATH("interimResultPath", (s, o) -> o == null ? null : new Path(o.toString())), http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java index e032158..c379515 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java @@ -36,10 +36,10 @@ public class PcapCliFinalizer extends PcapFinalizer { private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap"; @Override - protected String getOutputFileName(Map<String, Object> config, int partition) { + protected Path getOutputPath(Map<String, Object> config, int partition) { Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class); String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class); - return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition); + return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition)); } } http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java index d5ac675..2c55e15 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java @@ -79,10 +79,10 @@ public abstract class PcapFinalizer implements Finalizer<Path> { int part = 1; if (partitions.iterator().hasNext()) { for (List<byte[]> data : partitions) { - String outFileName = getOutputFileName(config, part++); + Path outputPath = getOutputPath(config, part++); if (data.size() > 0) { - getResultsWriter().write(hadoopConfig, data, outFileName); - outFiles.add(new Path(outFileName)); + getResultsWriter().write(hadoopConfig, data, outputPath.toUri().getPath()); + outFiles.add(outputPath); } } } else { @@ -100,7 +100,7 @@ public abstract class PcapFinalizer implements Finalizer<Path> { return new PcapPages(outFiles); } - protected abstract String getOutputFileName(Map<String, Object> config, int partition); + protected abstract Path getOutputPath(Map<String, Object> config, int partition); /** * Returns a lazily-read Iterable over a set of sequence files.
