METRON-870: Add filtering by packet payload to the pcap query closes apache/incubator-metron#541
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/bf2528fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/bf2528fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/bf2528fd Branch: refs/heads/master Commit: bf2528fd3f4d474164ddf497459cd2421df3e4bb Parents: 5dd8788 Author: cstella <[email protected]> Authored: Thu Apr 27 12:20:45 2017 -0400 Committer: cstella <[email protected]> Committed: Thu Apr 27 12:20:45 2017 -0400 ---------------------------------------------------------------------- dependencies_with_url.csv | 1 + .../pcapservice/PcapReceiverImplRestEasy.java | 24 ++- .../PcapReceiverImplRestEasyTest.java | 85 ++++---- .../org/apache/metron/common/Constants.java | 7 +- .../stellar/GeoEnrichmentFunctions.java | 2 +- metron-platform/metron-pcap-backend/README.md | 29 ++- .../metron/pcap/query/FixedCliConfig.java | 12 +- .../metron/pcap/query/FixedCliParser.java | 15 +- .../org/apache/metron/utils/PcapInspector.java | 11 +- .../apache/metron/pcap/FixedPcapFilterTest.java | 211 ++++++++++--------- .../apache/metron/pcap/QueryPcapFilterTest.java | 121 +++++------ .../PcapTopologyIntegrationTest.java | 42 +++- .../apache/metron/pcap/query/PcapCliTest.java | 47 +++-- metron-platform/metron-pcap/pom.xml | 19 ++ .../java/org/apache/metron/pcap/PacketInfo.java | 25 ++- .../java/org/apache/metron/pcap/PcapHelper.java | 38 +++- .../metron/pcap/filter/PcapFieldResolver.java | 8 +- .../apache/metron/pcap/filter/PcapFilter.java | 2 +- .../pcap/filter/fixed/FixedPcapFilter.java | 71 ++++++- .../pcap/filter/query/QueryPcapFilter.java | 7 +- .../pcap/pattern/ByteArrayMatcherFunction.java | 63 ++++++ .../pcap/pattern/ByteArrayMatchingUtil.java | 64 ++++++ .../pcap/filter/fixed/FixedPcapFilterTest.java | 22 +- .../pcap/pattern/ByteArrayMatchingUtilTest.java | 133 ++++++++++++ 24 files changed, 754 insertions(+), 305 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 053f11a..93a19b7 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -293,5 +293,6 @@ org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/c com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/ +net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index 1f3c03e..9c58813 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -27,6 +27,7 @@ import org.apache.log4j.Logger; import org.apache.metron.common.Constants; import 
org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; @@ -37,10 +38,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumMap; -import java.util.List; +import java.util.*; @Path("/") public class PcapReceiverImplRestEasy { @@ -197,6 +195,7 @@ public class PcapReceiverImplRestEasy { @DefaultValue("-1") @QueryParam ("endTime")long endTime, @DefaultValue("10") @QueryParam ("numReducers")int numReducers, @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic, + @DefaultValue("") @QueryParam ("packetFilter") String packetFilter, @Context HttpServletResponse servlet_response) throws IOException { @@ -225,23 +224,26 @@ public class PcapReceiverImplRestEasy { //convert to nanoseconds since the epoch startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime); endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime); - EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ + Map<String, String> query = new HashMap<String, String>() {{ if(srcIp != null) { - put(Constants.Fields.SRC_ADDR, srcIp); + put(Constants.Fields.SRC_ADDR.getName(), srcIp); } if(dstIp != null) { - put(Constants.Fields.DST_ADDR, dstIp); + put(Constants.Fields.DST_ADDR.getName(), dstIp); } if(srcPort != null) { - put(Constants.Fields.SRC_PORT, srcPort); + put(Constants.Fields.SRC_PORT.getName(), srcPort); } if(dstPort != null) { - put(Constants.Fields.DST_PORT, dstPort); + put(Constants.Fields.DST_PORT.getName(), dstPort); } if(protocol != null) { - 
put(Constants.Fields.PROTOCOL, protocol); + put(Constants.Fields.PROTOCOL.getName(), protocol); + } + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "" + includeReverseTrafficF); + if(!org.apache.commons.lang3.StringUtils.isEmpty(packetFilter)) { + put(PcapHelper.PacketFields.PACKET_FILTER.getName(), packetFilter); } - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "" + includeReverseTrafficF); }}; if(LOGGER.isDebugEnabled()) { LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet())); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java index dba87cf..ea6db70 100644 --- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java +++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; @@ -32,6 +33,7 @@ import org.junit.Test; import java.io.IOException; import java.util.EnumMap; +import java.util.Map; public class PcapReceiverImplRestEasyTest { @@ -65,7 +67,7 @@ public class PcapReceiverImplRestEasyTest { } } - final MockQueryHandler<EnumMap<Constants.Fields, String>> fixedQueryHandler = new MockQueryHandler<EnumMap<Constants.Fields, 
String>>(); + final MockQueryHandler<Map<String, String>> fixedQueryHandler = new MockQueryHandler<>(); final MockQueryHandler<String> queryQueryHandler = new MockQueryHandler<String>(); PcapReceiverImplRestEasy fixedRestEndpoint = new PcapReceiverImplRestEasy() {{ this.queryUtil = fixedQueryHandler; @@ -89,32 +91,35 @@ public class PcapReceiverImplRestEasyTest { String dstPort = "100"; long startTime = 100; long endTime = 1000; + String query = "`blah`"; { boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, 
Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); + Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } { boolean includeReverseTraffic = true; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + 
Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); + Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } } @@ -141,16 +146,18 @@ public class PcapReceiverImplRestEasyTest { long startTime = 100; long endTime = 1000; boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + String query = "`metron`"; + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, 
Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); + Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } @Test @@ -162,17 +169,19 @@ public class PcapReceiverImplRestEasyTest { String dstPort = "100"; long startTime = 100; long endTime = 1000; + String query = null; boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), 
fixedQueryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); + Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } @Test @@ -185,17 +194,19 @@ public class PcapReceiverImplRestEasyTest { long startTime = -1; long endTime = 1000; { + String query = ""; boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName() )); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(0, fixedQueryHandler.beginNS); Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); - 
Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); + Assert.assertEquals(null, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName())); } { String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; @@ -219,16 +230,16 @@ public class PcapReceiverImplRestEasyTest { long endTime = -1; { boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); - Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName())); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName())); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName())); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName())); Assert.assertEquals(0, fixedQueryHandler.beginNS); Assert.assertTrue(fixedQueryHandler.endNS > 
0); - Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()))); } { String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java index 1dc73da..c2ede49 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java @@ -17,7 +17,9 @@ */ package org.apache.metron.common; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class Constants { @@ -33,7 +35,10 @@ public class Constants { public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel"; public static final String GUID = "guid"; - public static enum Fields { + public interface Field { + String getName(); + } + public enum Fields implements Field { SRC_ADDR("ip_src_addr") ,SRC_PORT("ip_src_port") ,DST_ADDR("ip_dst_addr") http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java index 11e024e..42913b2 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java @@ -36,7 +36,7 @@ public class GeoEnrichmentFunctions { ,namespace="GEO" ,description="Look up an IPV4 address and returns geographic information about it" ,params = { - "ip - The IPV4 address to lookup" + + "ip - The IPV4 address to lookup", "fields - Optional list of GeoIP fields to grab. Options are locID, country, city, postalCode, dmaCode, latitude, longitude, location_point" } ,returns = "If a Single field is requested a string of the field, If multiple fields a map of string of the fields, and null otherwise" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md index e1a2683..5b554b5 100644 --- a/metron-platform/metron-pcap-backend/README.md +++ b/metron-platform/metron-pcap-backend/README.md @@ -44,11 +44,13 @@ sequence files. ## Configuration The configuration file for the Flux topology is located at -`$METRON_HOME/config/etc/env/pcap.properties` and the possible options +`$METRON_HOME/config/pcap.properties` and the possible options are as follows: * `spout.kafka.topic.pcap` : The kafka topic to listen to +* `storm.auto.credentials` : The kerberos ticket renewal. If running on a kerberized cluster, this should be `['org.apache.storm.security.auth.kerberos.AutoTGT']` +* `kafka.security.protocol` : The security protocol to use for kafka. 
This should be `PLAINTEXT` for a non-kerberized cluster and probably `SASL_PLAINTEXT` for a kerberized cluster. * `kafka.zk` : The comma separated zookeeper quorum (i.e. host:2181,host2:2181) -* `kafka.pcap.start` : One of `START`, `END`, `WHERE_I_LEFT_OFF` representing where to start listening on the queue. +* `kafka.pcap.start` : One of `EARLIEST`, `LATEST`, `UNCOMMITTED_EARLIEST`, `UNCOMMITTED_LATEST` representing where to start listening on the queue. * `kafka.pcap.numPackets` : The number of packets to keep in one file. * `kafka.pcap.maxTimeMS` : The number of packets to keep in one file in terms of duration (in milliseconds). For instance, you may only want to keep an hour's worth of packets in a given file. * `kafka.pcap.ts_scheme` : One of `FROM_KEY` or `FROM_VALUE`. You really only want `FROM_KEY` as that fits the current tooling. `FROM_VALUE` assumes that fully headerized packets are coming in on the value, which is legacy. @@ -78,7 +80,7 @@ usage: PcapInspector ### Query Filter Utility This tool exposes the two methods for filtering PCAP data via a command line tool: - fixed -- query (Metron Stellar) +- query (via Stellar) The tool is executed via ``` @@ -97,6 +99,7 @@ usage: Fixed filter options and end_time. Default is to use time in millis since the epoch. -dp,--ip_dst_port <arg> Destination port + -pf,--packet_filter <arg> Packet filter regex -et,--end_time <arg> Packet end time range. Default is current system time. -nr,--num_reducers <arg> The number of reducers to use. Default @@ -127,3 +130,23 @@ usage: Query filter options -q,--query <arg> Query string to use as a filter -st,--start_time <arg> (required) Packet start time range. ``` + +The Query filter's `--query` argument specifies the Stellar expression to +execute on each packet. 
To interact with the packet, a few variables are exposed: +* `packet` : The packet data (a `byte[]`) +* `ip_src_addr` : The source address for the packet (a `String`) +* `ip_src_port` : The source port for the packet (an `Integer`) +* `ip_dst_addr` : The destination address for the packet (a `String`) +* `ip_dst_port` : The destination port for the packet (an `Integer`) + +#### Binary Regex + +Filtering can be done both by the packet header and via a binary regular expression +which can be run on the packet payload itself. This filter can be specified via: +* The `-pf` or `--packet_filter` options for the fixed query filter +* The `BYTEARRAY_MATCHER(pattern, data)` Stellar function. +The first argument is the regex pattern and the second argument is the data. +The packet data will be exposed via the `packet` variable in Stellar. + +The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md). + http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java index 897e0fd..df653e1 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java @@ -20,24 +20,26 @@ package org.apache.metron.pcap.query; import org.apache.metron.common.Constants; import java.util.EnumMap; +import java.util.LinkedHashMap; +import java.util.Map; public class FixedCliConfig extends CliConfig { - private EnumMap<Constants.Fields, String> fixedFields; + private Map<String, String> 
fixedFields; public FixedCliConfig() { - this.fixedFields = new EnumMap<>(Constants.Fields.class); + this.fixedFields = new LinkedHashMap<>(); } - public EnumMap<Constants.Fields, String> getFixedFields() { + public Map<String, String> getFixedFields() { return fixedFields; } - public void setFixedFields(EnumMap<Constants.Fields, String> fixedFields) { + public void setFixedFields(Map<String, String> fixedFields) { this.fixedFields = fixedFields; } - public void putFixedField(Constants.Fields key, String value) { + public void putFixedField(String key, String value) { String trimmedVal = value != null ? value.trim() : null; if (!isNullOrEmpty(trimmedVal)) { this.fixedFields.put(key, value); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java index 1123cad..fda8692 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.metron.common.Constants; +import org.apache.metron.pcap.PcapHelper; public class FixedCliParser extends CliParser { private Options fixedOptions; @@ -36,6 +37,7 @@ public class FixedCliParser extends CliParser { options.addOption(newOption("sp", "ip_src_port", true, "Source port")); options.addOption(newOption("dp", "ip_dst_port", true, "Destination port")); options.addOption(newOption("p", "protocol", true, "IP Protocol")); + 
options.addOption(newOption("pf", "packet_filter", true, "Packet Filter regex")); options.addOption(newOption("ir", "include_reverse", false, "Indicates if filter should check swapped src/dest addresses and IPs")); return options; } @@ -51,12 +53,13 @@ public class FixedCliParser extends CliParser { CommandLine commandLine = getParser().parse(fixedOptions, args); FixedCliConfig config = new FixedCliConfig(); super.parse(commandLine, config); - config.putFixedField(Constants.Fields.SRC_ADDR, commandLine.getOptionValue("ip_src_addr")); - config.putFixedField(Constants.Fields.DST_ADDR, commandLine.getOptionValue("ip_dst_addr")); - config.putFixedField(Constants.Fields.SRC_PORT, commandLine.getOptionValue("ip_src_port")); - config.putFixedField(Constants.Fields.DST_PORT, commandLine.getOptionValue("ip_dst_port")); - config.putFixedField(Constants.Fields.PROTOCOL, commandLine.getOptionValue("protocol")); - config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, Boolean.toString(commandLine.hasOption("include_reverse"))); + config.putFixedField(Constants.Fields.SRC_ADDR.getName(), commandLine.getOptionValue("ip_src_addr")); + config.putFixedField(Constants.Fields.DST_ADDR.getName(), commandLine.getOptionValue("ip_dst_addr")); + config.putFixedField(Constants.Fields.SRC_PORT.getName(), commandLine.getOptionValue("ip_src_port")); + config.putFixedField(Constants.Fields.DST_PORT.getName(), commandLine.getOptionValue("ip_dst_port")); + config.putFixedField(Constants.Fields.PROTOCOL.getName(), commandLine.getOptionValue("protocol")); + config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), Boolean.toString(commandLine.hasOption("include_reverse"))); + config.putFixedField(PcapHelper.PacketFields.PACKET_FILTER.getName(), commandLine.getOptionValue("packet_filter")); return config; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java 
---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java index ef11af8..f460db3 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java @@ -35,10 +35,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.EnumMap; -import java.util.List; +import java.util.*; public class PcapInspector { private static abstract class OptionHandler implements Function<String, Option> {} @@ -143,13 +140,13 @@ public class PcapInspector { long millis = Long.divideUnsigned(key.get(), 1000000); String ts = DATE_FORMAT.format(new Date(millis)); for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) { - EnumMap<Constants.Fields, Object> result = PcapHelper.packetToFields(pi); + Map<String, Object> result = PcapHelper.packetToFields(pi); List<String> fieldResults = new ArrayList<String>() {{ add("TS: " + ts); }}; for(Constants.Fields field : Constants.Fields.values()) { - if(result.containsKey(field)) { - fieldResults.add(field.getName() + ": " + result.get(field)); + if(result.containsKey(field.getName())) { + fieldResults.add(field.getName() + ": " + result.get(field.getName())); } } System.out.println(Joiner.on(",").join(fieldResults)); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java index 218d143..84969d3 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java @@ -24,29 +24,30 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.junit.Assert; import org.junit.Test; -import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; public class FixedPcapFilterTest { @Test public void testTrivialEquality() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, "0"); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + final Map<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new 
HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -58,23 +59,23 @@ public class FixedPcapFilterTest { @Test public void testReverseTraffic() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, "0"); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); + final Map<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -85,12 +86,12 @@ public class FixedPcapFilterTest { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, 
Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "dst_ip"); - put(Constants.Fields.SRC_PORT, 1); - put(Constants.Fields.DST_ADDR, "src_ip"); - put(Constants.Fields.DST_PORT, 0); + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); + put(Constants.Fields.SRC_PORT.getName(), 1); + put(Constants.Fields.DST_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_PORT.getName(), 0); }}; } }; @@ -101,12 +102,12 @@ public class FixedPcapFilterTest { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "dst_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "src_ip"); - put(Constants.Fields.DST_PORT, 1); + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -117,22 +118,22 @@ public class FixedPcapFilterTest { @Test public void testMissingDstAddr() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, "0"); - put(Constants.Fields.DST_PORT, "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + 
put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -143,12 +144,12 @@ public void testMissingDstAddr() throws Exception { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip1"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip1"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -159,22 +160,22 @@ public void testMissingDstAddr() throws Exception { @Test public void testMissingDstPort() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - 
put(Constants.Fields.SRC_PORT, "0"); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -185,12 +186,12 @@ public void testMissingDstAddr() throws Exception { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 100); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); }}; } }; @@ -201,12 +202,12 @@ 
public void testMissingDstAddr() throws Exception { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 100); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 100); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); }}; } }; @@ -217,22 +218,22 @@ public void testMissingDstAddr() throws Exception { @Test public void testMissingSrcAddr() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_PORT, "0"); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, 
Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -243,22 +244,22 @@ public void testMissingDstAddr() throws Exception { @Test public void testMissingSrcPort() throws Exception { Configuration config = new Configuration(); - final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); }}; new FixedPcapFilter.Configurator().addToConfig(fields, config); { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -269,12 +270,12 @@ public void testMissingDstAddr() throws Exception { { FixedPcapFilter filter = new FixedPcapFilter() { @Override - protected 
EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 100); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java index 5bb5d4a..7e3d55c 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java @@ -26,6 +26,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.EnumMap; +import java.util.HashMap; public class QueryPcapFilterTest { @@ -37,12 +38,12 @@ public class QueryPcapFilterTest { { PcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, 
Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -59,12 +60,12 @@ public class QueryPcapFilterTest { { PcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -81,12 +82,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -97,12 +98,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - 
put(Constants.Fields.SRC_ADDR, "src_ip_no_match"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -119,12 +120,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -135,12 +136,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 100); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); 
}}; } }; @@ -151,12 +152,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 100); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 100); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); }}; } }; @@ -173,12 +174,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -195,12 +196,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 0); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> 
packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; @@ -211,12 +212,12 @@ public class QueryPcapFilterTest { { QueryPcapFilter filter = new QueryPcapFilter() { @Override - protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) { - return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "src_ip"); - put(Constants.Fields.SRC_PORT, 100); - put(Constants.Fields.DST_ADDR, "dst_ip"); - put(Constants.Fields.DST_PORT, 1); + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); }}; } }; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 84e7574..a869723 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -109,6 +109,13 @@ public class PcapTopologyIntegrationTest { BytesWritable value = new BytesWritable(); while (reader.next(key, value)) { byte[] pcapWithHeader = 
value.copyBytes(); + //if you are debugging and want the hex dump of the packets, uncomment the following: + + //for(byte b : pcapWithHeader) { + // System.out.print(String.format("%02x", b)); + //} + //System.out.println(""); + long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader); { List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader); @@ -274,7 +281,7 @@ public class PcapTopologyIntegrationTest { , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) , 10 - , new EnumMap<>(Constants.Fields.class) + , new HashMap<>() , new Configuration() , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() @@ -307,8 +314,8 @@ public class PcapTopologyIntegrationTest { , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) , 10 - , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.DST_ADDR, "207.28.210.1"); + , new HashMap<String, String>() {{ + put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1"); }} , new Configuration() , FileSystem.get(new Configuration()) @@ -342,8 +349,8 @@ public class PcapTopologyIntegrationTest { , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) , 10 - , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.PROTOCOL, "foo"); + , new HashMap<String, String>() {{ + put(Constants.Fields.PROTOCOL.getName(), "foo"); }} , new Configuration() , FileSystem.get(new Configuration()) @@ -377,7 +384,7 @@ public class PcapTopologyIntegrationTest { , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 , 10 - , new EnumMap<>(Constants.Fields.class) + , new HashMap<>() , new Configuration() , FileSystem.get(new Configuration()) , new FixedPcapFilter.Configurator() @@ -409,8 +416,8 @@ public class PcapTopologyIntegrationTest { , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 , 10 - , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - 
put(Constants.Fields.DST_PORT, "22"); + , new HashMap<String, String>() {{ + put(Constants.Fields.DST_PORT.getName(), "22"); }} , new Configuration() , FileSystem.get(new Configuration()) @@ -433,6 +440,25 @@ public class PcapTopologyIntegrationTest { Assert.assertTrue(baos.toByteArray().length > 0); } { + //test with query filter and byte array matching + Iterable<byte[]> results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(0, pcapEntries) + , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , 10 + , "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() + ); + assertInOrder(results); + Assert.assertEquals(1, Iterables.size(results)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next()); + Assert.assertTrue(baos.toByteArray().length > 0); + } + { //test with query filter Iterable<byte[]> results = job.query(new Path(outDir.getAbsolutePath()) http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index bad22e4..4f441f1 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -24,6 +24,7 @@ import org.apache.metron.common.Constants; import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.system.Clock; import 
org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; @@ -70,7 +71,8 @@ public class PcapCliTest { "-ip_dst_addr", "192.168.1.2", "-ip_src_port", "8081", "-ip_dst_port", "8082", - "-protocol", "6" + "-protocol", "6", + "-packet_filter", "`casey`" }; List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); Iterator iterator = pcaps.iterator(); @@ -79,13 +81,14 @@ public class PcapCliTest { Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); - EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "192.168.1.1"); - put(Constants.Fields.DST_ADDR, "192.168.1.2"); - put(Constants.Fields.SRC_PORT, "8081"); - put(Constants.Fields.DST_PORT, "8082"); - put(Constants.Fields.PROTOCOL, "6"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + HashMap<String, String> query = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2"); + put(Constants.Fields.SRC_PORT.getName(), "8081"); + put(Constants.Fields.DST_PORT.getName(), "8082"); + put(Constants.Fields.PROTOCOL.getName(), "6"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`"); }}; when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); @@ -120,13 +123,13 @@ public class PcapCliTest { Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); - 
EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "192.168.1.1"); - put(Constants.Fields.DST_ADDR, "192.168.1.2"); - put(Constants.Fields.SRC_PORT, "8081"); - put(Constants.Fields.DST_PORT, "8082"); - put(Constants.Fields.PROTOCOL, "6"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); + Map<String, String> query = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2"); + put(Constants.Fields.SRC_PORT.getName(), "8081"); + put(Constants.Fields.DST_PORT.getName(), "8082"); + put(Constants.Fields.PROTOCOL.getName(), "6"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); }}; when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); @@ -162,13 +165,13 @@ public class PcapCliTest { Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); - EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{ - put(Constants.Fields.SRC_ADDR, "192.168.1.1"); - put(Constants.Fields.DST_ADDR, "192.168.1.2"); - put(Constants.Fields.SRC_PORT, "8081"); - put(Constants.Fields.DST_PORT, "8082"); - put(Constants.Fields.PROTOCOL, "6"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); + Map<String, String> query = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2"); + put(Constants.Fields.SRC_PORT.getName(), "8081"); + put(Constants.Fields.DST_PORT.getName(), "8082"); + put(Constants.Fields.PROTOCOL.getName(), "6"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); }}; long startAsNanos = asNanos("2016-06-13-18:35.00", 
"yyyy-MM-dd-HH:mm.ss"); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml index 16a34d1..73f57f0 100644 --- a/metron-platform/metron-pcap/pom.xml +++ b/metron-platform/metron-pcap/pom.xml @@ -26,6 +26,18 @@ </properties> <dependencies> <dependency> + <groupId>net.byteseek</groupId> + <artifactId>byteseek</artifactId> + <version>2.0.3</version> + <exclusions> + <!-- This is a LGPL dependency of byteseek, so we need to replace/mimic it minimally --> + <exclusion> + <groupId>net.sf.trove4j</groupId> + <artifactId>trove4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.metron</groupId> <artifactId>metron-hbase</artifactId> <version>${project.parent.version}</version> @@ -126,6 +138,13 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${global_hadoop_version}</version> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java index fcaf1b0..f3a7e25 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java @@ -37,6 +37,9 @@ import 
org.apache.metron.pcap.utils.PcapUtils; */ public class PacketInfo { + /** The packet data */ + private byte[] packetBytes = null; + /** The packetHeader. */ private PacketHeader packetHeader = null; @@ -158,9 +161,18 @@ public class PacketInfo { * the tcp packet * @param udpPacket * the udp packet + * @param packetBytes + * the raw packet data */ - public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket, - UdpPacket udpPacket) { + public PacketInfo( GlobalHeader globalHeader + , PacketHeader packetHeader + , PcapPacket packet + , Ipv4Packet ipv4Packet + , TcpPacket tcpPacket + , UdpPacket udpPacket + , byte[] packetBytes + ) { + this.packetBytes = packetBytes; this.packetHeader = packetHeader; this.packet = packet; this.ipv4Packet = ipv4Packet; @@ -177,6 +189,15 @@ public class PacketInfo { public GlobalHeader getGlobalHeader() { return globalHeader; } + /** + * Gets the packet raw data. + * + * + * @return the packet data + */ + public byte[] getPacketBytes() { + return packetBytes; + } /** * Gets the packet header. 
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java index e48824f..ebd7ac7 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java @@ -39,9 +39,7 @@ import org.krakenapps.pcap.util.ByteOrderConverter; import java.io.EOFException; import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.List; +import java.util.*; import static org.apache.metron.pcap.Constants.*; @@ -50,6 +48,22 @@ public class PcapHelper { public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES; public static final int GLOBAL_HEADER_SIZE = 24; private static final Logger LOG = Logger.getLogger(PcapHelper.class); + + public enum PacketFields implements org.apache.metron.common.Constants.Field{ + PACKET_DATA("packet"), + PACKET_FILTER("packet_filter") + ; + String name; + PacketFields(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } + } + public static ThreadLocal<MetronEthernetDecoder> ETHERNET_DECODER = new ThreadLocal<MetronEthernetDecoder>() { @Override protected MetronEthernetDecoder initialValue() { @@ -190,23 +204,25 @@ public class PcapHelper { System.arraycopy(packet, 0, ret, offset, packet.length); return ret; } - public static EnumMap<org.apache.metron.common.Constants.Fields, Object> packetToFields(PacketInfo pi) { - EnumMap<org.apache.metron.common.Constants.Fields, Object> ret = new EnumMap(org.apache.metron.common.Constants.Fields.class); + + public static Map<String, Object> packetToFields(PacketInfo pi) { + Map<String, 
Object> ret = new HashMap<>(); + ret.put(PacketFields.PACKET_DATA.getName(), pi.getPacketBytes()); if(pi.getTcpPacket() != null) { if(pi.getTcpPacket().getSourceAddress() != null) { - ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR, pi.getTcpPacket().getSourceAddress().getHostAddress()); + ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR.getName(), pi.getTcpPacket().getSourceAddress().getHostAddress()); } if(pi.getTcpPacket().getSource() != null ) { - ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT, pi.getTcpPacket().getSource().getPort()); + ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT.getName(), pi.getTcpPacket().getSource().getPort()); } if(pi.getTcpPacket().getDestinationAddress() != null ) { - ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR, pi.getTcpPacket().getDestinationAddress().getHostAddress()); + ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR.getName(), pi.getTcpPacket().getDestinationAddress().getHostAddress()); } if(pi.getTcpPacket().getDestination() != null ) { - ret.put(org.apache.metron.common.Constants.Fields.DST_PORT, pi.getTcpPacket().getDestination().getPort()); + ret.put(org.apache.metron.common.Constants.Fields.DST_PORT.getName(), pi.getTcpPacket().getDestination().getPort()); } if(pi.getIpv4Packet() != null) { - ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL, pi.getIpv4Packet().getProtocol()); + ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL.getName(), pi.getIpv4Packet().getProtocol()); } } return ret; @@ -281,7 +297,7 @@ public class PcapHelper { } packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet, - ipv4Packet, tcpPacket, udpPacket)); + ipv4Packet, tcpPacket, udpPacket, pcap)); } catch (NegativeArraySizeException ignored) { LOG.debug("Ignorable exception while parsing packet.", ignored); } catch (EOFException eof) { // $codepro.audit.disable logExceptions 
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java index 4aeec6c..e5a15e7 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java @@ -22,17 +22,19 @@ import org.apache.metron.common.Constants; import org.apache.metron.common.dsl.VariableResolver; import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; public class PcapFieldResolver implements VariableResolver { - EnumMap<Constants.Fields, Object> fieldsMap = null; + Map<String, Object> fieldsMap = new HashMap<>(); - public PcapFieldResolver(EnumMap<Constants.Fields, Object> fieldsMap) { + public PcapFieldResolver(Map<String, Object> fieldsMap) { this.fieldsMap = fieldsMap; } @Override public Object resolve(String variable) { - return fieldsMap.get(Constants.Fields.fromString(variable)); + return fieldsMap.get(variable); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java index c7168aa..6d775df 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java @@ -23,6 
+23,6 @@ import org.apache.metron.pcap.PacketInfo; import java.util.Map; import java.util.function.Predicate; -public interface PcapFilter extends Predicate<PacketInfo> { +public interface PcapFilter extends Predicate<PacketInfo>{ void configure(Iterable<Map.Entry<String, String>> config); }
