METRON-1731: PCAP - Escape colons in output dir names (mmiklavc via mmiklavc) 
closes apache/metron#1155


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/73dc63e6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/73dc63e6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/73dc63e6

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 73dc63e671b55d22d251f4be1c217259f4f5dc71
Parents: 05316a4
Author: mmiklavc <michael.miklav...@gmail.com>
Authored: Fri Aug 10 12:42:47 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Fri Aug 10 12:42:47 2018 -0600

----------------------------------------------------------------------
 .../apache/metron/pcap/FixedPcapFilterTest.java | 286 ------------------
 .../org/apache/metron/pcap/PcapJobTest.java     | 290 -------------------
 .../apache/metron/pcap/QueryPcapFilterTest.java | 228 ---------------
 .../pcap/filter/fixed/FixedPcapFilter.java      |  14 +-
 .../pcap/filter/query/QueryPcapFilter.java      |  17 +-
 .../metron/pcap/mr/OutputDirFormatter.java      |  37 +++
 .../java/org/apache/metron/pcap/mr/PcapJob.java |   5 +-
 .../pcap/filter/fixed/FixedPcapFilterTest.java  | 271 ++++++++++++++++-
 .../pcap/filter/query/QueryPcapFilterTest.java  | 207 ++++++++++++-
 .../metron/pcap/mr/OutputDirFormatterTest.java  |  62 ++++
 .../org/apache/metron/pcap/mr/PcapJobTest.java  | 290 +++++++++++++++++++
 11 files changed, 877 insertions(+), 830 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
