METRON-1726: Refactor PcapTopologyIntegrationTest (mmiklavc via mmiklavc) 
closes apache/metron#1140


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7967f358
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7967f358
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7967f358

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 7967f358c6c4b8437935c43e54179126e48e248f
Parents: 3e77859
Author: mmiklavc <michael.miklav...@gmail.com>
Authored: Tue Aug 7 15:02:20 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Tue Aug 7 15:02:20 2018 -0600

----------------------------------------------------------------------
 .../org/apache/metron/pcap/query/PcapCli.java   |   2 +
 .../PcapTopologyIntegrationTest.java            | 892 ++++++++++---------
 .../metron/pcap/finalizer/PcapCliFinalizer.java |   5 +-
 3 files changed, 473 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 0b06b0c..c23f037 100644
--- 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -87,6 +87,7 @@ public class PcapCli {
       try {
         config = fixedParser.parse(otherArgs);
         commonConfig = config;
+        PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir));
       } catch (ParseException | java.text.ParseException e) {
         System.err.println(e.getMessage());
         System.err.flush();
@@ -112,6 +113,7 @@ public class PcapCli {
       try {
         config = queryParser.parse(otherArgs);
         commonConfig = config;
+        PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir));
       } catch (ParseException | java.text.ParseException e) {
         System.err.println(e.getMessage());
         queryParser.printHelp();

http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 108fd2b..c30267d 100644
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -58,6 +58,7 @@ import org.apache.metron.integration.components.MRComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.utils.KafkaUtil;
 import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
 import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
@@ -73,7 +74,10 @@ import org.apache.metron.spout.pcap.Endianness;
 import org.apache.metron.spout.pcap.deserializer.Deserializers;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -85,12 +89,19 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
   private static String OUTPUT_DIR = BASE_DIR + "/output";
   private static final int MAX_RETRIES = 30;
   private static final int SLEEP_MS = 500;
-  private String topologiesDir = "src/main/flux";
-  private String targetDir = "target";
+  private static String topologiesDir = "src/main/flux";
+  private static String targetDir = "target";
+  private static ComponentRunner runner;
+  private static File inputDir;
+  private static File interimResultDir;
+  private static File outputDir;
+  private static List<Map.Entry<byte[], byte[]>> pcapEntries;
+  private static boolean withHeaders;
+  private FixedPcapConfig configuration;
 
   private static void clearOutDirs(File... dirs) throws IOException {
-    for(File dir: dirs) {
-      for(File f : dir.listFiles()) {
+    for (File dir : dirs) {
+      for (File f : dir.listFiles()) {
         if (f.isDirectory()) {
           FileUtils.deleteDirectory(f);
         } else {
@@ -99,8 +110,8 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
       }
     }
   }
-  private static int numFiles(File outDir, Configuration config) {
 
+  private static int numFiles(File outDir, Configuration config) {
     return outDir.list(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -109,11 +120,12 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
     }).length;
   }
 
-  // This will eventually be completely deprecated.  As it takes a significant 
amount of testing, the test is being disabled.
+  // This will eventually be completely deprecated.
+  // As it takes a significant amount of testing, the test is being disabled.
   @Ignore
   @Test
   public void testTimestampInPacket() throws Exception {
-    testTopology(new Function<Properties, Void>() {
+    setupTopology(new Function<Properties, Void>() {
       @Nullable
       @Override
       public Void apply(@Nullable Properties input) {
@@ -129,9 +141,14 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
                );
   }
 
-  @Test
-  public void testTimestampInKey() throws Exception {
-    testTopology(new Function<Properties, Void>() {
+  /**
+   * Sets up component infrastructure once for all tests.
+   */
+  @BeforeClass
+  public static void setupAll() throws Exception {
+    System.out.println("Setting up test components");
+    withHeaders = false;
+    setupTopology(new Function<Properties, Void>() {
       @Nullable
       @Override
       public Void apply(@Nullable Properties input) {
@@ -154,7 +171,30 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
           System.out.println("Wrote " + pcapEntries.size() + " to kafka");
         }
       }
-    }, false);
+    }, withHeaders);
+    System.out.println("Done with setup.");
+  }
+
+  private static File getDir(String targetDir, String childDir) {
+    File directory = new File(new File(targetDir), childDir);
+    if (!directory.exists()) {
+      directory.mkdirs();
+    }
+    return directory;
+  }
+
+  /**
+   * Cleans up component infrastructure after all tests finish running.
+   */
+  @AfterClass
+  public static void teardownAll() throws Exception {
+    System.out.println("Tearing down test infrastructure");
+    System.out.println("Stopping runner");
+    runner.stop();
+    System.out.println("Done stopping runner");
+    System.out.println("Clearing output directories");
+    clearOutDirs(inputDir, interimResultDir, outputDir);
+    System.out.println("Finished");
   }
 
   private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> 
entries) {
@@ -165,27 +205,27 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
     public void send(KafkaComponent kafkaComponent, List<Map.Entry<byte[], 
byte[]>> entries) throws Exception;
   }
 
-  public void testTopology(Function<Properties, Void> updatePropertiesCallback
+  public static void setupTopology(Function<Properties, Void> 
updatePropertiesCallback
                           ,SendEntries sendPcapEntriesCallback
                           ,boolean withHeaders
                           )
-          throws Exception
-  {
+          throws Exception {
     if (!new File(topologiesDir).exists()) {
       topologiesDir = UnitTestHelper.findDir("topologies");
     }
     targetDir = UnitTestHelper.findDir("target");
-    final File inputDir = getDir(targetDir, DATA_DIR);
-    final File interimResultDir = getDir(targetDir, INTERIM_RESULT);
-    final File outputDir = getDir(targetDir, OUTPUT_DIR);
+    inputDir = getDir(targetDir, DATA_DIR);
+    interimResultDir = getDir(targetDir, INTERIM_RESULT);
+    outputDir = getDir(targetDir, OUTPUT_DIR);
     clearOutDirs(inputDir, interimResultDir, outputDir);
 
     File baseDir = new File(new File(targetDir), BASE_DIR);
     //Assert.assertEquals(0, numFiles(outDir));
     Assert.assertNotNull(topologiesDir);
     Assert.assertNotNull(targetDir);
-    Path pcapFile = new 
Path("../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput");
-    final List<Map.Entry<byte[], byte[]>> pcapEntries = 
Lists.newArrayList(readPcaps(pcapFile, withHeaders));
+    Path pcapFile = new Path(
+        
"../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput");
+    pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
     Assert.assertTrue(Iterables.size(pcapEntries) > 0);
     final Properties topologyProperties = new Properties() {{
       setProperty("topology.workers", "1");
@@ -206,415 +246,428 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
 
     final ZKServerComponent zkServerComponent = 
getZKServerComponent(topologyProperties);
 
-    final KafkaComponent kafkaComponent = 
getKafkaComponent(topologyProperties, Collections.singletonList(
+    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties,
+        Collections.singletonList(
             new KafkaComponent.Topic(KAFKA_TOPIC, 1)));
 
-
     final MRComponent mr = new 
MRComponent().withBasePath(baseDir.getAbsolutePath());
 
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(topologiesDir + 
"/pcap/remote.yaml"))
-            .withTopologyName("pcap")
-            .withTopologyProperties(topologyProperties)
-            .build();
+        .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml"))
+        .withTopologyName("pcap")
+        .withTopologyProperties(topologyProperties)
+        .build();
     //UnitTestHelper.verboseLogging();
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("mr", mr)
-            .withComponent("zk",zkServerComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("storm", fluxComponent)
-            .withMaxTimeMS(-1)
-            .withMillisecondsBetweenAttempts(2000)
-            .withNumRetries(10)
-            .withCustomShutdownOrder(new String[]{"storm","kafka","zk","mr"})
-            .build();
-    try {
-      runner.start();
-
-      fluxComponent.submitTopology();
-      sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
-      runner.process(new Processor<Void>() {
-        @Override
-        public ReadinessState process(ComponentRunner runner) {
-          int numFiles = numFiles(inputDir, mr.getConfiguration());
-          int expectedNumFiles = pcapEntries.size() / 2;
-          if (numFiles == expectedNumFiles) {
-            return ReadinessState.READY;
-          } else {
-            return ReadinessState.NOT_READY;
-          }
-        }
-
-        @Override
-        public ProcessorResult<Void> getResult() {
-          return null;
+    runner = new ComponentRunner.Builder()
+        .withComponent("mr", mr)
+        .withComponent("zk", zkServerComponent)
+        .withComponent("kafka", kafkaComponent)
+        .withComponent("storm", fluxComponent)
+        .withMaxTimeMS(-1)
+        .withMillisecondsBetweenAttempts(2000)
+        .withNumRetries(10)
+        .withCustomShutdownOrder(new String[]{"storm", "kafka", "zk", "mr"})
+        .build();
+    runner.start();
+
+    fluxComponent.submitTopology();
+    sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
+    runner.process(new Processor<Void>() {
+      @Override
+      public ReadinessState process(ComponentRunner runner) {
+        int numFiles = numFiles(inputDir, mr.getConfiguration());
+        int expectedNumFiles = pcapEntries.size() / 2;
+        if (numFiles == expectedNumFiles) {
+          return ReadinessState.READY;
+        } else {
+          return ReadinessState.NOT_READY;
         }
-      });
-
-      FixedPcapConfig configuration = new 
FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
-      Configuration hadoopConf = new Configuration();
-      PcapOptions.JOB_NAME.put(configuration, "jobName");
-      PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
-      PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
-      PcapOptions.BASE_PATH.put(configuration, new 
Path(inputDir.getAbsolutePath()));
-      PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new 
Path(interimResultDir.getAbsolutePath()));
-      PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, 
pcapEntries));
-      PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
-      PcapOptions.NUM_REDUCERS.put(configuration, 10);
-      PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2);
-      PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new 
Path(outputDir.getAbsolutePath()));
-      {
-        //Ensure that only two pcaps are returned when we look at 4 and 5
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap());
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 1);
-      }
-      {
-        // Ensure that only two pcaps are returned when we look at 4 and 5
-        // test with empty query filter
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 1);
-      }
-      {
-        //ensure that none get returned since date range has no results
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap<>());
-        PcapOptions.START_TIME_NS.put(configuration, 0);
-        PcapOptions.END_TIME_NS.put(configuration, 1);
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 
0.0);
-        Assert.assertEquals("No results in specified date range.",
-            results.getStatus().getDescription());
-        Assert.assertEquals(results.get().getSize(), 0);
-      }
-      {
-        //ensure that none get returned since that destination IP address 
isn't in the dataset
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
-          put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
-        }});
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 0);
-      }
-      {
-        // ensure that none get returned since that destination IP address 
isn't in the dataset
-        // test with query filter
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 0);
       }
-      {
-        //same with protocol as before with the destination addr
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
-          put(Constants.Fields.PROTOCOL.getName(), "foo");
-        }});
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 0);
+
+      @Override
+      public ProcessorResult<Void> getResult() {
+        return null;
       }
