Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1554-pcap-query-panel 4e586ac55 -> 9cee51eb2


METRON-1641: Enable Pcap jobs to be submitted asynchronously (mmiklavc via 
mmiklavc) closes apache/metron#1081


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9cee51eb
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9cee51eb
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9cee51eb

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 9cee51eb2c428eb7bf3aa791f5ec1293ed36c43a
Parents: 4e586ac
Author: mmiklavc <[email protected]>
Authored: Tue Jul 10 19:24:58 2018 -0600
Committer: Michael Miklavcic <[email protected]>
Committed: Tue Jul 10 19:24:58 2018 -0600

----------------------------------------------------------------------
 .../rest/service/impl/PcapServiceImpl.java      |  11 +-
 .../apache/metron/rest/mock/MockPcapJob.java    |  15 +-
 .../apache/metron/common/utils/HDFSUtils.java   |  39 +++-
 .../metron/common/utils/HDFSUtilsTest.java      |  59 +++++
 metron-platform/metron-job/pom.xml              |  39 ++++
 .../java/org/apache/metron/job/JobStatus.java   |  83 +++++++
 .../java/org/apache/metron/job/Pageable.java    |  38 ++++
 .../java/org/apache/metron/job/Statusable.java  |  56 +++++
 .../org/apache/metron/pcap/query/PcapCli.java   |  39 +---
 .../apache/metron/pcap/query/ResultsWriter.java |  48 ----
 .../org/apache/metron/pcap/PcapJobTest.java     | 198 ++++++++++++++++
 .../PcapTopologyIntegrationTest.java            |   1 -
 .../apache/metron/pcap/query/PcapCliTest.java   |  54 +++--
 metron-platform/metron-pcap/pom.xml             |   7 +-
 .../java/org/apache/metron/pcap/PcapFiles.java  |  42 ++++
 .../java/org/apache/metron/pcap/mr/PcapJob.java | 225 +++++++++++++++----
 .../metron/pcap/writer/ResultsWriter.java       |  59 +++++
 metron-platform/pom.xml                         |   1 +
 18 files changed, 848 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