deleted file mode 100644
index 84969d3..0000000
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FixedPcapFilterTest {
-  @Test
-  public void testTrivialEquality() throws Exception {
-    Configuration config = new Configuration();
-    final Map<String, String> fields = new HashMap<String, String>() {{
-      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-      put(Constants.Fields.SRC_PORT.getName(), "0");
-      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-      put(Constants.Fields.DST_PORT.getName(), "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-    }};
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected Map<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testReverseTraffic() throws Exception {
-    Configuration config = new Configuration();
-    final Map<String, String> fields = new HashMap<String, String>() {{
-      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-      put(Constants.Fields.SRC_PORT.getName(), "0");
-      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-      put(Constants.Fields.DST_PORT.getName(), "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
-    }};
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected Map<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected Map<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 1);
-            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
-            put(Constants.Fields.DST_PORT.getName(), 0);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected Map<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertFalse(filter.test(null));
-    }
-  }
-@Test
-public void testMissingDstAddr() throws Exception {
-  Configuration config = new Configuration();
-  final HashMap<String, String> fields = new HashMap<String, String>() {{
-    put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-    put(Constants.Fields.SRC_PORT.getName(), "0");
-    put(Constants.Fields.DST_PORT.getName(), "1");
-    put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-  }};
-  new FixedPcapFilter.Configurator().addToConfig(fields, config);
-  {
-    FixedPcapFilter filter = new FixedPcapFilter() {
-      @Override
-      protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-        return new HashMap<String, Object>() {{
-          put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-          put(Constants.Fields.SRC_PORT.getName(), 0);
-          put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-          put(Constants.Fields.DST_PORT.getName(), 1);
-        }};
-      }
-    };
-    filter.configure(config);
-    Assert.assertTrue(filter.test(null));
-  }
-  new FixedPcapFilter.Configurator().addToConfig(fields, config);
-  {
-    FixedPcapFilter filter = new FixedPcapFilter() {
-      @Override
-      protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-        return new HashMap<String, Object>() {{
-          put(Constants.Fields.SRC_ADDR.getName(), "src_ip1");
-          put(Constants.Fields.SRC_PORT.getName(), 0);
-          put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-          put(Constants.Fields.DST_PORT.getName(), 1);
-        }};
-      }
-    };
-    filter.configure(config);
-    Assert.assertFalse(filter.test(null));
-  }
-}
-  @Test
-  public void testMissingDstPort() throws Exception {
-    Configuration config = new Configuration();
-    final HashMap<String, String> fields = new HashMap<String, String>() {{
-      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-      put(Constants.Fields.SRC_PORT.getName(), "0");
-      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-    }};
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 100);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 100);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 100);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertFalse(filter.test(null));
-    }
-  }
-  @Test
-  public void testMissingSrcAddr() throws Exception {
-    Configuration config = new Configuration();
-    final HashMap<String, String> fields = new HashMap<String, String>() {{
-      put(Constants.Fields.SRC_PORT.getName(), "0");
-      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-      put(Constants.Fields.DST_PORT.getName(), "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-    }};
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-  @Test
-  public void testMissingSrcPort() throws Exception {
-    Configuration config = new Configuration();
-    final HashMap<String, String> fields = new HashMap<String, String>() {{
-      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-      put(Constants.Fields.DST_PORT.getName(), "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
-    }};
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new FixedPcapFilter.Configurator().addToConfig(fields, config);
-    {
-      FixedPcapFilter filter = new FixedPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 100);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
deleted file mode 100644
index 796c8a5..0000000
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import static java.lang.Long.toUnsignedString;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.job.Finalizer;
-import org.apache.metron.job.JobStatus;
-import org.apache.metron.job.JobStatus.State;
-import org.apache.metron.job.Pageable;
-import org.apache.metron.job.Statusable;
-import org.apache.metron.pcap.config.FixedPcapConfig;
-import org.apache.metron.pcap.config.PcapOptions;
-import org.apache.metron.pcap.filter.PcapFilterConfigurator;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-public class PcapJobTest {
-
-  @Mock
-  private Job mrJob;
-  @Mock
-  private org.apache.hadoop.mapreduce.JobStatus mrStatus;
-  @Mock
-  private JobID jobId;
-  @Mock
-  private Finalizer<Path> finalizer;
-  private TestTimer timer;
-  private Pageable<Path> pageableResult;
-  private FixedPcapConfig config;
-  private Configuration hadoopConfig;
-  private FileSystem fileSystem;
-  private String jobIdVal = "job_abc_123";
-  private Path basePath;
-  private Path baseOutPath;
-  private long startTime;
-  private long endTime;
-  private int numReducers;
-  private int numRecordsPerFile;
-  private Path finalOutputPath;
-  private Map<String, String> fixedFields;
-  private PcapJob<Map<String, String>> testJob;
-
-  @Before
-  public void setup() throws IOException {
-    MockitoAnnotations.initMocks(this);
-    basePath = new Path("basepath");
-    baseOutPath = new Path("outpath");
-    startTime = 100;
-    endTime = 200;
-    numReducers = 5;
-    numRecordsPerFile = 5;
-    fixedFields = new HashMap<>();
-    fixedFields.put("ip_src_addr", "192.168.1.1");
-    hadoopConfig = new Configuration();
-    fileSystem = FileSystem.get(hadoopConfig);
-    finalOutputPath = new Path("finaloutpath");
-    when(jobId.toString()).thenReturn(jobIdVal);
-    when(mrStatus.getJobID()).thenReturn(jobId);
-    when(mrJob.getJobID()).thenReturn(jobId);
-    pageableResult = new PcapPages();
-    timer = new TestTimer();
-    // handles setting the file name prefix under the hood
-    config = new FixedPcapConfig(clock -> "clockprefix");
-    PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
-    PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig));
-    PcapOptions.BASE_PATH.put(config, basePath);
-    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
-    PcapOptions.START_TIME_NS.put(config, startTime);
-    PcapOptions.END_TIME_NS.put(config, endTime);
-    PcapOptions.NUM_REDUCERS.put(config, numReducers);
-    PcapOptions.FIELDS.put(config, fixedFields);
-    PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
-    PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
-    PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
-    testJob = new TestJob<>(mrJob);
-    testJob.setStatusInterval(1);
-    testJob.setCompleteCheckInterval(1);
-    testJob.setTimer(timer);
-  }
-
-  private class TestJob<T> extends PcapJob<T> {
-
-    private final Job mrJob;
-
-    public TestJob(Job mrJob) {
-      this.mrJob = mrJob;
-    }
-
-    @Override
-    public Job createJob(Optional<String> jobName,
-        Path basePath,
-        Path outputPath,
-        long beginNS,
-        long endNS,
-        int numReducers,
-        T fields,
-        Configuration conf,
-        FileSystem fs,
-        PcapFilterConfigurator<T> filterImpl) throws IOException {
-      return mrJob;
-    }
-  }
-
-  private class TestTimer extends Timer {
-
-    private TimerTask task;
-
-    @Override
-    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
-      this.task = task;
-    }
-
-    public void updateJobStatus() {
-      task.run();
-    }
-
-  }
-
-  @Test
-  public void partition_gives_value_in_range() throws Exception {
-    long start = 1473897600000000000L;
-    long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
-    Configuration conf = new Configuration();
-    conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
-    conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
-    conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
-    PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
-    partitioner.setConf(conf);
-    Assert.assertThat("Partition not in range",
-        partitioner.getPartition(new LongWritable(1473978789181189000L), new 
BytesWritable(), 10),
-        equalTo(8));
-  }
-
-  @Test
-  public void job_succeeds_synchronously() throws Exception {
-    pageableResult = new PcapPages(
-        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new 
Path("3.txt")));
-    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    Pageable<Path> results = statusable.get();
-    Assert.assertThat(results.getSize(), equalTo(3));
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
-    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
-  }
-
-  @Test
-  public void job_fails_synchronously() throws Exception {
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    Pageable<Path> results = statusable.get();
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.FAILED));
-    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    Assert.assertThat(results.getSize(), equalTo(0));
-  }
-
-  @Test
-  public void job_fails_with_killed_status_synchronously() throws Exception {
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    Pageable<Path> results = statusable.get();
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.KILLED));
-    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    Assert.assertThat(results.getSize(), equalTo(0));
-  }
-
-  @Test
-  public void job_succeeds_asynchronously() throws Exception {
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
-    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-  }
-
-  @Test
-  public void job_reports_percent_complete() throws Exception {
-    when(mrJob.isComplete()).thenReturn(false);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    when(mrJob.mapProgress()).thenReturn(0.5f);
-    when(mrJob.reduceProgress()).thenReturn(0f);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.RUNNING));
-    Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 
0.0%"));
-    Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
-    when(mrJob.mapProgress()).thenReturn(1.0f);
-    when(mrJob.reduceProgress()).thenReturn(0.5f);
-    timer.updateJobStatus();
-    status = statusable.getStatus();
-    Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 
50.0%"));
-    Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
-  }
-
-  @Test
-  public void killing_job_causes_status_to_return_KILLED_state() throws 
Exception {
-    when(mrJob.isComplete()).thenReturn(false);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    statusable.kill();
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-    timer.updateJobStatus();
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.KILLED));
-  }
-
-  @Test
-  public void handles_null_values_with_defaults() throws Exception {
-    PcapOptions.START_TIME_NS.put(config, null);
-    PcapOptions.END_TIME_NS.put(config, null);
-    PcapOptions.NUM_REDUCERS.put(config, null);
-    PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
-
-    pageableResult = new PcapPages(
-        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new 
Path("3.txt")));
-    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
-    when(mrJob.isComplete()).thenReturn(true);
-    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
-    when(mrJob.getStatus()).thenReturn(mrStatus);
-    Statusable<Path> statusable = testJob.submit(finalizer, config);
-    timer.updateJobStatus();
-    Pageable<Path> results = statusable.get();
-    Assert.assertThat(results.getSize(), equalTo(3));
-    JobStatus status = statusable.getStatus();
-    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
-    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
deleted file mode 100644
index 7e3d55c..0000000
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.filter.PcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.EnumMap;
-import java.util.HashMap;
-
-public class QueryPcapFilterTest {
-
-  @Test
-  public void testEmptyQueryFilter() throws Exception {
-    Configuration config = new Configuration();
-    String query = "";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      PcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testTrivialEquality() throws Exception {
-    Configuration config = new Configuration();
-    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      PcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testMissingDstAddr() throws Exception {
-    Configuration config = new Configuration();
-    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_port == 1";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertFalse(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testMissingDstPort() throws Exception {
-    Configuration config = new Configuration();
-    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_addr == 'dst_ip'";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 100);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 100);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 100);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertFalse(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testMissingSrcAddr() throws Exception {
-    Configuration config = new Configuration();
-    String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and 
ip_dst_port == 1";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-
-  @Test
-  public void testMissingSrcPort() throws Exception {
-    Configuration config = new Configuration();
-    String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and 
ip_dst_port == 1";
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 0);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-    new QueryPcapFilter.Configurator().addToConfig(query, config);
-    {
-      QueryPcapFilter filter = new QueryPcapFilter() {
-        @Override
-        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
-          return new HashMap<String, Object>() {{
-            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
-            put(Constants.Fields.SRC_PORT.getName(), 100);
-            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
-            put(Constants.Fields.DST_PORT.getName(), 1);
-          }};
-        }
-      };
-      filter.configure(config);
-      Assert.assertTrue(filter.test(null));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
index 1954f1a..314bd85 100644
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
@@ -19,20 +19,19 @@
 package org.apache.metron.pcap.filter.fixed;
 
 import com.google.common.base.Joiner;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.xml.bind.DatatypeConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.metron.common.Constants;
-import org.apache.metron.stellar.dsl.MapVariableResolver;
-import org.apache.metron.stellar.dsl.VariableResolver;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.PcapFilters;
 import org.apache.metron.pcap.pattern.ByteArrayMatchingUtil;
-
-import javax.xml.bind.DatatypeConverter;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.VariableResolver;
 
 
 public class FixedPcapFilter implements PcapFilter {
@@ -48,7 +47,8 @@ public class FixedPcapFilter implements PcapFilter {
 
     @Override
     public String queryToString(Map<String, String> fields) {
-      return (fields == null ? "" : Joiner.on("_").join(fields.values()));
+      return (fields == null ? "" : 
Joiner.on("_").join(fields.values()).replaceAll("\\s", "_")
+      );
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
index 552a5ae..e7fff16 100644
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
@@ -18,19 +18,18 @@
 
 package org.apache.metron.pcap.filter.query;
 
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.MapVariableResolver;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.apache.metron.stellar.common.StellarPredicateProcessor;
-import org.apache.metron.stellar.dsl.VariableResolver;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.PcapFilters;
-
-import java.util.Map;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
 
 public class QueryPcapFilter implements PcapFilter {
   public static final String QUERY_STR_CONFIG = "mql";
@@ -45,9 +44,7 @@ public class QueryPcapFilter implements PcapFilter {
     @Override
     public String queryToString(String fields) {
       return (fields == null ? "" :
-              fields.trim().replaceAll("\\s", "_")
-                      .replace(".", "-")
-                      .replace("'", "")
+          fields.trim().replaceAll("\\s", "_")
       );
     }
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
new file mode 100644
index 0000000..0d464d5
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import com.google.common.base.Joiner;
+import java.util.UUID;
+
+public class OutputDirFormatter {
+
+  public String format(long beginNS, long endNS, String query) {
+    return sanitize(Joiner.on("_").join(beginNS, endNS, query, 
UUID.randomUUID().toString()));
+  }
+
+  private String sanitize(String path) {
+    return path
+        .replace(".", "-")
+        .replace("'", "")
+        .replace(":", "");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 10f31b4..0f5ad4d 100644
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -35,7 +35,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -83,6 +82,7 @@ public class PcapJob<T> implements Statusable<Path> {
   public static final String WIDTH_CONF = "width";
   private static final long THREE_SECONDS = 3000;
   private static final long ONE_SECOND = 1000;
+  private final OutputDirFormatter outputDirFormatter;
   private volatile Job mrJob; // store a running MR job reference for async 
status check
   private volatile JobStatus jobStatus; // overall job status, including 
finalization step
   private Finalizer<Path> finalizer;
@@ -187,6 +187,7 @@ public class PcapJob<T> implements Statusable<Path> {
   public PcapJob() {
     jobStatus = new JobStatus();
     finalResults = new PcapPages();
+    outputDirFormatter = new OutputDirFormatter();
     timer = new Timer();
     statusInterval = THREE_SECONDS;
     completeCheckInterval = ONE_SECOND;
@@ -271,7 +272,7 @@ public class PcapJob<T> implements Statusable<Path> {
       FileSystem fs,
       PcapFilterConfigurator<T> filterImpl)
       throws IOException, ClassNotFoundException, InterruptedException {
-    String outputDirName = Joiner.on("_").join(beginNS, endNS, 
filterImpl.queryToString(fields), UUID.randomUUID().toString());
+    String outputDirName = outputDirFormatter.format(beginNS, endNS, 
filterImpl.queryToString(fields));
     if(LOG.isDebugEnabled()) {
       DateFormat format = 
SimpleDateFormat.getDateTimeInstance(SimpleDateFormat.LONG
           , SimpleDateFormat.LONG

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
index af2afd3..b32f23f 100644
--- 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
+++ 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.metron.pcap.filter.fixed;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.LinkedHashMap;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-
 public class FixedPcapFilterTest {
 
   @Test
@@ -66,4 +69,264 @@ public class FixedPcapFilterTest {
     }
   }
 
+  @Test
+  public void testTrivialEquality() throws Exception {
+    Configuration config = new Configuration();
+    final Map<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testReverseTraffic() throws Exception {
+    Configuration config = new Configuration();
+    final Map<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 1);
+            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+            put(Constants.Fields.DST_PORT.getName(), 0);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstAddr() throws Exception {
+    Configuration config = new Configuration();
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip1");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstPort() throws Exception {
+    Configuration config = new Configuration();
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcAddr() throws Exception {
+    Configuration config = new Configuration();
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcPort() throws Exception {
+    Configuration config = new Configuration();
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
index 061066e..2724e06 100644
--- 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
+++ 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
@@ -18,18 +18,23 @@
 
 package org.apache.metron.pcap.filter.query;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.filter.PcapFilter;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-
 public class QueryPcapFilterTest {
 
   @Test
   public void string_representation_of_query_gets_formatted() throws Exception 
{
     String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and 
ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
     String actual = new QueryPcapFilter.Configurator().queryToString(query);
-    String expected = 
"ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol";
+    String expected = 
"ip_src_addr_==_'srcIp'_and_ip_src_port_==_'80'_and_ip_dst_addr_==_'dstIp'_and_ip_dst_port_==_'100'_and_protocol_==_'protocol'";
     Assert.assertThat("string representation did not match", actual, 
equalTo(expected));
   }
 
@@ -55,4 +60,200 @@ public class QueryPcapFilterTest {
     }
   }
 
+  @Test
+  public void testEmptyQueryFilter() throws Exception {
+    Configuration config = new Configuration();
+    String query = "";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      PcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testTrivialEquality() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      PcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstAddr() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_port == 1";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstPort() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and 
ip_dst_addr == 'dst_ip'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcAddr() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and 
ip_dst_port == 1";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcPort() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and 
ip_dst_port == 1";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
new file mode 100644
index 0000000..ae1cda4
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Test;
+
+public class OutputDirFormatterTest {
+
+  @Test
+  public void formats_directory_name_for_query_filter_types() throws Exception 
{
+    long beginNS = 
TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+    long endNS = 
TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+    String query = "ip_dst_addr == '207.28.210.1' and protocol == 'PROTOCOL: 
ICMP(1)";
+    String queryFilterString = new 
QueryPcapFilter.Configurator().queryToString(query);
+    OutputDirFormatter formatter = new OutputDirFormatter();
+    String actual = formatter.format(beginNS, endNS, queryFilterString);
+    assertThat("Formatted directory names did not match.", actual, 
containsString("_ip_dst_addr_==_207-28-210-1_and_protocol_==_PROTOCOL_ICMP(1)_"));
+    // no URI exception should be thrown with dir name
+    new Path(actual);
+  }
+
+  @Test
+  public void formats_directory_name_for_fixed_filter_types() throws Exception 
{
+    long beginNS = 
TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+    long endNS = 
TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+    Map<String, String> fields = new HashMap<>();
+    fields.put("ip_src_address", "207.28.210.1");
+    fields.put("protocol", "PROTOCOL: ICMP(1)");
+    String fixedFilterString = new 
FixedPcapFilter.Configurator().queryToString(fields);
+    OutputDirFormatter formatter = new OutputDirFormatter();
+    String actual = formatter.format(beginNS, endNS, fixedFilterString);
+    assertThat("Formatted directory names did not match.", actual, 
containsString("PROTOCOL_ICMP(1)_207-28-210-1"));
+    // no URI exception should be thrown with dir name
+    new Path(actual);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
new file mode 100644
index 0000000..0f555d0
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class PcapJobTest {
+
+  @Mock
+  private Job mrJob;
+  @Mock
+  private org.apache.hadoop.mapreduce.JobStatus mrStatus;
+  @Mock
+  private JobID jobId;
+  @Mock
+  private Finalizer<Path> finalizer;
+  private TestTimer timer;
+  private Pageable<Path> pageableResult;
+  private FixedPcapConfig config;
+  private Configuration hadoopConfig;
+  private FileSystem fileSystem;
+  private String jobIdVal = "job_abc_123";
+  private Path basePath;
+  private Path baseOutPath;
+  private long startTime;
+  private long endTime;
+  private int numReducers;
+  private int numRecordsPerFile;
+  private Path finalOutputPath;
+  private Map<String, String> fixedFields;
+  private PcapJob<Map<String, String>> testJob;
+
+  @Before
+  public void setup() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    basePath = new Path("basepath");
+    baseOutPath = new Path("outpath");
+    startTime = 100;
+    endTime = 200;
+    numReducers = 5;
+    numRecordsPerFile = 5;
+    fixedFields = new HashMap<>();
+    fixedFields.put("ip_src_addr", "192.168.1.1");
+    hadoopConfig = new Configuration();
+    fileSystem = FileSystem.get(hadoopConfig);
+    finalOutputPath = new Path("finaloutpath");
+    when(jobId.toString()).thenReturn(jobIdVal);
+    when(mrStatus.getJobID()).thenReturn(jobId);
+    when(mrJob.getJobID()).thenReturn(jobId);
+    pageableResult = new PcapPages();
+    timer = new TestTimer();
+    // handles setting the file name prefix under the hood
+    config = new FixedPcapConfig(clock -> "clockprefix");
+    PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
+    PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig));
+    PcapOptions.BASE_PATH.put(config, basePath);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
+    PcapOptions.START_TIME_NS.put(config, startTime);
+    PcapOptions.END_TIME_NS.put(config, endTime);
+    PcapOptions.NUM_REDUCERS.put(config, numReducers);
+    PcapOptions.FIELDS.put(config, fixedFields);
+    PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
+    PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
+    testJob = new TestJob<>(mrJob);
+    testJob.setStatusInterval(1);
+    testJob.setCompleteCheckInterval(1);
+    testJob.setTimer(timer);
+  }
+
+  private class TestJob<T> extends PcapJob<T> {
+
+    private final Job mrJob;
+
+    public TestJob(Job mrJob) {
+      this.mrJob = mrJob;
+    }
+
+    @Override
+    public Job createJob(Optional<String> jobName,
+        Path basePath,
+        Path outputPath,
+        long beginNS,
+        long endNS,
+        int numReducers,
+        T fields,
+        Configuration conf,
+        FileSystem fs,
+        PcapFilterConfigurator<T> filterImpl) throws IOException {
+      return mrJob;
+    }
+  }
+
+  private class TestTimer extends Timer {
+
+    private TimerTask task;
+
+    @Override
+    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
+      this.task = task;
+    }
+
+    public void updateJobStatus() {
+      task.run();
+    }
+
+  }
+
+  @Test
+  public void partition_gives_value_in_range() throws Exception {
+    long start = 1473897600000000000L;
+    long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
+    Configuration conf = new Configuration();
+    conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
+    conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
+    conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
+    PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
+    partitioner.setConf(conf);
+    Assert.assertThat("Partition not in range",
+        partitioner.getPartition(new LongWritable(1473978789181189000L), new 
BytesWritable(), 10),
+        equalTo(8));
+  }
+
+  @Test
+  public void job_succeeds_synchronously() throws Exception {
+    pageableResult = new PcapPages(
+        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new 
Path("3.txt")));
+    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    Pageable<Path> results = statusable.get();
+    Assert.assertThat(results.getSize(), equalTo(3));
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+  }
+
+  @Test
+  public void job_fails_synchronously() throws Exception {
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    Pageable<Path> results = statusable.get();
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.FAILED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    Assert.assertThat(results.getSize(), equalTo(0));
+  }
+
+  @Test
+  public void job_fails_with_killed_status_synchronously() throws Exception {
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    Pageable<Path> results = statusable.get();
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.KILLED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    Assert.assertThat(results.getSize(), equalTo(0));
+  }
+
+  @Test
+  public void job_succeeds_asynchronously() throws Exception {
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+  }
+
+  @Test
+  public void job_reports_percent_complete() throws Exception {
+    when(mrJob.isComplete()).thenReturn(false);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    when(mrJob.mapProgress()).thenReturn(0.5f);
+    when(mrJob.reduceProgress()).thenReturn(0f);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.RUNNING));
+    Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 
0.0%"));
+    Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
+    when(mrJob.mapProgress()).thenReturn(1.0f);
+    when(mrJob.reduceProgress()).thenReturn(0.5f);
+    timer.updateJobStatus();
+    status = statusable.getStatus();
+    Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 
50.0%"));
+    Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
+  }
+
+  @Test
+  public void killing_job_causes_status_to_return_KILLED_state() throws 
Exception {
+    when(mrJob.isComplete()).thenReturn(false);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    statusable.kill();
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+    timer.updateJobStatus();
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.KILLED));
+  }
+
+  @Test
+  public void handles_null_values_with_defaults() throws Exception {
+    PcapOptions.START_TIME_NS.put(config, null);
+    PcapOptions.END_TIME_NS.put(config, null);
+    PcapOptions.NUM_REDUCERS.put(config, null);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
+
+    pageableResult = new PcapPages(
+        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new 
Path("3.txt")));
+    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+    when(mrJob.isComplete()).thenReturn(true);
+    
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    timer.updateJobStatus();
+    Pageable<Path> results = statusable.get();
+    Assert.assertThat(results.getSize(), equalTo(3));
+    JobStatus status = statusable.getStatus();
+    Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+  }
+
+}

Reply via email to