METRON-936: Fixes to pcap for performance and testing (mmiklavc via cestella) closes apache/incubator-metron#585
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c0b08252
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c0b08252
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c0b08252

Branch: refs/heads/Metron_0.4.0
Commit: c0b0825235ce5c26649282c43f3c57f092864f94
Parents: be03076
Author: mmiklavc <[email protected]>
Authored: Wed May 17 10:32:46 2017 -0400
Committer: cstella <[email protected]>
Committed: Wed May 17 10:32:46 2017 -0400

----------------------------------------------------------------------
 metron-platform/metron-pcap-backend/README.md   | 198 +++++++++++++++++++
 .../src/main/config/pcap.properties             |  10 +-
 .../src/main/flux/pcap/remote.yaml              |  14 +-
 .../metron/spout/pcap/HDFSWriterCallback.java   |  76 ++++---
 .../metron/spout/pcap/HDFSWriterConfig.java     |  47 +++++
 .../metron/spout/pcap/PartitionHDFSWriter.java  |  61 ++++--
 .../pcap/deserializer/FromKeyDeserializer.java  |  22 +--
 .../deserializer/FromPacketDeserializer.java    |  16 +-
 .../pcap/deserializer/KeyValueDeserializer.java |  15 +-
 .../org/apache/metron/utils/PcapInspector.java  |  29 +--
 .../PcapTopologyIntegrationTest.java            |   9 +-
 .../deserializer/FromKeyDeserializerTest.java   |  39 ++++
 .../java/org/apache/metron/pcap/PcapHelper.java |  53 ++++-
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  19 +-
 14 files changed, 505 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 5b554b5..da19611 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -4,6 +4,16 @@ The purpose of the Metron PCAP backend is to create a storm topology capable of
 rapidly ingesting raw packet capture data directly into HDFS from Kafka.
 
+* [Sensors](#the-sensors-feeding-kafka)
+* [PCAP Topology](#the-pcap-topology)
+* [HDFS Files](#the-files-on-hdfs)
+* [Configuration](#configuration)
+* [Starting the Topology](#starting-the-topology)
+* [Utilities](#utilities)
+  * [Inspector Utility](#inspector-utility)
+  * [Query Filter Utility](#query-filter-utility)
+* [Performance Tuning](#performance-tuning)
+
 ## The Sensors Feeding Kafka
 
 This component must be fed by fast packet capture components upstream
@@ -150,3 +160,191 @@ The packet data will be exposed via the `packet` variable in Stellar.
 The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md).
 
+## Performance Tuning
+The PCAP topology is extremely lightweight and functions as a spout-only topology. To tune the topology, users currently must specify a combination of
+properties in pcap.properties as well as configuration in the pcap remote.yaml flux file itself. Tuning the number of partitions in your Kafka topic
+will have a dramatic impact on performance as well.
+We ran data into Kafka at 1.1 Gbps, and our tests led us to configure 128 partitions for our Kafka topic
+along with the following settings in pcap.properties and remote.yaml (properties unrelated to performance have been removed):
+
+### pcap.properties file
+```
+spout.kafka.topic.pcap=pcap
+storm.topology.workers=16
+kafka.spout.parallelism=128
+kafka.pcap.numPackets=1000000000
+kafka.pcap.maxTimeMS=0
+hdfs.replication=1
+hdfs.sync.every=10000
+```
+You'll notice that the number of Kafka partitions equals the spout parallelism, and this is no coincidence. The ordering guarantee for a partition in Kafka enforces that you may have no more
+than one consumer per partition. Any additional parallelism will leave you with dormant threads consuming resources but performing no additional work. For our cluster with 4 Storm Supervisors, we found 16 workers to
+provide optimal throughput as well. We were largely I/O bound rather than CPU bound with the incoming PCAP data.
+
+### remote.yaml
+In the flux file, we introduced the following configuration:
+
+```
+name: "pcap"
+config:
+  topology.workers: ${storm.topology.workers}
+  topology.worker.childopts: ${topology.worker.childopts}
+  topology.auto-credentials: ${storm.auto.credentials}
+  topology.acker.executors: 0
+components:
+
+  # Any kafka props for the consumer go here.
+  - id: "kafkaProps"
+    className: "java.util.HashMap"
+    configMethods:
+      - name: "put"
+        args:
+          - "value.deserializer"
+          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      - name: "put"
+        args:
+          - "key.deserializer"
+          - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      - name: "put"
+        args:
+          - "group.id"
+          - "pcap"
+      - name: "put"
+        args:
+          - "security.protocol"
+          - "${kafka.security.protocol}"
+      - name: "put"
+        args:
+          - "poll.timeout.ms"
+          - 100
+      - name: "put"
+        args:
+          - "offset.commit.period.ms"
+          - 30000
+      - name: "put"
+        args:
+          - "session.timeout.ms"
+          - 30000
+      - name: "put"
+        args:
+          - "max.uncommitted.offsets"
+          - 200000000
+      - name: "put"
+        args:
+          - "max.poll.interval.ms"
+          - 10
+      - name: "put"
+        args:
+          - "max.poll.records"
+          - 200000
+      - name: "put"
+        args:
+          - "receive.buffer.bytes"
+          - 431072
+      - name: "put"
+        args:
+          - "max.partition.fetch.bytes"
+          - 8097152
+
+  - id: "hdfsProps"
+    className: "java.util.HashMap"
+    configMethods:
+      - name: "put"
+        args:
+          - "io.file.buffer.size"
+          - 1000000
+      - name: "put"
+        args:
+          - "dfs.blocksize"
+          - 1073741824
+
+  - id: "kafkaConfig"
+    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+    constructorArgs:
+      - ref: "kafkaProps"
+      # topic name
+      - "${spout.kafka.topic.pcap}"
+      - "${kafka.zk}"
+    configMethods:
+      - name: "setFirstPollOffsetStrategy"
+        args:
+          # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+          - ${kafka.pcap.start}
+
+  - id: "writerConfig"
+    className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
+    configMethods:
+      - name: "withOutputPath"
+        args:
+          - "${kafka.pcap.out}"
+      - name: "withNumPackets"
+        args:
+          - ${kafka.pcap.numPackets}
+      - name: "withMaxTimeMS"
+        args:
+          - ${kafka.pcap.maxTimeMS}
+      - name: "withZookeeperQuorum"
+        args:
+          - "${kafka.zk}"
+      - name: "withSyncEvery"
+        args:
+          - ${hdfs.sync.every}
+      - name: "withReplicationFactor"
+        args:
+          - ${hdfs.replication}
+      - name: "withHDFSConfig"
+        args:
+          - ref: "hdfsProps"
+      - name: "withDeserializer"
+        args:
+          - "${kafka.pcap.ts_scheme}"
+          - "${kafka.pcap.ts_granularity}"
+spouts:
+  - id: "kafkaSpout"
+    className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+    parallelism: ${kafka.spout.parallelism}
+    constructorArgs:
+      - ref: "kafkaConfig"
+      - ref: "writerConfig"
+
+```
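As noted above, `kafka.spout.parallelism` should match the partition count of the pcap topic. The count can be checked with the stock tooling, e.g. `kafka-topics.sh --describe --zookeeper node1:2181 --topic pcap`, or programmatically. The sketch below is illustrative only: it assumes a Kafka client version that provides `AdminClient` (0.11+), and the broker address is an example.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class PcapPartitionCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Assumption: replace with your broker list.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:6667");
    try (AdminClient admin = AdminClient.create(props)) {
      int partitions = admin.describeTopics(Collections.singletonList("pcap"))
                            .values().get("pcap").get()
                            .partitions().size();
      // kafka.spout.parallelism should equal this value.
      System.out.println("pcap partition count: " + partitions);
    }
  }
}
```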
+#### Flux Changes Introduced
+
+##### Topology Configuration
+
+The only change here is `topology.acker.executors: 0`, which disables Storm tuple acking for maximum throughput.
+
+##### Kafka Configuration
+
+The following consumer properties were added to the `kafkaProps` map, all tuned for throughput:
+
+```
+poll.timeout.ms
+offset.commit.period.ms
+session.timeout.ms
+max.uncommitted.offsets
+max.poll.interval.ms
+max.poll.records
+receive.buffer.bytes
+max.partition.fetch.bytes
+```
+
+##### Writer Configuration
+
+This is a combination of settings for the HDFSWriter (see the pcap.properties values above) as well as for HDFS itself.
+
+__HDFS config__
+
+Component config HashMap with the following properties:
+```
+io.file.buffer.size
+dfs.blocksize
+```
+
+__Writer config__
+
+References the HDFS props component specified above.
+```
+      - name: "withHDFSConfig"
+        args:
+          - ref: "hdfsProps"
+```
+
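For orientation when reading the diffs that follow, the flux `writerConfig` component above maps onto the new `HDFSWriterConfig` builder roughly as shown below. This is a sketch only: the method names are the ones referenced in the flux file, the literal values mirror the tuned pcap.properties example, and the deserializer setup is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.metron.spout.pcap.HDFSWriterConfig;

public class TunedWriterConfig {
  public static HDFSWriterConfig build() {
    // Mirrors the hdfsProps component in remote.yaml.
    Map<String, Object> hdfsProps = new HashMap<>();
    hdfsProps.put("io.file.buffer.size", 1000000);
    hdfsProps.put("dfs.blocksize", 1073741824L);
    return new HDFSWriterConfig()
        .withOutputPath("/apps/metron/pcap")
        .withNumPackets(1000000000)   // roll the file after ~1B packets
        .withMaxTimeMS(0)             // 0 disables time-based rolling
        .withSyncEvery(10000)         // sync to HDFS once per 10k packets
        .withReplicationFactor(1)
        .withHDFSConfig(hdfsProps);
  }
}
```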
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
index 6e51dc5..7160178 100644
--- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
+++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
@@ -15,12 +15,18 @@
 # limitations under the License.
 
 spout.kafka.topic.pcap=pcap
-storm.auto.credentials=[]
+topology.worker.childopts=
+topology.auto-credentials=[]
+topology.workers=1
 kafka.zk=node1:2181
+hdfs.sync.every=1
+hdfs.replication.factor=-1
 kafka.security.protocol=PLAINTEXT
-kafka.pcap.start=END
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.pcap.start=UNCOMMITTED_EARLIEST
 kafka.pcap.numPackets=1000
 kafka.pcap.maxTimeMS=300000
 kafka.pcap.ts_scheme=FROM_KEY
 kafka.pcap.out=/apps/metron/pcap
 kafka.pcap.ts_granularity=MICROSECONDS
+kafka.spout.parallelism=1


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 2b7e0fd..d7f6f2f 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -16,8 +16,9 @@
 
 name: "pcap"
 config:
-  topology.workers: 1
-  topology.auto-credentials: ${storm.auto.credentials}
+  topology.workers: ${topology.workers}
+  topology.worker.childopts: ${topology.worker.childopts}
+  topology.auto-credentials: ${topology.auto-credentials}
 
 components:
 
@@ -53,7 +54,7 @@ components:
       - name: "setFirstPollOffsetStrategy"
         args:
           # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-          - "UNCOMMITTED_EARLIEST"
+          - ${kafka.pcap.start}
 
   - id: "writerConfig"
     className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
@@ -70,6 +71,12 @@ components:
       - name: "withZookeeperQuorum"
         args:
           - "${kafka.zk}"
+      - name: "withSyncEvery"
+        args:
+          - ${hdfs.sync.every}
+      - name: "withReplicationFactor"
+        args:
+          - ${hdfs.replication.factor}
       - name: "withDeserializer"
         args:
           - "${kafka.pcap.ts_scheme}"
@@ -77,6 +84,7 @@ components:
 spouts:
   - id: "kafkaSpout"
     className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+    parallelism: ${kafka.spout.parallelism}
     constructorArgs:
       - ref: "kafkaConfig"
       - ref: "writerConfig"


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index 43cd7e0..a6823e6 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -19,14 +19,13 @@
 package org.apache.metron.spout.pcap;
 
 import com.google.common.base.Joiner;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
+import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.EmitContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.DatatypeConverter;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +37,7 @@ import java.util.Map;
  */
 public class HDFSWriterCallback implements Callback {
     static final long serialVersionUID = 0xDEADBEEFL;
-    private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSWriterCallback.class);
 
     /**
      * A topic+partition. We split the files up by topic+partition so the writers don't clobber each other
@@ -80,29 +79,12 @@ public class HDFSWriterCallback implements Callback {
         }
     }
 
-    /**
-     * This is a static container of threadlocal LongWritables and BytesWritables. This keeps us from having to create so
-     * many objects on the heap. The Deserializers update these for every packet.
-     */
-    private static class KeyValue {
-        static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () {
-            @Override
-            protected LongWritable initialValue() {
-                return new LongWritable();
-            }
-        };
-        static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () {
-            @Override
-            protected BytesWritable initialValue() {
-                return new BytesWritable();
-            }
-        };
-    }
     private HDFSWriterConfig config;
     private EmitContext context;
     private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>();
     private PartitionHDFSWriter lastWriter = null;
     private String topic;
+    private boolean inited = false;
 
     public HDFSWriterCallback() {
     }
@@ -116,35 +98,43 @@ public class HDFSWriterCallback implements Callback {
     public List<Object> apply(List<Object> tuple, EmitContext context) {
         byte[] key = (byte[]) tuple.get(0);
         byte[] value = (byte[]) tuple.get(1);
-        if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
-            if(LOG.isDebugEnabled()) {
-                List<String> debugStatements = new ArrayList<>();
-                if(key != null) {
-                    debugStatements.add("Key length: " + key.length);
-                    debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
-                }
-                else {
-                    debugStatements.add("Key is null!");
-                }
-
-                if(value != null) {
-                    debugStatements.add("Value length: " + value.length);
-                    debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
-                }
-                else {
-                    debugStatements.add("Value is null!");
-                }
-                LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
+        long tsDeserializeStart = System.nanoTime();
+        KeyValueDeserializer.Result result = config.getDeserializer().deserializeKeyValue(key, value);
+        long tsDeserializeEnd = System.nanoTime();
+
+        if (LOG.isDebugEnabled() && !result.foundTimestamp) {
+            List<String> debugStatements = new ArrayList<>();
+            if (key != null) {
+                debugStatements.add("Key length: " + key.length);
+                debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
+            } else {
+                debugStatements.add("Key is null!");
             }
+
+            if (value != null) {
+                debugStatements.add("Value length: " + value.length);
+                debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
+            } else {
+                debugStatements.add("Value is null!");
+            }
+            LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
         }
+
+        long tsWriteStart = System.nanoTime();
         try {
             getWriter(new Partition( topic
                                    , context.get(EmitContext.Type.PARTITION))
-                     ).handle(KeyValue.key.get(), KeyValue.value.get());
+                     ).handle(result.key, result.value);
         } catch (IOException e) {
             LOG.error(e.getMessage(), e);
             //drop?  not sure..
         }
+        long tsWriteEnd = System.nanoTime();
+        if(LOG.isDebugEnabled() && (Math.random() < 0.001 || !inited)) {
+            LOG.debug("Deserialize time (ns): " + (tsDeserializeEnd - tsDeserializeStart));
+            LOG.debug("Write time (ns): " + (tsWriteEnd - tsWriteStart));
+        }
+        inited = true;
         return tuple;
     }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
index 66bb359..b6a2809 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -25,7 +25,9 @@ import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Configure the HDFS Writer for PCap
@@ -34,9 +36,12 @@ public class HDFSWriterConfig implements Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
   private long numPackets;
   private long maxTimeNS;
+  private int syncEvery = 1;
+  private int replicationFactor = -1;
   private String outputPath;
   private String zookeeperQuorum;
   private KeyValueDeserializer deserializer;
+  private Map<String, Object> hdfsConfig = new HashMap<>();
 
   /**
    * Set the deserializer, the bit of logic that defines how the timestamp and packet are read.
@@ -70,6 +75,36 @@ public class HDFSWriterConfig implements Serializable {
   }
 
   /**
+   * The number of packets to write between HDFS syncs.
+   * @param n
+   * @return
+   */
+  public HDFSWriterConfig withSyncEvery(int n) {
+    syncEvery = n;
+    return this;
+  }
+
+  /**
+   * The map config for HDFS
+   * @param config
+   * @return
+   */
+  public HDFSWriterConfig withHDFSConfig(Map<String, Object> config) {
+    hdfsConfig = config;
+    return this;
+  }
+
+  /**
+   * The HDFS replication factor to use. A value of -1 will not set replication factor.
+   * @param n
+   * @return
+   */
+  public HDFSWriterConfig withReplicationFactor(int n) {
+    replicationFactor = n;
+    return this;
+  }
+
+  /**
    * The total amount of time (in ms) to write before a file is rolled.
   * @param t
   * @return
   */
@@ -103,6 +138,10 @@ public class HDFSWriterConfig implements Serializable {
     return out;
   }
 
+  public Map<String, Object> getHDFSConfig() {
+    return hdfsConfig;
+  }
+
   public Integer getZookeeperPort() {
     if(zookeeperQuorum != null) {
       String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
@@ -112,6 +151,14 @@ public class HDFSWriterConfig implements Serializable {
     return null;
   }
 
+  public int getSyncEvery() {
+    return syncEvery;
+  }
+
+  public int getReplicationFactor() {
+    return replicationFactor;
+  }
+
   public KeyValueDeserializer getDeserializer() {
     return deserializer;
   }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index f0ea1eb..86697db 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -27,18 +27,23 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.log4j.Logger;
 import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.EnumSet;
+import java.util.Map;
 
 /**
  * This class is intended to handle the writing of an individual file.
  */
 public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
-  private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionHDFSWriter.class);
 
 
   public static interface SyncHandler {
@@ -102,14 +107,43 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   private SyncHandler syncHandler;
   private long batchStartTime;
   private long numWritten;
+  private Configuration fsConfig = new Configuration();
 
   public PartitionHDFSWriter(String topic, int partition, String uuid, HDFSWriterConfig config) {
     this.topic = topic;
     this.partition = partition;
     this.uuid = uuid;
     this.config = config;
+
     try {
-      this.fs = FileSystem.get(new Configuration());
+      int replicationFactor = config.getReplicationFactor();
+      if (replicationFactor > 0) {
+        fsConfig.set("dfs.replication", String.valueOf(replicationFactor));
+      }
+      if(config.getHDFSConfig() != null && !config.getHDFSConfig().isEmpty()) {
+        for(Map.Entry<String, Object> entry : config.getHDFSConfig().entrySet()) {
+          if(entry.getValue() instanceof Integer) {
+            fsConfig.setInt(entry.getKey(), (int)entry.getValue());
+          }
+          else if(entry.getValue() instanceof Boolean)
+          {
+            fsConfig.setBoolean(entry.getKey(), (Boolean) entry.getValue());
+          }
+          else if(entry.getValue() instanceof Long)
+          {
+            fsConfig.setLong(entry.getKey(), (Long) entry.getValue());
+          }
+          else if(entry.getValue() instanceof Float)
+          {
+            fsConfig.setFloat(entry.getKey(), (Float) entry.getValue());
+          }
+          else
+          {
+            fsConfig.set(entry.getKey(), String.valueOf(entry.getValue()));
+          }
+        }
+      }
+      this.fs = FileSystem.get(fsConfig);
     } catch (IOException e) {
       throw new RuntimeException("Unable to get FileSystem", e);
     }
@@ -119,11 +153,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
     return Long.toUnsignedString(ts);
   }
 
-  public void handle(LongWritable ts, BytesWritable value) throws IOException {
-    turnoverIfNecessary(ts.get());
-    writer.append(ts, value);
-    syncHandler.sync(outputStream);
+  public void handle(long ts, byte[] value) throws IOException {
+    turnoverIfNecessary(ts);
+    BytesWritable bw = new BytesWritable(value);
+    writer.append(new LongWritable(ts), bw);
     numWritten++;
+    if(numWritten % config.getSyncEvery() == 0) {
+      syncHandler.sync(outputStream);
+    }
   }
 
   public String getTopic() {
@@ -144,8 +181,8 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
       outputStream.close();
     }
   }
-  private Path getPath(long ts) {
 
+  private Path getPath(long ts) {
     String fileName = PcapHelper.toFilename(topic, ts, partition + "", uuid);
     return new Path(config.getOutputPath(), fileName);
   }
@@ -157,7 +194,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   private void turnoverIfNecessary(long ts, boolean force) throws IOException {
     long duration = ts - batchStartTime;
     boolean initial = outputStream == null;
-    boolean overDuration = duration >= config.getMaxTimeNS();
+    boolean overDuration = config.getMaxTimeNS() <= 0 ? false : Long.compareUnsigned(duration, config.getMaxTimeNS()) >= 0;
     boolean tooManyPackets = numWritten >= config.getNumPackets();
     if(force || initial || overDuration || tooManyPackets ) {
       //turnover
@@ -183,14 +220,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
         }
 
       }
-      writer = SequenceFile.createWriter(new Configuration()
+      writer = SequenceFile.createWriter(this.fsConfig
               , SequenceFile.Writer.keyClass(LongWritable.class)
               , SequenceFile.Writer.valueClass(BytesWritable.class)
               , SequenceFile.Writer.stream(outputStream)
              , SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
              );
       //reset state
-      LOG.info("Turning over and writing to " + path);
+      LOG.info("Turning over and writing to {}: [duration={} NS, force={}, initial={}, overDuration={}, tooManyPackets={}]", path, duration, force, initial, overDuration, tooManyPackets);
       batchStartTime = ts;
       numWritten = 0;
     }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
index de1e24b..749d74c 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
@@ -18,19 +18,18 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.spout.pcap.Endianness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Extract the timestamp from the key and raw data from the packet.
  */
 public class FromKeyDeserializer extends KeyValueDeserializer {
-  private static final Logger LOG = Logger.getLogger(FromKeyDeserializer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FromKeyDeserializer.class);
   private static Endianness endianness = Endianness.getNativeEndianness();
 
@@ -39,13 +38,12 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
   }
 
   @Override
-  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
-    Long ts = converter.toNanoseconds(fromBytes(key));
-    outKey.set(ts);
-    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
-    byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
-    outValue.set(globalHeaderized, 0, globalHeaderized.length);
-    return true;
+  public Result deserializeKeyValue(byte[] key, byte[] value) {
+    if (key == null) {
+      throw new IllegalArgumentException("Expected a key but none provided");
+    }
+    long ts = converter.toNanoseconds(fromBytes(key));
+    return new Result(ts, PcapHelper.addHeaders(ts, value, endianness), true);
   }
 
   /**
@@ -65,6 +63,6 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
       value |= (long)(b & 255);
     }
 
-    return Long.valueOf(value);
+    return value;
   }
 }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
index 6098904..0ba92f8 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
@@ -18,26 +18,24 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
 import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Extract the timestamp and raw data from the packet.
  */
 public class FromPacketDeserializer extends KeyValueDeserializer {
-  private static final Logger LOG = Logger.getLogger(FromPacketDeserializer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FromPacketDeserializer.class);
 
   @Override
-  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
+  public Result deserializeKeyValue(byte[] key, byte[] value) {
     Long ts = PcapHelper.getTimestamp(value);
     if(ts != null) {
-      outKey.set(ts);
-      outValue.set(value, 0, value.length);
-      return true;
+      return new Result(ts, value, true);
     }
     else {
-      return false;
+      return new Result(ts, value, false);
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
index 48bea87..311be04 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
@@ -18,8 +18,6 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 
@@ -28,6 +26,17 @@ import java.io.Serializable;
 public abstract class KeyValueDeserializer implements Serializable {
   protected TimestampConverter converter;
 
+  public static class Result {
+    public byte[] value;
+    public Long key;
+    public boolean foundTimestamp;
+    public Result(Long key, byte[] value, boolean foundTimestamp) {
+      this.key = key;
+      this.value = value;
+      this.foundTimestamp = foundTimestamp;
+    }
+  }
+
   public KeyValueDeserializer() {
     this(TimestampConverters.MICROSECONDS);
   }
@@ -36,6 +45,6 @@ public abstract class KeyValueDeserializer implements Serializable {
     this.converter = converter;
   }
 
-  public abstract boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue);
+  public abstract Result deserializeKeyValue(byte[] key, byte[] value);
 
 }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
index f460db3..c887606 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -35,7 +35,10 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 public class PcapInspector {
   private static abstract class OptionHandler implements Function<String, Option> {}
@@ -136,20 +139,24 @@ public class PcapInspector {
     LongWritable key = new LongWritable();
     BytesWritable value = new BytesWritable();
 
-    for(int i = 0;(n < 0 || i < n) && reader.next(key, value);++i) {
+    for (int i = 0; (n < 0 || i < n) && reader.next(key, value); ++i) {
       long millis = Long.divideUnsigned(key.get(), 1000000);
       String ts = DATE_FORMAT.format(new Date(millis));
-      for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
-        Map<String, Object> result = PcapHelper.packetToFields(pi);
-        List<String> fieldResults = new ArrayList<String>() {{
-          add("TS: " + ts);
-        }};
-        for(Constants.Fields field : Constants.Fields.values()) {
-          if(result.containsKey(field.getName())) {
-            fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+      try {
+        for (PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
+          Map<String, Object> result = PcapHelper.packetToFields(pi);
+          List<String> fieldResults = new ArrayList<String>() {{
+            add("TS: " + ts);
+          }};
+          for (Constants.Fields field : Constants.Fields.values()) {
+            if (result.containsKey(field.getName())) {
+              fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+            }
           }
+          System.out.println(Joiner.on(",").join(fieldResults));
         }
-        System.out.println(Joiner.on(",").join(fieldResults));
+      } catch (Exception e) {
+        System.out.println(String.format("Error: malformed packet #=%s, ts=%s, error msg=%s", i + 1, ts, e.getMessage()));
       }
     }
   }


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index a869723..d6d54dc 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -213,14 +213,19 @@ public class PcapTopologyIntegrationTest {
     final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
     Assert.assertTrue(Iterables.size(pcapEntries) > 0);
     final Properties topologyProperties = new Properties() {{
+      setProperty("topology.workers", "1");
+      setProperty("topology.worker.childopts", "");
       setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
-      setProperty("kafka.pcap.start", "BEGINNING");
+      setProperty("kafka.pcap.start", "EARLIEST");
       setProperty("kafka.pcap.out", outDir.getAbsolutePath());
       setProperty("kafka.pcap.numPackets", "2");
       setProperty("kafka.pcap.maxTimeMS", "200000000");
       setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
-      setProperty("storm.auto.credentials", "[]");
+      setProperty("kafka.spout.parallelism", "1");
+      setProperty("topology.auto-credentials", "[]");
       setProperty("kafka.security.protocol", "PLAINTEXT");
+      setProperty("hdfs.sync.every", "1");
+      setProperty("hdfs.replication.factor", "-1");
     }};
     updatePropertiesCallback.apply(topologyProperties);


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
new file mode 100644
index 0000000..1d49103
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class FromKeyDeserializerTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void empty_or_null_key_throws_illegal_argument_exception() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Expected a key but none provided");
+
+    FromKeyDeserializer deserializer = new FromKeyDeserializer(TimestampConverters.NANOSECONDS);
+    deserializer.deserializeKeyValue(null, null);
+  }
+}


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index ebd7ac7..e1ad3ca 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -39,9 +39,10 @@ import org.krakenapps.pcap.util.ByteOrderConverter;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.*;
-
-import static org.apache.metron.pcap.Constants.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class PcapHelper {
 
@@ -81,6 +82,14 @@ public class PcapHelper {
     }
   }
 
+  /**
+   *
+   * @param topic
+   * @param timestamp
+   * @param partition kafka partition
+   * @param uuid
+   * @return filename in this format: pcap_topic_timestamp_partition_uuid, e.g. pcap_pcap_1494886105667571000_0_pcap-8-1494965816
+   */
   public static String toFilename(String topic, long timestamp, String partition, String uuid)
   {
     return Joiner.on("_").join("pcap"
@@ -163,6 +172,44 @@ public class PcapHelper {
     }
     return null;
   }
+
+  public static byte[] addHeaders(long tsNano, byte[] packet, Endianness endianness) {
+    byte[] ret = new byte[GLOBAL_HEADER_SIZE + PACKET_HEADER_SIZE + packet.length];
+    byte[] globalHeader = getPcapGlobalHeader(endianness);
+    int offset = 0;
+    System.arraycopy(globalHeader, 0, ret, offset, GLOBAL_HEADER_SIZE);
+    offset += globalHeader.length;
+    {
+      boolean swapBytes = swapBytes(endianness);
+      long micros = Long.divideUnsigned(tsNano, 1000);
+      int secs = (int)(micros / 1000000);
+      int usec = (int)(micros % 1000000);
+      int capLen = packet.length;
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(secs):secs);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(usec):usec);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+    }
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return ret;
+  }
+
   public static byte[] addGlobalHeader(byte[] packet, Endianness endianness) {
     byte[] globalHeader = getPcapGlobalHeader(endianness);
     byte[] ret = new byte[packet.length + GLOBAL_HEADER_SIZE];


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 3d4a4b3..8d40e5f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -52,6 +52,11 @@ public class PcapJob {
   public static final String START_TS_CONF = "start_ts";
   public static final String END_TS_CONF = "end_ts";
   public static final String WIDTH_CONF = "width";
+
+  public static enum PCAP_COUNTER {
+    MALFORMED_PACKET_COUNT
+  }
+
   public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
     private Configuration configuration;
     Long start = null;
@@ -110,15 +115,23 @@ public class PcapJob {
       // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
       // objects back to byte arrays, otherwise we could support more than one packet.
       // Note: short-circuit findAny() func on stream
-      boolean send = filteredPacketInfo(value).findAny().isPresent();
+      List<PacketInfo> packetInfos;
+      try {
+        packetInfos = PcapHelper.toPacketInfo(value.copyBytes());
+      } catch(Exception e) {
+        // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets
+        context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1);
+        return;
+      }
+      boolean send = filteredPacketInfo(packetInfos).findAny().isPresent();
       if (send) {
         context.write(key, value);
       }
     }
   }
 
-  private Stream<PacketInfo> filteredPacketInfo(BytesWritable value) throws IOException {
-    return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter);
+  private Stream<PacketInfo> filteredPacketInfo(List<PacketInfo> packetInfos) throws IOException {
+    return packetInfos.stream().filter(filter);
   }
 }
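As a usage note on the new `PCAP_COUNTER.MALFORMED_PACKET_COUNT` above: a caller holding the completed MapReduce `Job` could surface the count along these lines. This is a hypothetical sketch; only the counter enum comes from this commit, while the job handle and helper are assumptions.

```java
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.metron.pcap.mr.PcapJob;

public class PcapJobCounters {
  // Hypothetical helper: how many packets the filter mapper skipped as malformed.
  public static long malformedCount(Job completedJob) throws Exception {
    Counter c = completedJob.getCounters()
                            .findCounter(PcapJob.PCAP_COUNTER.MALFORMED_PACKET_COUNT);
    return c.getValue();
  }
}
```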