index 4dae1e5..dd4af5c 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
@@ -17,6 +17,11 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,12 +40,6 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Service;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 @Service
 public class PcapServiceImpl implements PcapService {
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
index 3aa9ce3..a7eca31 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
@@ -17,6 +17,12 @@
  */
 package org.apache.metron.rest.mock;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -24,15 +30,6 @@ import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class MockPcapJob extends PcapJob {
 
   private String basePath;

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
index ee00b7e..ae09edf 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
@@ -23,14 +23,18 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class HDFSUtils {
 
   /**
-   * Reads full HDFS FS file contents into a List of Strings. Initializes file 
system with default
-   * configuration. Opens and closes the file system on each call. Never null.
+   * Reads full file contents into a List of Strings. Reads from local FS if 
file:/// used as the
+   * scheme. Initializes file system with default configuration.
+   * Automatically handles file system translation for the provided path's 
scheme.
+   * Opens and closes the file system on each call. Never null.
+   * Null/empty scheme defaults to default configured FS.
    *
    * @param path path to file
    * @return file contents as a String
@@ -41,8 +45,10 @@ public class HDFSUtils {
   }
 
   /**
-   * Reads full HDFS FS file contents into a String. Opens and closes the file 
system on each call.
-   * Never null.
+   * Reads full file contents into a String. Reads from local FS if file:/// 
used as the scheme.
+   * Opens and closes the file system on each call.
+   * Never null. Automatically handles file system translation for the 
provided path's scheme.
+   * Null/empty scheme defaults to default configured FS.
    *
    * @param config Hadoop configuration
    * @param path path to file
@@ -50,10 +56,29 @@ public class HDFSUtils {
    * @throws IOException
    */
   public static List<String> readFile(Configuration config, String path) throws IOException {
-    FileSystem fs = FileSystem.newInstance(config);
-    Path hdfsPath = new Path(path);
-    FSDataInputStream inputStream = fs.open(hdfsPath);
-    return IOUtils.readLines(inputStream, "UTF-8");
+    Path inPath = new Path(path);
+    // The file system is created per call via newInstance, so it is closed here along
+    // with the input stream. readLines consumes the whole stream before the resources
+    // are released, so the returned list is complete.
+    try (FileSystem fs = FileSystem.newInstance(inPath.toUri(), config);
+        FSDataInputStream inputStream = fs.open(inPath)) {
+      return IOUtils.readLines(inputStream, "UTF-8");
+    }
   }
 
 
+  /**
+   * Writes the given bytes to a file. Writes to local FS if file:/// used as the scheme.
+   * The file system is resolved from the provided path's scheme. Null/empty scheme defaults
+   * to default configured FS. The parent directory of the output path is created before
+   * the file is written. Opens and closes the file system on each call.
+   *
+   * @param config filesystem configuration
+   * @param bytes bytes to write
+   * @param path output path
+   * @throws IOException This is the generic Hadoop "everything is an IOException."
+   */
+  public static void write(Configuration config, byte[] bytes, String path) throws IOException {
+    Path outPath = new Path(path);
+    // The file system is created per call via newInstance, so close it together with
+    // the output stream instead of leaving it open after the write.
+    try (FileSystem fs = FileSystem.newInstance(outPath.toUri(), config)) {
+      fs.mkdirs(outPath.getParent());
+      try (FSDataOutputStream outputStream = fs.create(outPath)) {
+        outputStream.write(bytes);
+        outputStream.flush();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java
new file mode 100644
index 0000000..a572e24
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HDFSUtilsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class HDFSUtilsTest {
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  /**
+   * The path carries no scheme; the configuration names the local file system as default.
+   */
+  @Test
+  public void writes_file_to_local_fs() throws Exception {
+    Configuration localConf = new Configuration();
+    localConf.set("fs.default.name", "file:///");
+    String expectedText = "small brown bike and casket lottery";
+    String targetFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt";
+
+    HDFSUtils.write(localConf, expectedText.getBytes(StandardCharsets.UTF_8), targetFile);
+
+    assertThat("Text should match", TestUtils.read(new File(targetFile)), equalTo(expectedText));
+  }
+
+  /**
+   * The configuration is left at its defaults; only the path's file:/// scheme is supplied.
+   */
+  @Test
+  public void writes_file_to_local_fs_with_scheme_defined_only_in_uri() throws Exception {
+    Configuration defaultConf = new Configuration();
+    String expectedText = "small brown bike and casket lottery";
+    String targetFile = tempDir.getRoot().getAbsolutePath() + "/outfile.txt";
+    String targetUri = "file:///" + targetFile;
+
+    HDFSUtils.write(defaultConf, expectedText.getBytes(StandardCharsets.UTF_8), targetUri);
+
+    assertThat("Text should match", TestUtils.read(new File(targetFile)), equalTo(expectedText));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/pom.xml 
b/metron-platform/metron-job/pom.xml
new file mode 100644
index 0000000..49cc3cb
--- /dev/null
+++ b/metron-platform/metron-job/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+       Foundation (ASF) under one or more contributor license agreements. See 
the
+       NOTICE file distributed with this work for additional information 
regarding
+       copyright ownership. The ASF licenses this file to You under the Apache 
License,
+       Version 2.0 (the "License"); you may not use this file except in 
compliance
+       with the License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0
+       Unless required by applicable law or agreed to in writing, software 
distributed
+       under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for
+  the specific language governing permissions and limitations under the 
License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-platform</artifactId>
+    <version>0.5.1</version>
+  </parent>
+
+  <artifactId>metron-job</artifactId>
+  <name>metron-job</name>
+  <description>Metron Job</description>
+  <url>https://metron.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-common</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
new file mode 100644
index 0000000..ec006fb
--- /dev/null
+++ 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Capture metadata about a batch job: its id, state, percent complete, a free-form
+ * description and the path of its results.
+ *
+ * <p>Instances are mutable. Each {@code with*} method updates this instance and returns it
+ * so that calls can be chained.
+ */
+public class JobStatus {
+
+  /**
+   * States a job can be reported in.
+   */
+  public enum State {
+    NOT_RUNNING,
+    RUNNING,
+    SUCCEEDED,
+    FAILED,
+    KILLED
+  }
+
+  private String jobId;
+  private State state = State.NOT_RUNNING;
+  private double percentComplete = 0.0;
+  private String description;
+  private Path resultPath;
+
+  /**
+   * @param jobId identifier of the job
+   * @return this instance, for chaining
+   */
+  public JobStatus withJobId(String jobId) {
+    this.jobId = jobId;
+    return this;
+  }
+
+  /**
+   * @param state state of the job
+   * @return this instance, for chaining
+   */
+  public JobStatus withState(State state) {
+    this.state = state;
+    return this;
+  }
+
+  /**
+   * @param percentComplete completion percentage of the job
+   * @return this instance, for chaining
+   */
+  public JobStatus withPercentComplete(double percentComplete) {
+    this.percentComplete = percentComplete;
+    return this;
+  }
+
+  /**
+   * @param description free-form description of the job
+   * @return this instance, for chaining
+   */
+  public JobStatus withDescription(String description) {
+    this.description = description;
+    return this;
+  }
+
+  /**
+   * @param resultPath path of the job's results
+   * @return this instance, for chaining
+   */
+  public JobStatus withResultPath(Path resultPath) {
+    this.resultPath = resultPath;
+    return this;
+  }
+
+  /**
+   * Accessor for the value stored by {@link #withJobId(String)}; without it the id could be
+   * set but never read back.
+   *
+   * @return the job id, or null if none has been set
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * @return the job state; {@link State#NOT_RUNNING} unless changed via {@link #withState(State)}
+   */
+  public State getState() {
+    return state;
+  }
+
+  /**
+   * @return the completion percentage; 0.0 unless changed via {@link #withPercentComplete(double)}
+   */
+  public double getPercentComplete() {
+    return percentComplete;
+  }
+
+  /**
+   * @return the description, or null if none has been set
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * @return the result path, or null if none has been set
+   */
+  public Path getResultPath() {
+    return resultPath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
new file mode 100644
index 0000000..1038ab8
--- /dev/null
+++ 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+/**
+ * A result set that can be read one page at a time or walked as a whole.
+ *
+ * @param <T> type of a single page
+ */
+public interface Pageable<T> {
+
+  /**
+   * Looks up a single page of the result set.
+   *
+   * @param num number of the page to fetch (whether numbering starts at 0 or 1 is left to
+   *     the implementation)
+   * @return the page stored under the given number.
+   */
+  T getPage(int num);
+
+  /**
+   * Exposes this Pageable as an Iterable.
+   *
+   * @return an Iterable view of this Pageable.
+   */
+  Iterable<T> asIterable();
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
new file mode 100644
index 0000000..7a8fc02
--- /dev/null
+++ 
b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Contract for inspecting a submitted job: report its status, tell whether it has finished,
+ * kill it, and validate it against a configuration.
+ */
+public interface Statusable {
+
+  /**
+   * Reports the job's current status.
+   *
+   * @return status
+   */
+  JobStatus getStatus();
+
+  /**
+   * Tells whether the job has finished, regardless of outcome.
+   *
+   * @return true once the job is completed, whether KILLED, FAILED, SUCCEEDED; false otherwise.
+   */
+  boolean isDone();
+
+  /**
+   * Kills the job.
+   *
+   * @throws IOException declared for implementations whose kill request can fail
+   */
+  void kill() throws IOException;
+
+  /**
+   * Validates the job after it has been submitted.
+   *
+   * @param configuration config for validating the job.
+   * @return true if the job is valid for the passed configuration, false if invalid.
+   */
+  boolean validate(Map<String, Object> configuration);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index b50d488..0fda801 100644
--- 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -17,11 +17,9 @@
  */
 package org.apache.metron.pcap.query;
 
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.tuple.Pair;
@@ -35,6 +33,7 @@ import 
org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.pcap.writer.ResultsWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,10 +107,7 @@ public class PcapCli {
                 hadoopConf,
                 FileSystem.get(hadoopConf),
                 new FixedPcapFilter.Configurator());
-      } catch (IOException | ClassNotFoundException e) {
-        LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), 
e);
-        return -1;
-      } catch (InterruptedException e) {
+      } catch (IOException | ClassNotFoundException | InterruptedException e) {
         LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), 
e);
         return -1;
       }
@@ -145,10 +141,7 @@ public class PcapCli {
                 hadoopConf,
                 FileSystem.get(hadoopConf),
                 new QueryPcapFilter.Configurator());
-      } catch (IOException | ClassNotFoundException e) {
-        LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), 
e);
-        return -1;
-      } catch (InterruptedException e) {
+      } catch (IOException | ClassNotFoundException | InterruptedException e) {
         LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), 
e);
         return -1;
       }
@@ -156,30 +149,18 @@ public class PcapCli {
       printBasicHelp();
       return -1;
     }
-    try {
 
-      Iterable<List<byte[]>> partitions = Iterables.partition(results, 
commonConfig.getNumRecordsPerFile());
-      int part = 1;
-      if (partitions.iterator().hasNext()) {
-        for (List<byte[]> data : partitions) {
-          String outFileName = String.format("pcap-data-%s+%04d.pcap", 
commonConfig.getPrefix(), part++);
-          if(data.size() > 0) {
-            resultsWriter.write(data, outFileName);
-          }
-        }
-      } else {
-        System.out.println("No results returned.");
-      }
+    try {
+      // write to local FS in the executing directory
+      String execDir = System.getProperty("user.dir");
+      jobRunner.writeResults(results, resultsWriter, new Path("file:///" + 
execDir),
+          commonConfig.getNumRecordsPerFile(),
+          commonConfig.getPrefix());
     } catch (IOException e) {
       LOGGER.error("Unable to write file", e);
       return -1;
-    } finally {
-      try {
-        results.cleanup();
-      } catch(IOException e) {
-        LOGGER.warn("Unable to cleanup files in HDFS", e);
-      }
     }
+
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
deleted file mode 100644
index ab11770..0000000
--- 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-import org.apache.metron.pcap.PcapMerger;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-public class ResultsWriter {
-
-  public void write(List<byte[]> pcaps, String outPath) throws IOException {
-    File out = new File(outPath);
-    try (FileOutputStream fos = new FileOutputStream(out)) {
-      fos.write(mergePcaps(pcaps));
-    }
-  }
-
-  public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
-    if (pcaps == null) {
-      return new byte[]{};
-    }
-    if (pcaps.size() == 1) {
-      return pcaps.get(0);
-    }
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PcapMerger.merge(baos, pcaps);
-    return baos.toByteArray();
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 3536a7e..5a5d406 100644
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -19,18 +19,70 @@
 package org.apache.metron.pcap;
 
 import static java.lang.Long.toUnsignedString;
+import static java.lang.String.format;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 public class PcapJobTest {
 
+  @Mock
+  private Job job;
+  @Mock
+  private org.apache.hadoop.mapreduce.JobStatus mrStatus;
+  @Mock
+  private JobID jobId;
+  private static final String JOB_ID_VAL = "job_abc_123";
+  private Path basePath;
+  private Path baseOutPath;
+  private long startTime;
+  private long endTime;
+  private int numReducers;
+  private Map<String, String> fixedFields;
+  private Configuration hadoopConfig;
+
+  /**
+   * Initializes the mocks and the query arguments shared by the tests.
+   */
+  @Before
+  public void setup() {
+    // Mocks must exist before any stubbing below.
+    MockitoAnnotations.initMocks(this);
+    when(jobId.toString()).thenReturn(JOB_ID_VAL);
+    when(mrStatus.getJobID()).thenReturn(jobId);
+
+    hadoopConfig = new Configuration();
+    basePath = new Path("basepath");
+    baseOutPath = new Path("outpath");
+    numReducers = 5;
+    startTime = 100;
+    endTime = 200;
+
+    Map<String, String> filterFields = new HashMap<>();
+    filterFields.put("ip_src_addr", "192.168.1.1");
+    fixedFields = filterFields;
+  }
+
   @Test
   public void partition_gives_value_in_range() throws Exception {
     long start = 1473897600000000000L;
@@ -46,4 +98,150 @@ public class PcapJobTest {
         equalTo(8));
   }
 
+  private class TestJob extends PcapJob {
+
+    @Override
+    public <T> Job createJob(Optional<String> jobName, Path basePath, Path 
outputPath, long beginNS, long endNS,
+        int numReducers, T fields, Configuration conf, FileSystem fs,
+        PcapFilterConfigurator<T> filterImpl) throws IOException {
+      return job;
+    }
+  }
+
+  @Test
+  public void job_succeeds_synchronously() throws Exception {
+    when(job.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(job.getStatus()).thenReturn(mrStatus);
+    TestJob testJob = new TestJob();
+    Statusable statusable = testJob.query(
+        Optional.empty(),
+        basePath,
+        baseOutPath,
+        startTime,
+        endTime,
+        numReducers,
+        fixedFields,
+        hadoopConfig,
+        FileSystem.get(hadoopConfig),
+        new FixedPcapFilter.Configurator(),
+        true);
+    verify(job, times(1)).waitForCompletion(true);
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", 
startTime, endTime, "192.168.1.1")).toString();
+    Assert.assertThat(status.getResultPath(), notNullValue());
+    Assert.assertThat(status.getResultPath().toString(), 
startsWith(expectedOutPath));
+  }
+
+  @Test
+  public void job_fails_synchronously() throws Exception {
+    when(job.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+    when(job.getStatus()).thenReturn(mrStatus);
+    TestJob testJob = new TestJob();
+    Statusable statusable = testJob.query(
+        Optional.empty(),
+        basePath,
+        baseOutPath,
+        startTime,
+        endTime,
+        numReducers,
+        fixedFields,
+        hadoopConfig,
+        FileSystem.get(hadoopConfig),
+        new FixedPcapFilter.Configurator(),
+        true);
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.FAILED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", 
startTime, endTime, "192.168.1.1")).toString();
+    Assert.assertThat(status.getResultPath(), notNullValue());
+    Assert.assertThat(status.getResultPath().toString(), 
startsWith(expectedOutPath));
+  }
+
+  @Test
+  public void job_fails_with_killed_status_synchronously() throws Exception {
+    when(job.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+    when(job.getStatus()).thenReturn(mrStatus);
+    TestJob testJob = new TestJob();
+    Statusable statusable = testJob.query(
+        Optional.empty(),
+        basePath,
+        baseOutPath,
+        startTime,
+        endTime,
+        numReducers,
+        fixedFields,
+        hadoopConfig,
+        FileSystem.get(hadoopConfig),
+        new FixedPcapFilter.Configurator(),
+        true);
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.KILLED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", 
startTime, endTime, "192.168.1.1")).toString();
+    Assert.assertThat(status.getResultPath(), notNullValue());
+    Assert.assertThat(status.getResultPath().toString(), 
startsWith(expectedOutPath));
+  }
+
+  @Test
+  public void job_succeeds_asynchronously() throws Exception {
+    when(job.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(job.getStatus()).thenReturn(mrStatus);
+    TestJob testJob = new TestJob();
+    Statusable statusable = testJob.query(
+        Optional.empty(),
+        basePath,
+        baseOutPath,
+        startTime,
+        endTime,
+        numReducers,
+        fixedFields,
+        hadoopConfig,
+        FileSystem.get(hadoopConfig),
+        new FixedPcapFilter.Configurator(),
+        false);
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", 
startTime, endTime, "192.168.1.1")).toString();
+    Assert.assertThat(status.getResultPath(), notNullValue());
+    Assert.assertThat(status.getResultPath().toString(), 
startsWith(expectedOutPath));
+  }
+
+  @Test
+  public void job_reports_percent_complete() throws Exception {
+    when(job.isComplete()).thenReturn(false);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
+    when(job.getStatus()).thenReturn(mrStatus);
+    TestJob testJob = new TestJob();
+    Statusable statusable = testJob.query(
+        Optional.empty(),
+        basePath,
+        baseOutPath,
+        startTime,
+        endTime,
+        numReducers,
+        fixedFields,
+        hadoopConfig,
+        FileSystem.get(hadoopConfig),
+        new FixedPcapFilter.Configurator(),
+        false);
+    when(job.mapProgress()).thenReturn(0.5f);
+    when(job.reduceProgress()).thenReturn(0f);
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.RUNNING));
+    Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 
0.0%"));
+    Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
+    when(job.mapProgress()).thenReturn(1.0f);
+    when(job.reduceProgress()).thenReturn(0.5f);
+    status = statusable.getStatus();
+    Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
+    Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 
50.0%"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 29c68d0..c7292ab 100644
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -29,7 +29,6 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 7202819..3468a7c 100644
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -17,6 +17,30 @@
  */
 package org.apache.metron.pcap.query;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,26 +52,13 @@ import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.pcap.writer.ResultsWriter;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class PcapCliTest {
 
   @Mock
@@ -56,10 +67,13 @@ public class PcapCliTest {
   private ResultsWriter resultsWriter;
   @Mock
   private Clock clock;
+  private String execDir;
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     MockitoAnnotations.initMocks(this);
+    doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), 
anyObject(), anyInt(), anyObject());
+    execDir = System.getProperty("user.dir");
   }
 
   @Test