-      {
-        //same with protocol as before with the destination addr
-        //test with query filter
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(results.get().getSize(), 0);
+    });
+  }
+
+  /**
+   * This is executed before each individual test.
+   */
+  @Before
+  public void setup() throws IOException {
+    configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
+    Configuration hadoopConf = new Configuration();
+    PcapOptions.JOB_NAME.put(configuration, "jobName");
+    PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
+    PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
+    PcapOptions.BASE_PATH.put(configuration, new 
Path(inputDir.getAbsolutePath()));
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new 
Path(interimResultDir.getAbsolutePath()));
+    PcapOptions.NUM_REDUCERS.put(configuration, 10);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+    PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new 
Path(outputDir.getAbsolutePath()));
+  }
+
+  @Test
+  public void filters_pcaps_by_start_end_ns_with_fixed_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, new HashMap());
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        //make sure I get them all.
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap<>());
-        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
-        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(10, results.get().getSize());
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals("Expected 2 records returned.", 2, 
resultPages.getSize());
+    Assert.assertEquals("Expected 1 record in first file.", 1,
+        PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size());
+    Assert.assertEquals("Expected 1 record in second file.", 1,
+        PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size());
+  }
+
+  @Test
+  public void filters_pcaps_by_start_end_ns_with_empty_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, "");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        //make sure I get them all.
-        //with query filter
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "");
-        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
-        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(10, results.get().getSize());
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals("Expected 2 records returned.", 2, 
resultPages.getSize());
+    Assert.assertEquals("Expected 1 record in first file.", 1,
+        PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size());
+    Assert.assertEquals("Expected 1 record in second file.", 1,
+        PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size());
+  }
+
+  @Test
+  public void date_range_filters_out_all_results() throws Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.FIELDS.put(configuration, new HashMap<>());
+    PcapOptions.START_TIME_NS.put(configuration, 0);
+    PcapOptions.END_TIME_NS.put(configuration, 1);
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0);
+    Assert.assertEquals("No results in specified date range.",
+        results.getStatus().getDescription());
+    Assert.assertEquals(results.get().getSize(), 0);
+  }
+
+  @Test
+  public void ip_address_filters_out_all_results_with_fixed_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+      put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
+    }});
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Assert.assertEquals(results.get().getSize(), 0);
+  }
+
+  @Test
+  public void ip_address_filters_out_all_results_with_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Assert.assertEquals(results.get().getSize(), 0);
+  }
+
+  @Test
+  public void protocol_filters_out_all_results_with_fixed_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+      put(Constants.Fields.PROTOCOL.getName(), "foo");
+    }});
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Assert.assertEquals(results.get().getSize(), 0);
+  }
+
+  @Test
+  public void protocol_filters_out_all_results_with_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+    PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Assert.assertEquals(results.get().getSize(), 0);
+  }
+
+  @Test
+  public void fixed_filter_returns_all_results_for_full_date_range() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, new HashMap<>());
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
-          put(Constants.Fields.DST_PORT.getName(), "22");
-        }});
-        PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
-        PcapJob<Map<String, String>> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertTrue(results.get().getSize() > 0);
-        Assert.assertEquals(Iterables.size(bytes)
-                , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
-                          @Override
-                          public boolean apply(@Nullable JSONObject input) {
-                            Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                            return prt != null && prt.toString().equals("22");
-                          }
-                        }, withHeaders)
-                )
-        );
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
-        Assert.assertTrue(baos.toByteArray().length > 0);
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(pcapEntries.size(), resultPages.getSize());
+  }
+
+  @Test
+  public void query_filter_returns_all_results_for_full_date_range() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, "");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        //same with protocol as before with the destination addr
-        //test with query filter
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(Iterables.size(bytes)
-                , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
-                          @Override
-                          public boolean apply(@Nullable JSONObject input) {
-                            Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                            return prt != null && prt.toString().equals("22");
-                          }
-                        }, withHeaders)
-                )
-        );
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
-        Assert.assertTrue(baos.toByteArray().length > 0);
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(pcapEntries.size(), resultPages.getSize());
+  }
+
+  @Test
+  public void filters_results_by_dst_port_with_fixed_filter() throws Exception 
{
+    PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+      put(Constants.Fields.DST_PORT.getName(), "22");
+    }});
+    PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+    PcapJob<Map<String, String>> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        // test with query filter ip_dst_port > 20 and ip_dst_port < 55792
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and 
ip_dst_port < 55792");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
+    });
+    assertInOrder(bytes);
+    Assert.assertTrue(resultPages.getSize() > 0);
+    Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
+          @Override
+          public boolean apply(@Nullable JSONObject input) {
+            Object prt = input.get(Constants.Fields.DST_PORT.getName());
+            return prt != null && prt.toString().equals("22");
           }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(Iterables.size(bytes)
-                , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
-                          @Override
-                          public boolean apply(@Nullable JSONObject input) {
-                            Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                            return prt != null && ((Long) prt > 20 && (Long) 
prt < 55792);
-                          }
-                        }, withHeaders)
-                )
-        );
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
-        Assert.assertTrue(baos.toByteArray().length > 0);
+        }, withHeaders)
+        ), resultPages.getSize()
+    );
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+    Assert.assertTrue(baos.toByteArray().length > 0);
+  }
+
+  @Test
+  public void filters_results_by_dst_port_with_query_filter() throws Exception 
{
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        //test with query filter ip_dst_port > 55790
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(Iterables.size(bytes)
-                , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
-                          @Override
-                          public boolean apply(@Nullable JSONObject input) {
-                            Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                            return prt != null && (Long) prt > 55790;
-                          }
-                        }, withHeaders)
-                )
-        );
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
-        Assert.assertTrue(baos.toByteArray().length > 0);
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
+              @Override
+              public boolean apply(@Nullable JSONObject input) {
+                Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                return prt != null && prt.toString().equals("22");
+              }
+            }, withHeaders)
+        ), resultPages.getSize()
+    );
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+    Assert.assertTrue(baos.toByteArray().length > 0);
+  }
+
+  @Test
+  public void filters_results_by_dst_port_range_with_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 
55792");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
-      {
-        //test with query filter and byte array matching
-        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
-        PcapOptions.FIELDS.put(configuration, 
"BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
-        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
-        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
-        PcapJob<String> job = new PcapJob<>();
-        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
-        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
-        waitForJob(results);
-
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
-        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
-          try {
-            return HDFSUtils.readBytes(path);
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-        });
-        assertInOrder(bytes);
-        Assert.assertEquals(1, results.get().getSize());
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
-        Assert.assertTrue(baos.toByteArray().length > 0);
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
+              @Override
+              public boolean apply(@Nullable JSONObject input) {
+                Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                return prt != null && ((Long) prt > 20 && (Long) prt < 55792);
+              }
+            }, withHeaders)
+        ), resultPages.getSize()
+    );
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+    Assert.assertTrue(baos.toByteArray().length > 0);
+  }
+
+  @Test
+  public void 
filters_results_by_dst_port_greater_than_value_with_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Pageable<Path> resultPages = results.get();
+    Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
       }
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
+              @Override
+              public boolean apply(@Nullable JSONObject input) {
+                Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                return prt != null && (Long) prt > 55790;
+              }
+            }, withHeaders)
+        ), resultPages.getSize()
+    );
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+    Assert.assertTrue(baos.toByteArray().length > 0);
+  }
 
