METRON-1726: Refactor PcapTopologyIntegrationTest (mmiklavc via mmiklavc) closes apache/metron#1140
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7967f358 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7967f358 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7967f358 Branch: refs/heads/feature/METRON-1699-create-batch-profiler Commit: 7967f358c6c4b8437935c43e54179126e48e248f Parents: 3e77859 Author: mmiklavc <michael.miklav...@gmail.com> Authored: Tue Aug 7 15:02:20 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Tue Aug 7 15:02:20 2018 -0600 ---------------------------------------------------------------------- .../org/apache/metron/pcap/query/PcapCli.java | 2 + .../PcapTopologyIntegrationTest.java | 892 ++++++++++--------- .../metron/pcap/finalizer/PcapCliFinalizer.java | 5 +- 3 files changed, 473 insertions(+), 426 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 0b06b0c..c23f037 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -87,6 +87,7 @@ public class PcapCli { try { config = fixedParser.parse(otherArgs); commonConfig = config; + PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir)); } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); System.err.flush(); @@ -112,6 +113,7 @@ public class PcapCli { try { config = queryParser.parse(otherArgs); commonConfig = config; + PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir)); } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); queryParser.printHelp(); http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 108fd2b..c30267d 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -58,6 +58,7 @@ import org.apache.metron.integration.components.MRComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.integration.utils.KafkaUtil; import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Pageable; import org.apache.metron.job.Statusable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; @@ -73,7 +74,10 @@ import org.apache.metron.spout.pcap.Endianness; import org.apache.metron.spout.pcap.deserializer.Deserializers; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.JSONObject; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -85,12 +89,19 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { private static String OUTPUT_DIR = BASE_DIR + "/output"; private static final int MAX_RETRIES = 30; private static final int SLEEP_MS = 500; - private String topologiesDir = "src/main/flux"; - private String targetDir = "target"; + private static String topologiesDir = "src/main/flux"; + private static String targetDir = "target"; + private static ComponentRunner runner; + private static File inputDir; + private static File interimResultDir; + private static File outputDir; + private static List<Map.Entry<byte[], byte[]>> pcapEntries; + private static boolean withHeaders; + private FixedPcapConfig configuration; private static void clearOutDirs(File... dirs) throws IOException { - for(File dir: dirs) { - for(File f : dir.listFiles()) { + for (File dir : dirs) { + for (File f : dir.listFiles()) { if (f.isDirectory()) { FileUtils.deleteDirectory(f); } else { @@ -99,8 +110,8 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { } } } - private static int numFiles(File outDir, Configuration config) { + private static int numFiles(File outDir, Configuration config) { return outDir.list(new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -109,11 +120,12 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { }).length; } - // This will eventually be completely deprecated. As it takes a significant amount of testing, the test is being disabled. + // This will eventually be completely deprecated. + // As it takes a significant amount of testing, the test is being disabled. @Ignore @Test public void testTimestampInPacket() throws Exception { - testTopology(new Function<Properties, Void>() { + setupTopology(new Function<Properties, Void>() { @Nullable @Override public Void apply(@Nullable Properties input) { @@ -129,9 +141,14 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { ); } - @Test - public void testTimestampInKey() throws Exception { - testTopology(new Function<Properties, Void>() { + /** + * Sets up component infrastructure once for all tests. + */ + @BeforeClass + public static void setupAll() throws Exception { + System.out.println("Setting up test components"); + withHeaders = false; + setupTopology(new Function<Properties, Void>() { @Nullable @Override public Void apply(@Nullable Properties input) { @@ -154,7 +171,30 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { System.out.println("Wrote " + pcapEntries.size() + " to kafka"); } } - }, false); + }, withHeaders); + System.out.println("Done with setup."); + } + + private static File getDir(String targetDir, String childDir) { + File directory = new File(new File(targetDir), childDir); + if (!directory.exists()) { + directory.mkdirs(); + } + return directory; + } + + /** + * Cleans up component infrastructure after all tests finish running. + */ + @AfterClass + public static void teardownAll() throws Exception { + System.out.println("Tearing down test infrastructure"); + System.out.println("Stopping runner"); + runner.stop(); + System.out.println("Done stopping runner"); + System.out.println("Clearing output directories"); + clearOutDirs(inputDir, interimResultDir, outputDir); + System.out.println("Finished"); } private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> entries) { @@ -165,27 +205,27 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { public void send(KafkaComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> entries) throws Exception; } - public void testTopology(Function<Properties, Void> updatePropertiesCallback + public static void setupTopology(Function<Properties, Void> updatePropertiesCallback ,SendEntries sendPcapEntriesCallback ,boolean withHeaders ) - throws Exception - { + throws Exception { if (!new File(topologiesDir).exists()) { topologiesDir = UnitTestHelper.findDir("topologies"); } targetDir = UnitTestHelper.findDir("target"); - final File inputDir = getDir(targetDir, DATA_DIR); - final File interimResultDir = getDir(targetDir, INTERIM_RESULT); - final File outputDir = getDir(targetDir, OUTPUT_DIR); + inputDir = getDir(targetDir, DATA_DIR); + interimResultDir = getDir(targetDir, INTERIM_RESULT); + outputDir = getDir(targetDir, OUTPUT_DIR); clearOutDirs(inputDir, interimResultDir, outputDir); File baseDir = new File(new File(targetDir), BASE_DIR); //Assert.assertEquals(0, numFiles(outDir)); Assert.assertNotNull(topologiesDir); Assert.assertNotNull(targetDir); - Path pcapFile = new Path("../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput"); - final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders)); + Path pcapFile = new Path( + "../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput"); + pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders)); Assert.assertTrue(Iterables.size(pcapEntries) > 0); final Properties topologyProperties = new Properties() {{ setProperty("topology.workers", "1"); @@ -206,415 +246,428 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); - final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, Collections.singletonList( + final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, + Collections.singletonList( new KafkaComponent.Topic(KAFKA_TOPIC, 1))); - final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath()); FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() - .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml")) - .withTopologyName("pcap") - .withTopologyProperties(topologyProperties) - .build(); + .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml")) + .withTopologyName("pcap") + .withTopologyProperties(topologyProperties) + .build(); //UnitTestHelper.verboseLogging(); - ComponentRunner runner = new ComponentRunner.Builder() - .withComponent("mr", mr) - .withComponent("zk",zkServerComponent) - .withComponent("kafka", kafkaComponent) - .withComponent("storm", fluxComponent) - .withMaxTimeMS(-1) - .withMillisecondsBetweenAttempts(2000) - .withNumRetries(10) - .withCustomShutdownOrder(new String[]{"storm","kafka","zk","mr"}) - .build(); - try { - runner.start(); - - fluxComponent.submitTopology(); - sendPcapEntriesCallback.send(kafkaComponent, pcapEntries); - runner.process(new Processor<Void>() { - @Override - public ReadinessState process(ComponentRunner runner) { - int numFiles = numFiles(inputDir, mr.getConfiguration()); - int expectedNumFiles = pcapEntries.size() / 2; - if (numFiles == expectedNumFiles) { - return ReadinessState.READY; - } else { - return ReadinessState.NOT_READY; - } - } - - @Override - public ProcessorResult<Void> getResult() { - return null; + runner = new ComponentRunner.Builder() + .withComponent("mr", mr) + .withComponent("zk", zkServerComponent) + .withComponent("kafka", kafkaComponent) + .withComponent("storm", fluxComponent) + .withMaxTimeMS(-1) + .withMillisecondsBetweenAttempts(2000) + .withNumRetries(10) + .withCustomShutdownOrder(new String[]{"storm", "kafka", "zk", "mr"}) + .build(); + runner.start(); + + fluxComponent.submitTopology(); + sendPcapEntriesCallback.send(kafkaComponent, pcapEntries); + runner.process(new Processor<Void>() { + @Override + public ReadinessState process(ComponentRunner runner) { + int numFiles = numFiles(inputDir, mr.getConfiguration()); + int expectedNumFiles = pcapEntries.size() / 2; + if (numFiles == expectedNumFiles) { + return ReadinessState.READY; + } else { + return ReadinessState.NOT_READY; } - }); - - FixedPcapConfig configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY); - Configuration hadoopConf = new Configuration(); - PcapOptions.JOB_NAME.put(configuration, "jobName"); - PcapOptions.HADOOP_CONF.put(configuration, hadoopConf); - PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf)); - PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath())); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath())); - PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries)); - PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries)); - PcapOptions.NUM_REDUCERS.put(configuration, 10); - PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2); - PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath())); - { - //Ensure that only two pcaps are returned when we look at 4 and 5 - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap()); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 1); - } - { - // Ensure that only two pcaps are returned when we look at 4 and 5 - // test with empty query filter - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, ""); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 1); - } - { - //ensure that none get returned since date range has no results - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap<>()); - PcapOptions.START_TIME_NS.put(configuration, 0); - PcapOptions.END_TIME_NS.put(configuration, 1); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0); - Assert.assertEquals("No results in specified date range.", - results.getStatus().getDescription()); - Assert.assertEquals(results.get().getSize(), 0); - } - { - //ensure that none get returned since that destination IP address isn't in the dataset - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ - put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1"); - }}); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 0); - } - { - // ensure that none get returned since that destination IP address isn't in the dataset - // test with query filter - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'"); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 0); } - { - //same with protocol as before with the destination addr - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ - put(Constants.Fields.PROTOCOL.getName(), "foo"); - }}); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 0); + + @Override + public ProcessorResult<Void> getResult() { + return null; } - { - //same with protocol as before with the destination addr - //test with query filter - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "protocol == 'foo'"); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(results.get().getSize(), 0); + }); + } + + /** + * This is executed before each individual test. + */ + @Before + public void setup() throws IOException { + configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY); + Configuration hadoopConf = new Configuration(); + PcapOptions.JOB_NAME.put(configuration, "jobName"); + PcapOptions.HADOOP_CONF.put(configuration, hadoopConf); + PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf)); + PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath())); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath())); + PcapOptions.NUM_REDUCERS.put(configuration, 10); + PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1); + PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath())); + } + + @Test + public void filters_pcaps_by_start_end_ns_with_fixed_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries)); + PcapOptions.FIELDS.put(configuration, new HashMap()); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - //make sure I get them all. - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap<>()); - PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); - PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(10, results.get().getSize()); + }); + assertInOrder(bytes); + Assert.assertEquals("Expected 2 records returned.", 2, resultPages.getSize()); + Assert.assertEquals("Expected 1 record in first file.", 1, + PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size()); + Assert.assertEquals("Expected 1 record in second file.", 1, + PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size()); + } + + @Test + public void filters_pcaps_by_start_end_ns_with_empty_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries)); + PcapOptions.FIELDS.put(configuration, ""); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - //make sure I get them all. - //with query filter - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, ""); - PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); - PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(10, results.get().getSize()); + }); + assertInOrder(bytes); + Assert.assertEquals("Expected 2 records returned.", 2, resultPages.getSize()); + Assert.assertEquals("Expected 1 record in first file.", 1, + PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size()); + Assert.assertEquals("Expected 1 record in second file.", 1, + PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size()); + } + + @Test + public void date_range_filters_out_all_results() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.FIELDS.put(configuration, new HashMap<>()); + PcapOptions.START_TIME_NS.put(configuration, 0); + PcapOptions.END_TIME_NS.put(configuration, 1); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0); + Assert.assertEquals("No results in specified date range.", + results.getStatus().getDescription()); + Assert.assertEquals(results.get().getSize(), 0); + } + + @Test + public void ip_address_filters_out_all_results_with_fixed_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); + PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ + put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1"); + }}); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(results.get().getSize(), 0); + } + + @Test + public void ip_address_filters_out_all_results_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); + PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'"); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(results.get().getSize(), 0); + } + + @Test + public void protocol_filters_out_all_results_with_fixed_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); + PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ + put(Constants.Fields.PROTOCOL.getName(), "foo"); + }}); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(results.get().getSize(), 0); + } + + @Test + public void protocol_filters_out_all_results_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries)); + PcapOptions.FIELDS.put(configuration, "protocol == 'foo'"); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(results.get().getSize(), 0); + } + + @Test + public void fixed_filter_returns_all_results_for_full_date_range() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, new HashMap<>()); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ - put(Constants.Fields.DST_PORT.getName(), "22"); - }}); - PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1); - PcapJob<Map<String, String>> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertTrue(results.get().getSize() > 0); - Assert.assertEquals(Iterables.size(bytes) - , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { - @Override - public boolean apply(@Nullable JSONObject input) { - Object prt = input.get(Constants.Fields.DST_PORT.getName()); - return prt != null && prt.toString().equals("22"); - } - }, withHeaders) - ) - ); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); - Assert.assertTrue(baos.toByteArray().length > 0); + }); + assertInOrder(bytes); + Assert.assertEquals(pcapEntries.size(), resultPages.getSize()); + } + + @Test + public void query_filter_returns_all_results_for_full_date_range() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, ""); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - //same with protocol as before with the destination addr - //test with query filter - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22"); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(Iterables.size(bytes) - , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { - @Override - public boolean apply(@Nullable JSONObject input) { - Object prt = input.get(Constants.Fields.DST_PORT.getName()); - return prt != null && prt.toString().equals("22"); - } - }, withHeaders) - ) - ); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); - Assert.assertTrue(baos.toByteArray().length > 0); + }); + assertInOrder(bytes); + Assert.assertEquals(pcapEntries.size(), resultPages.getSize()); + } + + @Test + public void filters_results_by_dst_port_with_fixed_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ + put(Constants.Fields.DST_PORT.getName(), "22"); + }}); + PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - // test with query filter ip_dst_port > 20 and ip_dst_port < 55792 - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792"); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); + }); + assertInOrder(bytes); + Assert.assertTrue(resultPages.getSize() > 0); + Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { + @Override + public boolean apply(@Nullable JSONObject input) { + Object prt = input.get(Constants.Fields.DST_PORT.getName()); + return prt != null && prt.toString().equals("22"); } - }); - assertInOrder(bytes); - Assert.assertEquals(Iterables.size(bytes) - , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { - @Override - public boolean apply(@Nullable JSONObject input) { - Object prt = input.get(Constants.Fields.DST_PORT.getName()); - return prt != null && ((Long) prt > 20 && (Long) prt < 55792); - } - }, withHeaders) - ) - ); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); - Assert.assertTrue(baos.toByteArray().length > 0); + }, withHeaders) + ), resultPages.getSize() + ); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); + Assert.assertTrue(baos.toByteArray().length > 0); + } + + @Test + public void filters_results_by_dst_port_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22"); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - //test with query filter ip_dst_port > 55790 - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790"); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(Iterables.size(bytes) - , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { - @Override - public boolean apply(@Nullable JSONObject input) { - Object prt = input.get(Constants.Fields.DST_PORT.getName()); - return prt != null && (Long) prt > 55790; - } - }, withHeaders) - ) - ); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); - Assert.assertTrue(baos.toByteArray().length > 0); + }); + assertInOrder(bytes); + Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { + @Override + public boolean apply(@Nullable JSONObject input) { + Object prt = input.get(Constants.Fields.DST_PORT.getName()); + return prt != null && prt.toString().equals("22"); + } + }, withHeaders) + ), resultPages.getSize() + ); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); + Assert.assertTrue(baos.toByteArray().length > 0); + } + + @Test + public void filters_results_by_dst_port_range_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792"); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } - { - //test with query filter and byte array matching - PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); - PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"); - PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); - PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1); - PcapJob<String> job = new PcapJob<>(); - Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); - Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - waitForJob(results); - - Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); - Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { - try { - return HDFSUtils.readBytes(path); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }); - assertInOrder(bytes); - Assert.assertEquals(1, results.get().getSize()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); - Assert.assertTrue(baos.toByteArray().length > 0); + }); + assertInOrder(bytes); + Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { + @Override + public boolean apply(@Nullable JSONObject input) { + Object prt = input.get(Constants.Fields.DST_PORT.getName()); + return prt != null && ((Long) prt > 20 && (Long) prt < 55792); + } + }, withHeaders) + ), resultPages.getSize() + ); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); + Assert.assertTrue(baos.toByteArray().length > 0); + } + + @Test + public void filters_results_by_dst_port_greater_than_value_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790"); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Pageable<Path> resultPages = results.get(); + Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); } + }); + assertInOrder(bytes); + Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() { + @Override + public boolean apply(@Nullable JSONObject input) { + Object prt = input.get(Constants.Fields.DST_PORT.getName()); + return prt != null && (Long) prt > 55790; + } + }, withHeaders) + ), resultPages.getSize() + ); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0))); + Assert.assertTrue(baos.toByteArray().length > 0); + } - System.out.println("Ended"); - } finally { - runner.stop(); - clearOutDirs(inputDir, interimResultDir, outputDir); - } + @Test + public void filters_results_by_BYTEARRAY_MATCHER_with_query_filter() throws Exception { + PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator()); + PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"); + PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries)); + PcapOptions.END_TIME_NS + .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1); + PcapJob<String> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> { + try { + return HDFSUtils.readBytes(path); + } catch (IOException e) { + throw new IllegalStateException(e); + } + }); + assertInOrder(bytes); + Assert.assertEquals(1, results.get().getSize()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0))); + Assert.assertTrue(baos.toByteArray().length > 0); } private void waitForJob(Statusable statusable) throws Exception { @@ -628,14 +681,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " seconds"); } - private File getDir(String targetDir, String childDir) { - File directory = new File(new File(targetDir), childDir); - if (!directory.exists()) { - directory.mkdirs(); - } - return directory; - } - private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException { SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), SequenceFile.Reader.file(pcapFile) @@ -655,28 +700,27 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader); { List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader); - for(PacketInfo pi : info) { + for (PacketInfo pi : info) { Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos()); //IF you are debugging and want to see the packets, uncomment the following. //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc()); } } - if(withHeaders) { + if (withHeaders) { ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader)); - } - else { + } else { byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE]; System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length); ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw)); } } - return Iterables.limit(ret, 2*(ret.size()/2)); + return Iterables.limit(ret, 2 * (ret.size() / 2)); } public static void assertInOrder(Iterable<byte[]> packets) { long previous = 0; - for(byte[] packet : packets) { - for(JSONObject json : TO_JSONS.apply(packet)) { + for (byte[] packet : packets) { + for (JSONObject json : TO_JSONS.apply(packet)) { Long current = Long.parseLong(json.get("ts_micro").toString()); Assert.assertNotNull(current); Assert.assertTrue(Long.compareUnsigned(current, previous) >= 0); http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java index c912e58..e4e9b95 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java @@ -37,7 +37,7 @@ public class PcapCliFinalizer extends PcapFinalizer { * as a formatted timestamp + uuid. A final sample format will look as follows: * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap */ - private static final String PCAP_CLI_FILENAME_FORMAT = "pcap-data-%s+%04d.pcap"; + private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap"; @Override protected void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, @@ -47,8 +47,9 @@ public class PcapCliFinalizer extends PcapFinalizer { @Override protected Path getOutputPath(Map<String, Object> config, int partition) { + Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class); String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class); - return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, prefix, partition)); + return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition)); } }