@@ -95,7 +109,7 @@ public class PcapCliTest {
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(pcaps, 
"pcap-data-random_prefix+0001.pcap");
+    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
   }
 
   @Test
@@ -135,7 +149,7 @@ public class PcapCliTest {
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(pcaps, 
"pcap-data-random_prefix+0001.pcap");
+    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
   }
 
   @Test
@@ -178,7 +192,7 @@ public class PcapCliTest {
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(pcaps, 
"pcap-data-random_prefix+0001.pcap");
+    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
   }
 
   private long asNanos(String inDate, String format) throws ParseException {
@@ -211,7 +225,7 @@ public class PcapCliTest {
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(pcaps, 
"pcap-data-random_prefix+0001.pcap");
+    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
   }
 
   @Test
@@ -239,7 +253,7 @@ public class PcapCliTest {
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(pcaps, 
"pcap-data-random_prefix+0001.pcap");
+    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
   }
 
   // INVALID OPTION CHECKS

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml 
b/metron-platform/metron-pcap/pom.xml
index c9f873e..640cdc6 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -40,8 +40,13 @@
             </exclusions>
         </dependency>
         <dependency>
+          <groupId>org.apache.metron</groupId>
+          <artifactId>metron-hbase</artifactId>
+          <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
-            <artifactId>metron-hbase</artifactId>
+            <artifactId>metron-job</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
new file mode 100644
index 0000000..997c5f7
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Pageable;
+
+public class PcapFiles implements Pageable<Path> {
+
+  private List<Path> files;
+
+  public PcapFiles(List<Path> files) {
+    this.files = files;
+  }
+
+  @Override
+  public Iterable<Path> asIterable() {
+    return files;
+  }
+
+  @Override
+  public Path getPage(int num) {
+    return files.get(num);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 14ea3cb..269f69b 100644
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -22,6 +22,7 @@ import static 
org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
 import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.text.DateFormat;
@@ -30,6 +31,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
@@ -48,21 +51,29 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapFiles;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.PcapFilters;
 import org.apache.metron.pcap.utils.FileFilterUtil;
+import org.apache.metron.pcap.writer.ResultsWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PcapJob {
+public class PcapJob implements Statusable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String START_TS_CONF = "start_ts";
   public static final String END_TS_CONF = "end_ts";
   public static final String WIDTH_CONF = "width";
+  private Job job; // store a running MR job reference for async status check
+  private Path outputPath;
 
   public static enum PCAP_COUNTER {
     MALFORMED_PACKET_COUNT
@@ -75,12 +86,12 @@ public class PcapJob {
     Long width = null;
     @Override
     public int getPartition(LongWritable longWritable, BytesWritable 
bytesWritable, int numPartitions) {
-      if(start == null) {
+      if (start == null) {
         initialize();
       }
       long x = longWritable.get();
       int ret = (int)Long.divideUnsigned(x - start, width);
-      if(ret > numPartitions) {
+      if (ret > numPartitions) {
         throw new IllegalArgumentException(String.format("Bad partition: 
key=%s, width=%d, partition=%d, numPartitions=%d"
                 , Long.toUnsignedString(x), width, ret, numPartitions)
             );
@@ -104,6 +115,7 @@ public class PcapJob {
       return configuration;
     }
   }
+
   public static class PcapMapper extends Mapper<LongWritable, BytesWritable, 
LongWritable, BytesWritable> {
 
     PcapFilter filter;
@@ -129,7 +141,7 @@ public class PcapJob {
         List<PacketInfo> packetInfos;
         try {
           packetInfos = PcapHelper.toPacketInfo(value.copyBytes());
-        } catch(Exception e) {
+        } catch (Exception e) {
           // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets
           context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1);
           return;
@@ -149,30 +161,15 @@ public class PcapJob {
   public static class PcapReducer extends Reducer<LongWritable, BytesWritable, 
LongWritable, BytesWritable> {
     @Override
     protected void reduce(LongWritable key, Iterable<BytesWritable> values, 
Context context) throws IOException, InterruptedException {
-      for(BytesWritable value : values) {
+      for (BytesWritable value : values) {
         context.write(key, value);
       }
     }
   }
 
   /**
-   * Returns a lazily-read Iterable over a set of sequence files
+   * Run query synchronously.
    */
-  private SequenceFileIterable readResults(Path outputPath, Configuration 
config, FileSystem fs) throws IOException {
-    List<Path> files = new ArrayList<>();
-    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, 
false); it.hasNext(); ) {
-      Path p = it.next().getPath();
-      if (p.getName().equals("_SUCCESS")) {
-        fs.delete(p, false);
-        continue;
-      }
-      files.add(p);
-    }
-    LOG.debug("Output path={}", outputPath);
-    Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName()));
-    return new SequenceFileIterable(files, config);
-  }
-
   public <T> SequenceFileIterable query(Path basePath
                             , Path baseOutputPath
                             , long beginNS
@@ -183,43 +180,118 @@ public class PcapJob {
                             , FileSystem fs
                             , PcapFilterConfigurator<T> filterImpl
                             ) throws IOException, ClassNotFoundException, 
InterruptedException {
-    String fileName = Joiner.on("_").join(beginNS, endNS, 
filterImpl.queryToString(fields), UUID.randomUUID().toString());
+    Statusable statusable = query(Optional.empty(), basePath, baseOutputPath, 
beginNS, endNS, numReducers, fields,
+        conf,
+        fs, filterImpl, true);
+    JobStatus jobStatus = statusable.getStatus();
+    if (jobStatus.getState() == State.SUCCEEDED) {
+      Path resultPath = jobStatus.getResultPath();
+      return readResults(resultPath, conf, fs);
+    } else {
+      throw new RuntimeException(
+          "Unable to complete query due to errors.  Please check logs for full 
errors.");
+    }
+  }
+
+  /**
+   * Run query sync OR async based on flag. Async mode allows the client to check the returned
+   * statusable object for status details.
+   */
+  public <T> Statusable query(Optional<String> jobName,
+      Path basePath,
+      Path baseOutputPath,
+      long beginNS,
+      long endNS,
+      int numReducers,
+      T fields,
+      Configuration conf,
+      FileSystem fs,
+      PcapFilterConfigurator<T> filterImpl,
+      boolean sync)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    String outputDirName = Joiner.on("_").join(beginNS, endNS, 
filterImpl.queryToString(fields), UUID.randomUUID().toString());
     if(LOG.isDebugEnabled()) {
       DateFormat format = SimpleDateFormat.getDateTimeInstance( 
SimpleDateFormat.LONG
-                                                              , 
SimpleDateFormat.LONG
-                                                              );
+          , SimpleDateFormat.LONG
+      );
       String from = format.format(new Date(Long.divideUnsigned(beginNS, 
1000000)));
       String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000)));
       LOG.debug("Executing query {} on timerange from {} to {}", 
filterImpl.queryToString(fields), from, to);
     }
-    Path outputPath =  new Path(baseOutputPath, fileName);
-    Job job = createJob( basePath
-                       , outputPath
-                       , beginNS
-                       , endNS
-                       , numReducers
-                       , fields
-                       , conf
-                       , fs
-                       , filterImpl
-                       );
-    if (job == null) {
-      LOG.info("No files to process with specified date range.");
-      return new SequenceFileIterable(new ArrayList<>(), conf);
+    outputPath =  new Path(baseOutputPath, outputDirName);
+    job = createJob(jobName
+        , basePath
+        , outputPath
+        , beginNS
+        , endNS
+        , numReducers
+        , fields
+        , conf
+        , fs
+        , filterImpl
+    );
+    if (sync) {
+      job.waitForCompletion(true);
+    } else {
+      job.submit();
     }
-    boolean completed = job.waitForCompletion(true);
-    if(completed) {
-      return readResults(outputPath, conf, fs);
+    return this;
+  }
+
+  /**
+   * Returns a lazily-read Iterable over a set of sequence files
+   */
+  private SequenceFileIterable readResults(Path outputPath, Configuration 
config, FileSystem fs) throws IOException {
+    List<Path> files = new ArrayList<>();
+    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, 
false); it.hasNext(); ) {
+      Path p = it.next().getPath();
+      if (p.getName().equals("_SUCCESS")) {
+        fs.delete(p, false);
+        continue;
+      }
+      files.add(p);
+    }
+    if (files.size() == 0) {
+      LOG.info("No files to process with specified date range.");
     } else {
-      throw new RuntimeException("Unable to complete query due to errors.  
Please check logs for full errors.");
+      LOG.debug("Output path={}", outputPath);
+      Collections.sort(files, (o1, o2) -> 
o1.getName().compareTo(o2.getName()));
     }
+    return new SequenceFileIterable(files, config);
   }
 
-  public static long findWidth(long start, long end, int numReducers) {
-    return Long.divideUnsigned(end - start, numReducers) + 1;
+  public Pageable<Path> writeResults(SequenceFileIterable results, 
ResultsWriter resultsWriter,
+      Path outPath, int recPerFile, String prefix) throws IOException {
+    List<Path> outFiles = new ArrayList<>();
+    try {
+      Iterable<List<byte[]>> partitions = Iterables.partition(results, 
recPerFile);
+      int part = 1;
+      if (partitions.iterator().hasNext()) {
+        for (List<byte[]> data : partitions) {
+          String outFileName = String.format("%s/pcap-data-%s+%04d.pcap", 
outPath, prefix, part++);
+          if (data.size() > 0) {
+            resultsWriter.write(new Configuration(), data, outFileName);
+            outFiles.add(new Path(outFileName));
+          }
+        }
+      } else {
+        LOG.info("No results returned.");
+      }
+    } finally {
+      try {
+        results.cleanup();
+      } catch (IOException e) {
+        LOG.warn("Unable to cleanup files in HDFS", e);
+      }
+    }
+    return new PcapFiles(outFiles);
   }
 
-  public <T> Job createJob( Path basePath
+  /**
+   * Creates, but does not submit the job.
+   */
+  public <T> Job createJob(Optional<String> jobName
+                      ,Path basePath
                       , Path outputPath
                       , long beginNS
                       , long endNS
@@ -235,6 +307,7 @@ public class PcapJob {
     conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers));
     filterImpl.addToConfig(fields, conf);
     Job job = Job.getInstance(conf);
+    jobName.ifPresent(job::setJobName);
     job.setJarByClass(PcapJob.class);
     job.setMapperClass(PcapJob.PcapMapper.class);
     job.setMapOutputKeyClass(LongWritable.class);
@@ -256,6 +329,10 @@ public class PcapJob {
     return job;
   }
 
+  public static long findWidth(long start, long end, int numReducers) {
+    return Long.divideUnsigned(end - start, numReducers) + 1;
+  }
+
   protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws 
IOException {
     List<Path> ret = new ArrayList<>();
     RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
@@ -265,4 +342,62 @@ public class PcapJob {
     return ret;
   }
 
+  @Override
+  public JobStatus getStatus() {
+    // Note: this method is only reading state from the underlying job, so locking not needed
+    JobStatus status = new JobStatus().withResultPath(outputPath);
+    if (job == null) {
+      status.withPercentComplete(100).withState(State.SUCCEEDED);
+    } else {
+      try {
+        status.withJobId(job.getStatus().getJobID().toString());
+        if (job.isComplete()) {
+          status.withPercentComplete(100);
+          switch (job.getStatus().getState()) {
+            case SUCCEEDED:
+              
status.withState(State.SUCCEEDED).withDescription(State.SUCCEEDED.toString());
+              break;
+            case FAILED:
+              status.withState(State.FAILED);
+              break;
+            case KILLED:
+              status.withState(State.KILLED);
+              break;
+          }
+        } else {
+          float mapProg = job.mapProgress();
+          float reduceProg = job.reduceProgress();
+          float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
+          String description = String.format("map: %s%%, reduce: %s%%", 
mapProg * 100, reduceProg * 100);
+          status.withPercentComplete(totalProgress).withState(State.RUNNING)
+              .withDescription(description);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error occurred while attempting to 
retrieve job status.", e);
+      }
+    }
+    return status;
+  }
+
+  @Override
+  public boolean isDone() {
+    // Note: this method is only reading state from the underlying job, so locking not needed
+    try {
+      return job.isComplete();
+    } catch (Exception e) {
+      throw new RuntimeException("Error occurred while attempting to retrieve 
job status.", e);
+    }
+  }
+
+  @Override
+  public void kill() throws IOException {
+    job.killJob();
+  }
+
+  @Override
+  public boolean validate(Map<String, Object> configuration) {
+    // default implementation placeholder
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
new file mode 100644
index 0000000..3934aca
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.writer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.HDFSUtils;
+import org.apache.metron.pcap.PcapMerger;
+
+public class ResultsWriter {
+
+  /**
+   * Write out pcaps. Configuration offers ability to configure for HDFS or local FS, if desired.
+   *
+   * @param config Standard hadoop filesystem config.
+   * @param pcaps pcap data to write. Pre-merged format as a list of pcaps as byte arrays.
+   * @param outPath where to write the pcap data to.
+   * @throws IOException I/O issue encountered.
+   */
+  public void write(Configuration config, List<byte[]> pcaps, String outPath) 
throws IOException {
+    HDFSUtils.write(config, mergePcaps(pcaps), outPath);
+  }
+
+  /**
+   * Creates a pcap file with proper global header from individual pcaps.
+   *
+   * @param pcaps pcap records to merge into a pcap file with header.
+   * @return merged result.
+   * @throws IOException I/O issue encountered.
+   */
+  public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
+    if (pcaps == null) {
+      return new byte[]{};
+    }
+    if (pcaps.size() == 1) {
+      return pcaps.get(0);
+    }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, pcaps);
+    return baos.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9cee51eb/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index a99dbc7..cb64f9e 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -46,6 +46,7 @@
                <module>metron-enrichment</module>
                <module>metron-solr</module>
                <module>metron-parsers</module>
+               <module>metron-job</module>
                <module>metron-pcap-backend</module>
                <module>metron-data-management</module>
                <module>metron-pcap</module>

Reply via email to