-      System.out.println("Ended");
-    } finally {
-      runner.stop();
-      clearOutDirs(inputDir, interimResultDir, outputDir);
-    }
+  @Test
+  public void filters_results_by_BYTEARRAY_MATCHER_with_query_filter() throws 
Exception {
+    PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+    PcapOptions.FIELDS.put(configuration, 
"BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+    PcapOptions.END_TIME_NS
+        .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) 
+ 1);
+    PcapJob<String> job = new PcapJob<>();
+    Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+    Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+    waitForJob(results);
+
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+    Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+      try {
+        return HDFSUtils.readBytes(path);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    });
+    assertInOrder(bytes);
+    Assert.assertEquals(1, results.get().getSize());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
+    Assert.assertTrue(baos.toByteArray().length > 0);
   }
 
   private void waitForJob(Statusable statusable) throws Exception {
@@ -628,14 +681,6 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
     throw new Exception("Job did not complete within " + (MAX_RETRIES * 
SLEEP_MS) + " seconds");
   }
 
-  private File getDir(String targetDir, String childDir) {
-    File directory = new File(new File(targetDir), childDir);
-    if (!directory.exists()) {
-      directory.mkdirs();
-    }
-    return directory;
-  }
-
   private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, 
boolean withHeaders) throws IOException {
     SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
         SequenceFile.Reader.file(pcapFile)
@@ -655,28 +700,27 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
       long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
       {
         List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
-        for(PacketInfo pi : info) {
+        for (PacketInfo pi : info) {
           Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
           //IF you are debugging and want to see the packets, uncomment the 
following.
           //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + 
pi.getJsonDoc());
         }
       }
-      if(withHeaders) {
+      if (withHeaders) {
         ret.add(new 
AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), 
pcapWithHeader));
-      }
-      else {
+      } else {
         byte[] pcapRaw = new byte[pcapWithHeader.length - 
PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
         System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + 
PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
         ret.add(new 
AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
       }
     }
-    return Iterables.limit(ret, 2*(ret.size()/2));
+    return Iterables.limit(ret, 2 * (ret.size() / 2));
   }
 
   public static void assertInOrder(Iterable<byte[]> packets) {
     long previous = 0;
-    for(byte[] packet : packets) {
-      for(JSONObject json : TO_JSONS.apply(packet)) {
+    for (byte[] packet : packets) {
+      for (JSONObject json : TO_JSONS.apply(packet)) {
         Long current = Long.parseLong(json.get("ts_micro").toString());
         Assert.assertNotNull(current);
         Assert.assertTrue(Long.compareUnsigned(current, previous) >= 0);

http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
index c912e58..e4e9b95 100644
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -37,7 +37,7 @@ public class PcapCliFinalizer extends PcapFinalizer {
    * as a formatted timestamp + uuid. A final sample format will look as 
follows:
    * 
/base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
    */
-  private static final String PCAP_CLI_FILENAME_FORMAT = 
"pcap-data-%s+%04d.pcap";
+  private static final String PCAP_CLI_FILENAME_FORMAT = 
"%s/pcap-data-%s+%04d.pcap";
 
   @Override
   protected void write(PcapResultsWriter resultsWriter, Configuration 
hadoopConfig,
@@ -47,8 +47,9 @@ public class PcapCliFinalizer extends PcapFinalizer {
 
   @Override
   protected Path getOutputPath(Map<String, Object> config, int partition) {
+    Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, 
PcapOptions.STRING_TO_PATH, Path.class);
     String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, 
String.class);
-    return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, prefix, 
partition));
+    return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, 
prefix, partition));
   }
 
 }

Reply via email to