Repository: metron Updated Branches: refs/heads/feature/METRON-1554-pcap-query-panel 4e586ac55 -> 9cee51eb2
METRON-1641: Enable Pcap jobs to be submitted asynchronously (mmiklavc via mmiklavc) closes apache/metron#1081 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9cee51eb Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9cee51eb Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9cee51eb Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: 9cee51eb2c428eb7bf3aa791f5ec1293ed36c43a Parents: 4e586ac Author: mmiklavc <[email protected]> Authored: Tue Jul 10 19:24:58 2018 -0600 Committer: Michael Miklavcic <[email protected]> Committed: Tue Jul 10 19:24:58 2018 -0600 ---------------------------------------------------------------------- .../rest/service/impl/PcapServiceImpl.java | 11 +- .../apache/metron/rest/mock/MockPcapJob.java | 15 +- .../apache/metron/common/utils/HDFSUtils.java | 39 +++- .../metron/common/utils/HDFSUtilsTest.java | 59 +++++ metron-platform/metron-job/pom.xml | 39 ++++ .../java/org/apache/metron/job/JobStatus.java | 83 +++++++ .../java/org/apache/metron/job/Pageable.java | 38 ++++ .../java/org/apache/metron/job/Statusable.java | 56 +++++ .../org/apache/metron/pcap/query/PcapCli.java | 39 +--- .../apache/metron/pcap/query/ResultsWriter.java | 48 ---- .../org/apache/metron/pcap/PcapJobTest.java | 198 ++++++++++++++++ .../PcapTopologyIntegrationTest.java | 1 - .../apache/metron/pcap/query/PcapCliTest.java | 54 +++-- metron-platform/metron-pcap/pom.xml | 7 +- .../java/org/apache/metron/pcap/PcapFiles.java | 42 ++++ .../java/org/apache/metron/pcap/mr/PcapJob.java | 225 +++++++++++++++---- .../metron/pcap/writer/ResultsWriter.java | 59 +++++ metron-platform/pom.xml | 1 + 18 files changed, 848 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java index 4dae1e5..dd4af5c 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java @@ -17,6 +17,11 @@ */ package org.apache.metron.rest.service.impl; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,12 +40,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Service public class PcapServiceImpl implements PcapService { http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java index 3aa9ce3..a7eca31 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java @@ -17,6 +17,12 @@ */ package org.apache.metron.rest.mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -24,15 +30,6 @@ import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class MockPcapJob extends PcapJob { private String basePath; http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java index ee00b7e..ae09edf 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java @@ -23,14 +23,18 @@ import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HDFSUtils { /** - * Reads full HDFS FS file contents into a List of Strings. Initializes file system with default - * configuration. Opens and closes the file system on each call. Never null. + * Reads full file contents into a List of Strings. Reads from local FS if file:/// used as the + * scheme. Initializes file system with default configuration. + * Automatically handles file system translation for the provided path's scheme. + * Opens and closes the file system on each call. Never null. + * Null/empty scheme defaults to default configured FS. * * @param path path to file * @return file contents as a String @@ -41,8 +45,10 @@ public class HDFSUtils { } /** - * Reads full HDFS FS file contents into a String. Opens and closes the file system on each call. - * Never null. + * Reads full file contents into a String. Reads from local FS if file:/// used as the scheme. + * Opens and closes the file system on each call. + * Never null. Automatically handles file system translation for the provided path's scheme. + * Null/empty scheme defaults to default configured FS. * * @param config Hadoop configuration * @param path path to file @@ -50,10 +56,29 @@ public class HDFSUtils { * @throws IOException */ public static List<String> readFile(Configuration config, String path) throws IOException { - FileSystem fs = FileSystem.newInstance(config); - Path hdfsPath = new Path(path); - FSDataInputStream inputStream = fs.open(hdfsPath); + Path inPath = new Path(path); + FileSystem fs = FileSystem.newInstance(inPath.toUri(), config); + FSDataInputStream inputStream = fs.open(inPath); return IOUtils.readLines(inputStream, "UTF-8"); } + /** + * Write file to HDFS. Writes to local FS if file:/// used as the scheme. + * Automatically handles file system translation for the provided path's scheme. + * Null/empty scheme defaults to default configured FS. + * + * @param config filesystem configuration + * @param bytes bytes to write + * @param path output path + * @throws IOException This is the generic Hadoop "everything is an IOException." + */ + public static void write(Configuration config, byte[] bytes, String path) throws IOException { + Path outPath = new Path(path); + FileSystem fs = FileSystem.newInstance(outPath.toUri(), config); + fs.mkdirs(outPath.getParent()); + try (FSDataOutputStream outputStream = fs.create(outPath)) { + outputStream.write(bytes); + outputStream.flush(); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java new file mode 100644 index 0000000..a572e24 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.integration.utils.TestUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class HDFSUtilsTest { + + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + @Test + public void writes_file_to_local_fs() throws Exception { + String outText = "small brown bike and casket lottery"; + String outFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt"; + Configuration config = new Configuration(); + config.set("fs.default.name", "file:///"); + HDFSUtils.write(config, outText.getBytes(StandardCharsets.UTF_8), outFile); + String actual = TestUtils.read(new File(outFile)); + assertThat("Text should match", actual, equalTo(outText)); + } + + @Test + public void writes_file_to_local_fs_with_scheme_defined_only_in_uri() throws Exception { + String outText = "small brown bike and casket lottery"; + String outFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt"; + Configuration config = new Configuration(); + HDFSUtils.write(config, outText.getBytes(StandardCharsets.UTF_8), "file:///" + outFile); + + String actual = TestUtils.read(new File(outFile)); + assertThat("Text should match", actual, equalTo(outText)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/pom.xml b/metron-platform/metron-job/pom.xml new file mode 100644 index 0000000..49cc3cb --- /dev/null +++ b/metron-platform/metron-job/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-platform</artifactId> + <version>0.5.1</version> + </parent> + + <artifactId>metron-job</artifactId> + <name>metron-job</name> + <description>Metron Job</description> + <url>https://metron.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java new file mode 100644 index 0000000..ec006fb --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +import org.apache.hadoop.fs.Path; + +/** + * Capture metadata about a batch job. + */ +public class JobStatus { + + public enum State { + NOT_RUNNING, + RUNNING, + SUCCEEDED, + FAILED, + KILLED + } + + private String jobId; + private State state = State.NOT_RUNNING; + private double percentComplete = 0.0; + private String description; + private Path resultPath; + + public JobStatus withJobId(String jobId) { + this.jobId = jobId; + return this; + } + + public JobStatus withState(State state) { + this.state = state; + return this; + } + + public JobStatus withPercentComplete(double percentComplete) { + this.percentComplete = percentComplete; + return this; + } + + public JobStatus withDescription(String description) { + this.description = description; + return this; + } + + public JobStatus withResultPath(Path resultPath) { + this.resultPath = resultPath; + return this; + } + + public State getState() { + return state; + } + + public double getPercentComplete() { + return percentComplete; + } + + public String getDescription() { + return description; + } + + public Path getResultPath() { + return resultPath; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java new file mode 100644 index 0000000..1038ab8 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public interface Pageable<T> { + + /** + * Transform into an Iterable. + * + * @return Iterable version of this Pageable. + */ + Iterable<T> asIterable(); + + /** + * Provides access to a specific page of results in the result set. + * + * @param num page number to access. + * @return value at the specified page. + */ + T getPage(int num); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java new file mode 100644 index 0000000..7a8fc02 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +import java.io.IOException; +import java.util.Map; + +/** + * Abstraction for getting status on running jobs. Also provides options for killing and validating. + */ +public interface Statusable { + + /** + * Current job status. + * + * @return status + */ + JobStatus getStatus(); + + /** + * Completion flag. + * + * @return true if job is completed, whether KILLED, FAILED, SUCCEEDED. False otherwise. + */ + boolean isDone(); + + /** + * Kill job. + */ + void kill() throws IOException; + + /** + * Validate job after submitted. + * + * @param configuration config for validating the job. + * @return true if job is valid based on passed configuration, false if invalid. + */ + boolean validate(Map<String, Object> configuration); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index b50d488..0fda801 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -17,11 +17,9 @@ */ package org.apache.metron.pcap.query; -import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Arrays; -import java.util.List; import java.util.UUID; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; @@ -35,6 +33,7 @@ import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.pcap.writer.ResultsWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,10 +107,7 @@ public class PcapCli { hadoopConf, FileSystem.get(hadoopConf), new FixedPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException e) { - LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); - return -1; - } catch (InterruptedException e) { + } catch (IOException | ClassNotFoundException | InterruptedException e) { LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); return -1; } @@ -145,10 +141,7 @@ public class PcapCli { hadoopConf, FileSystem.get(hadoopConf), new QueryPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException e) { - LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e); - return -1; - } catch (InterruptedException e) { + } catch (IOException | ClassNotFoundException | InterruptedException e) { LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e); return -1; } @@ -156,30 +149,18 @@ public class PcapCli { printBasicHelp(); return -1; } - try { - Iterable<List<byte[]>> partitions = Iterables.partition(results, commonConfig.getNumRecordsPerFile()); - int part = 1; - if (partitions.iterator().hasNext()) { - for (List<byte[]> data : partitions) { - String outFileName = String.format("pcap-data-%s+%04d.pcap", commonConfig.getPrefix(), part++); - if(data.size() > 0) { - resultsWriter.write(data, outFileName); - } - } - } else { - System.out.println("No results returned."); - } + try { + // write to local FS in the executing directory + String execDir = System.getProperty("user.dir"); + jobRunner.writeResults(results, resultsWriter, new Path("file:///" + execDir), + commonConfig.getNumRecordsPerFile(), + commonConfig.getPrefix()); } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; - } finally { - try { - results.cleanup(); - } catch(IOException e) { - LOGGER.warn("Unable to cleanup files in HDFS", e); - } } + return 0; } http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java deleted file mode 100644 index ab11770..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.pcap.query; - -import org.apache.metron.pcap.PcapMerger; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; - -public class ResultsWriter { - - public void write(List<byte[]> pcaps, String outPath) throws IOException { - File out = new File(outPath); - try (FileOutputStream fos = new FileOutputStream(out)) { - fos.write(mergePcaps(pcaps)); - } - } - - public byte[] mergePcaps(List<byte[]> pcaps) throws IOException { - if (pcaps == null) { - return new byte[]{}; - } - if (pcaps.size() == 1) { - return pcaps.get(0); - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, pcaps); - return baos.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 3536a7e..5a5d406 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -19,18 +19,70 @@ package org.apache.metron.pcap; import static java.lang.Long.toUnsignedString; +import static java.lang.String.format; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.State; +import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; public class PcapJobTest { + @Mock + private Job job; + @Mock + private org.apache.hadoop.mapreduce.JobStatus mrStatus; + @Mock + private JobID jobId; + private static final String JOB_ID_VAL = "job_abc_123"; + private Path basePath; + private Path baseOutPath; + private long startTime; + private long endTime; + private int numReducers; + private Map<String, String> fixedFields; + private Configuration hadoopConfig; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + basePath = new Path("basepath"); + baseOutPath = new Path("outpath"); + startTime = 100; + endTime = 200; + numReducers = 5; + fixedFields = new HashMap<>(); + fixedFields.put("ip_src_addr", "192.168.1.1"); + hadoopConfig = new Configuration(); + when(jobId.toString()).thenReturn(JOB_ID_VAL); + when(mrStatus.getJobID()).thenReturn(jobId); + } + @Test public void partition_gives_value_in_range() throws Exception { long start = 1473897600000000000L; @@ -46,4 +98,150 @@ public class PcapJobTest { equalTo(8)); } + private class TestJob extends PcapJob { + + @Override + public <T> Job createJob(Optional<String> jobName, Path basePath, Path outputPath, long beginNS, long endNS, + int numReducers, T fields, Configuration conf, FileSystem fs, + PcapFilterConfigurator<T> filterImpl) throws IOException { + return job; + } + } + + @Test + public void job_succeeds_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query( + Optional.empty(), + basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + verify(job, times(1)).waitForCompletion(true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_fails_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query( + Optional.empty(), + basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.FAILED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_fails_with_killed_status_synchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query( + Optional.empty(), + basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + true); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.KILLED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_succeeds_asynchronously() throws Exception { + when(job.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query( + Optional.empty(), + basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + false); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); + Assert.assertThat(status.getResultPath(), notNullValue()); + Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + } + + @Test + public void job_reports_percent_complete() throws Exception { + when(job.isComplete()).thenReturn(false); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); + when(job.getStatus()).thenReturn(mrStatus); + TestJob testJob = new TestJob(); + Statusable statusable = testJob.query( + Optional.empty(), + basePath, + baseOutPath, + startTime, + endTime, + numReducers, + fixedFields, + hadoopConfig, + FileSystem.get(hadoopConfig), + new FixedPcapFilter.Configurator(), + false); + when(job.mapProgress()).thenReturn(0.5f); + when(job.reduceProgress()).thenReturn(0f); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.RUNNING)); + Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); + Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); + when(job.mapProgress()).thenReturn(1.0f); + when(job.reduceProgress()).thenReturn(0.5f); + status = statusable.getStatus(); + Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); + Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 29c68d0..c7292ab 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -29,7 +29,6 @@ import java.io.FilenameFilter; import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 7202819..3468a7c 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -17,6 +17,30 @@ */ package org.apache.metron.pcap.query; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,26 +52,13 @@ import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.pcap.writer.ResultsWriter; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.nio.charset.StandardCharsets; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class PcapCliTest { @Mock @@ -56,10 +67,13 @@ public class PcapCliTest { private ResultsWriter resultsWriter; @Mock private Clock clock; + private String execDir; @Before - public void setup() { + public void setup() throws IOException { MockitoAnnotations.initMocks(this); + doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), anyObject(), anyInt(), anyObject()); + execDir = System.getProperty("user.dir"); } @Test @@ -95,7 +109,7 @@ public class PcapCliTest { PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -135,7 +149,7 @@ public class PcapCliTest { PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -178,7 +192,7 @@ public class PcapCliTest { PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } private long asNanos(String inDate, String format) throws ParseException { @@ -211,7 +225,7 @@ public class PcapCliTest { PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } @Test @@ -239,7 +253,7 @@ public class PcapCliTest { PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix"); assertThat("Expect no errors on run", cli.run(args), equalTo(0)); - Mockito.verify(resultsWriter).write(pcaps, "pcap-data-random_prefix+0001.pcap"); + Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap")); } // INVALID OPTION CHECKS http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml index c9f873e..640cdc6 100644 --- a/metron-platform/metron-pcap/pom.xml +++ b/metron-platform/metron-pcap/pom.xml @@ -40,8 +40,13 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>org.apache.metron</groupId> - <artifactId>metron-hbase</artifactId> + <artifactId>metron-job</artifactId> <version>${project.parent.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java new file mode 100644 index 0000000..997c5f7 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap; + +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.metron.job.Pageable; + +public class PcapFiles implements Pageable<Path> { + + private List<Path> files; + + public PcapFiles(List<Path> files) { + this.files = files; + } + + @Override + public Iterable<Path> asIterable() { + return files; + } + + @Override + public Path getPage(int num) { + return files.get(num); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 14ea3cb..269f69b 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -22,6 +22,7 @@ import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo; import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.text.DateFormat; @@ -30,6 +31,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; @@ -48,21 +51,29 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.metron.common.hadoop.SequenceFileIterable; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.State; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.Statusable; import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.pcap.PcapFiles; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; import org.apache.metron.pcap.utils.FileFilterUtil; +import org.apache.metron.pcap.writer.ResultsWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PcapJob { +public class PcapJob implements Statusable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String START_TS_CONF = "start_ts"; public static final String END_TS_CONF = "end_ts"; public static final String WIDTH_CONF = "width"; + private Job job; // store a running MR job reference for async status check + private Path outputPath; public static enum PCAP_COUNTER { MALFORMED_PACKET_COUNT @@ -75,12 +86,12 @@ public class PcapJob { Long width = null; @Override public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, int numPartitions) { - if(start == null) { + if (start == null) { initialize(); } long x = longWritable.get(); int ret = (int)Long.divideUnsigned(x - start, width); - if(ret > numPartitions) { + if (ret > numPartitions) { throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d" , Long.toUnsignedString(x), width, ret, numPartitions) ); @@ -104,6 +115,7 @@ public class PcapJob { return configuration; } } + public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> { PcapFilter filter; @@ -129,7 +141,7 @@ public class PcapJob { List<PacketInfo> packetInfos; try { packetInfos = PcapHelper.toPacketInfo(value.copyBytes()); - } catch(Exception e) { + } catch (Exception e) { // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1); return; @@ -149,30 +161,15 @@ public class PcapJob { public static class PcapReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> { @Override protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { - for(BytesWritable value : values) { + for (BytesWritable value : values) { context.write(key, value); } } } /** - * Returns a lazily-read Iterable over a set of sequence files + * Run query synchronously. */ - private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { - List<Path> files = new ArrayList<>(); - for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) { - Path p = it.next().getPath(); - if (p.getName().equals("_SUCCESS")) { - fs.delete(p, false); - continue; - } - files.add(p); - } - LOG.debug("Output path={}", outputPath); - Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName())); - return new SequenceFileIterable(files, config); - } - public <T> SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS @@ -183,43 +180,118 @@ public class PcapJob { , FileSystem fs , PcapFilterConfigurator<T> filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); + Statusable statusable = query(Optional.empty(), basePath, baseOutputPath, beginNS, endNS, numReducers, fields, + conf, + fs, filterImpl, true); + JobStatus jobStatus = statusable.getStatus(); + if (jobStatus.getState() == State.SUCCEEDED) { + Path resultPath = jobStatus.getResultPath(); + return readResults(resultPath, conf, fs); + } else { + throw new RuntimeException( + "Unable to complete query due to errors. Please check logs for full errors."); + } + } + + /** + * Run query sync OR async based on flag. Async mode allows the client to check the returned + * statusable object for status details. + */ + public <T> Statusable query(Optional<String> jobName, + Path basePath, + Path baseOutputPath, + long beginNS, + long endNS, + int numReducers, + T fields, + Configuration conf, + FileSystem fs, + PcapFilterConfigurator<T> filterImpl, + boolean sync) + throws IOException, ClassNotFoundException, InterruptedException { + String outputDirName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); if(LOG.isDebugEnabled()) { DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG - , SimpleDateFormat.LONG - ); + , SimpleDateFormat.LONG + ); String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000))); String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000))); LOG.debug("Executing query {} on timerange from {} to {}", filterImpl.queryToString(fields), from, to); } - Path outputPath = new Path(baseOutputPath, fileName); - Job job = createJob( basePath - , outputPath - , beginNS - , endNS - , numReducers - , fields - , conf - , fs - , filterImpl - ); - if (job == null) { - LOG.info("No files to process with specified date range."); - return new SequenceFileIterable(new ArrayList<>(), conf); + outputPath = new Path(baseOutputPath, outputDirName); + job = createJob(jobName + , basePath + , outputPath + , beginNS + , endNS + , numReducers + , fields + , conf + , fs + , filterImpl + ); + if (sync) { + job.waitForCompletion(true); + } else { + job.submit(); } - boolean completed = job.waitForCompletion(true); - if(completed) { - return readResults(outputPath, conf, fs); + return this; + } + + /** + * Returns a lazily-read Iterable over a set of sequence files + */ + private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { + List<Path> files = new ArrayList<>(); + for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) { + Path p = it.next().getPath(); + if (p.getName().equals("_SUCCESS")) { + fs.delete(p, false); + continue; + } + files.add(p); + } + if (files.size() == 0) { + LOG.info("No files to process with specified date range."); } else { - throw new RuntimeException("Unable to complete query due to errors. Please check logs for full errors."); + LOG.debug("Output path={}", outputPath); + Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName())); } + return new SequenceFileIterable(files, config); } - public static long findWidth(long start, long end, int numReducers) { - return Long.divideUnsigned(end - start, numReducers) + 1; + public Pageable<Path> writeResults(SequenceFileIterable results, ResultsWriter resultsWriter, + Path outPath, int recPerFile, String prefix) throws IOException { + List<Path> outFiles = new ArrayList<>(); + try { + Iterable<List<byte[]>> partitions = Iterables.partition(results, recPerFile); + int part = 1; + if (partitions.iterator().hasNext()) { + for (List<byte[]> data : partitions) { + String outFileName = String.format("%s/pcap-data-%s+%04d.pcap", outPath, prefix, part++); + if (data.size() > 0) { + resultsWriter.write(new Configuration(), data, outFileName); + outFiles.add(new Path(outFileName)); + } + } + } else { + LOG.info("No results returned."); + } + } finally { + try { + results.cleanup(); + } catch (IOException e) { + LOG.warn("Unable to cleanup files in HDFS", e); + } + } + return new PcapFiles(outFiles); } - public <T> Job createJob( Path basePath + /** + * Creates, but does not submit the job. + */ + public <T> Job createJob(Optional<String> jobName + ,Path basePath , Path outputPath , long beginNS , long endNS @@ -235,6 +307,7 @@ public class PcapJob { conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers)); filterImpl.addToConfig(fields, conf); Job job = Job.getInstance(conf); + jobName.ifPresent(job::setJobName); job.setJarByClass(PcapJob.class); job.setMapperClass(PcapJob.PcapMapper.class); job.setMapOutputKeyClass(LongWritable.class); @@ -256,6 +329,10 @@ public class PcapJob { return job; } + public static long findWidth(long start, long end, int numReducers) { + return Long.divideUnsigned(end - start, numReducers) + 1; + } + protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { List<Path> ret = new ArrayList<>(); RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true); @@ -265,4 +342,62 @@ public class PcapJob { return ret; } + @Override + public JobStatus getStatus() { + // Note: this method is only reading state from the underlying job, so locking not needed + JobStatus status = new JobStatus().withResultPath(outputPath); + if (job == null) { + status.withPercentComplete(100).withState(State.SUCCEEDED); + } else { + try { + status.withJobId(job.getStatus().getJobID().toString()); + if (job.isComplete()) { + status.withPercentComplete(100); + switch (job.getStatus().getState()) { + case SUCCEEDED: + status.withState(State.SUCCEEDED).withDescription(State.SUCCEEDED.toString()); + break; + case FAILED: + status.withState(State.FAILED); + break; + case KILLED: + status.withState(State.KILLED); + break; + } + } else { + float mapProg = job.mapProgress(); + float reduceProg = job.reduceProgress(); + float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; + String description = String.format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100); + status.withPercentComplete(totalProgress).withState(State.RUNNING) + .withDescription(description); + } + } catch (Exception e) { + throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); + } + } + return status; + } + + @Override + public boolean isDone() { + // Note: this method is only reading state from the underlying job, so locking not needed + try { + return job.isComplete(); + } catch (Exception e) { + throw new RuntimeException("Error occurred while attempting to retrieve job status.", e); + } + } + + @Override + public void kill() throws IOException { + job.killJob(); + } + + @Override + public boolean validate(Map<String, Object> configuration) { + // default implementation placeholder + return true; + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java new file mode 100644 index 0000000..3934aca --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.pcap.writer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.utils.HDFSUtils; +import org.apache.metron.pcap.PcapMerger; + +public class ResultsWriter { + + /** + * Write out pcaps. Configuration offers ability to configure for HDFS or local FS, if desired. + * + * @param config Standard hadoop filesystem config. + * @param pcaps pcap data to write. Pre-merged format as a list of pcaps as byte arrays. + * @param outPath where to write the pcap data to. + * @throws IOException I/O issue encountered. + */ + public void write(Configuration config, List<byte[]> pcaps, String outPath) throws IOException { + HDFSUtils.write(config, mergePcaps(pcaps), outPath); + } + + /** + * Creates a pcap file with proper global header from individual pcaps. + * + * @param pcaps pcap records to merge into a pcap file with header. + * @return merged result. + * @throws IOException I/O issue encountered. + */ + public byte[] mergePcaps(List<byte[]> pcaps) throws IOException { + if (pcaps == null) { + return new byte[]{}; + } + if (pcaps.size() == 1) { + return pcaps.get(0); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, pcaps); + return baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index a99dbc7..cb64f9e 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -46,6 +46,7 @@ <module>metron-enrichment</module> <module>metron-solr</module> <module>metron-parsers</module> + <module>metron-job</module> <module>metron-pcap-backend</module> <module>metron-data-management</module> <module>metron-pcap</module>
