METRON-937 Pycapa - Consume Messages from Begin, End, or Stored Offsets (nickwallen) closes apache/metron#570
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8779eb3f Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8779eb3f Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8779eb3f Branch: refs/heads/Metron_0.4.0 Commit: 8779eb3fe6d2767d6b79d665adec735380cf2d61 Parents: 74bc236 Author: nickwallen <[email protected]> Authored: Thu Jun 1 11:15:42 2017 -0400 Committer: nickallen <[email protected]> Committed: Thu Jun 1 11:15:42 2017 -0400 ---------------------------------------------------------------------- metron-sensors/pycapa/README.md | 169 +++++++++++++----------- metron-sensors/pycapa/pycapa/consumer.py | 51 ++++++- metron-sensors/pycapa/pycapa/producer.py | 24 +++- metron-sensors/pycapa/pycapa/pycapa_cli.py | 34 +++-- 4 files changed, 179 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/README.md ---------------------------------------------------------------------- diff --git a/metron-sensors/pycapa/README.md b/metron-sensors/pycapa/README.md index 6f41d63..fed1399 100644 --- a/metron-sensors/pycapa/README.md +++ b/metron-sensors/pycapa/README.md @@ -72,19 +72,22 @@ Pycapa has two primary runtime modes. 
### Parameters ``` -$ pycapa -h +$ pycapa --help usage: pycapa [-h] [-p] [-c] [-k KAFKA_BROKERS] [-t KAFKA_TOPIC] - [-i NETWORK_IFACE] [-m MAX_PACKETS] [-pp PRETTY_PRINT] - [-ll LOG_LEVEL] [-X KAFKA_CONFIGS] [-s SNAPLEN] + [-o {begin,end,stored}] [-i NETWORK_IFACE] [-m MAX_PACKETS] + [-pp PRETTY_PRINT] [-ll LOG_LEVEL] [-X KAFKA_CONFIGS] + [-s SNAPLEN] optional arguments: -h, --help show this help message and exit -p, --producer sniff packets and send to kafka -c, --consumer read packets from kafka -k KAFKA_BROKERS, --kafka-broker KAFKA_BROKERS - kafka broker(s) + kafka broker(s) as host:port -t KAFKA_TOPIC, --kafka-topic KAFKA_TOPIC kafka topic + -o {begin,end,stored}, --kafka-offset {begin,end,stored} + kafka offset to consume from; default=end -i NETWORK_IFACE, --interface NETWORK_IFACE network interface to listen on -m MAX_PACKETS, --max-packets MAX_PACKETS @@ -92,91 +95,81 @@ optional arguments: -pp PRETTY_PRINT, --pretty-print PRETTY_PRINT pretty print every X packets -ll LOG_LEVEL, --log-level LOG_LEVEL - set the log level + set the log level; DEBUG, INFO, WARN -X KAFKA_CONFIGS define a kafka client parameter; key=value -s SNAPLEN, --snaplen SNAPLEN - snapshot length + capture only the first X bytes of each packet; + default=65535 ``` ### Examples -**Example**: Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`. - ``` - $ pycapa --producer \ - --interface eth0 \ - --kafka-broker localhost:9092 \ - --kafka-topic pcap \ - --max-packets 10 - INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'} - INFO:root:Starting packet capture - INFO:root:Waiting for '10' message(s) to flush - INFO:root:'10' packet(s) in, '10' packet(s) out - ``` +#### Example 1 -**Example**: Capture packets until SIGINT is received. A SIGINT is the interrupt signal sent when entering CTRL-D in the console. 
- ``` - $ pycapa --producer \ - --interface eth0 \ - --kafka-broker localhost:9092 \ - --kafka-topic pcap - INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'} - INFO:root:Starting packet capture - ^C - INFO:root:Clean shutdown process started - INFO:root:Waiting for '0' message(s) to flush - INFO:root:'7' packet(s) in, '7' packet(s) out - ``` +Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`. The process will not terminate until all messages have been delivered to Kafka. -**Example**: While capturing packets, output diagnostic information every 10 packets. - ``` - $ pycapa --producer \ - --interface en0 \ - --kafka-broker localhost:9092 \ - --kafka-topic pcap \ - --pretty-print 10 - INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'YMDSEEDIHVWD'} +``` +$ pycapa --producer \ + --interface eth0 \ + --kafka-broker localhost:9092 \ + --kafka-topic pcap \ + --max-packets 10 +INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'} +INFO:root:Starting packet capture +INFO:root:Waiting for '6' message(s) to flush +INFO:root:'10' packet(s) in, '10' packet(s) out +``` + +#### Example 2 + +Capture packets until SIGINT is received (the interrupt signal sent when entering CTRL-C in the console.) In this example, nothing will be reported as packets are captured and delivered to Kafka. Simply wait a few seconds, then type CTRL-C and the number of packets will be reported. 
+ +``` +$ pycapa --producer \ + --interface en0 \ + --kafka-broker localhost:9092 \ + --kafka-topic pcap +INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'} +INFO:root:Starting packet capture +^C +INFO:root:Clean shutdown process started +INFO:root:Waiting for '2' message(s) to flush +INFO:root:'21' packet(s) in, '21' packet(s) out +``` + +#### Example 3 + +While capturing packets, output diagnostic information every 5 packets. Diagnostics will report when packets have been received from the network interface and when they have been successfully delivered to Kafka. + +``` +$ pycapa --producer \ + --interface eth0 \ + --kafka-broker localhost:9092 \ + --kafka-topic pcap \ + --pretty-print 5 + INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'UAWINMBDNQEH'} INFO:root:Starting packet capture - 10 packet(s) received - ac bc 32 bf 0d 43 b8 3e 59 8b 8a 8a 08 00 45 00 - 00 3c 00 00 40 00 40 06 b9 66 c0 a8 00 02 c0 a8 - 00 03 1f 7c d7 14 5f 8b 82 b4 a8 c5 f6 63 a0 12 - 38 90 59 cc 00 00 02 04 05 b4 04 02 08 0a 00 51 - 44 17 39 43 3e 9b 01 03 03 04 - 20 packet(s) received - 01 00 5e 00 00 fb ac bc 32 bf 0d 43 08 00 45 00 - 00 44 d2 09 00 00 ff 11 47 f8 c0 a8 00 03 e0 00 - 00 fb 14 e9 14 e9 00 30 69 fc 00 00 00 00 00 01 - 00 00 00 00 00 00 0b 5f 67 6f 6f 67 6c 65 63 61 - 73 74 04 5f 74 63 70 05 6c 6f 63 61 6c 00 00 0c - 80 01 + Packet received[5] + Packet delivered[5]: date=2017-05-08 14:48:54.474031 topic=pcap partition=0 offset=29086 len=42 + Packet received[10] + Packet received[15] + Packet delivered[10]: date=2017-05-08 14:48:58.879710 topic=pcap partition=0 offset=0 len=187 + Packet delivered[15]: date=2017-05-08 14:48:59.633127 topic=pcap partition=0 offset=0 len=43 + Packet received[20] + Packet delivered[20]: date=2017-05-08 14:49:01.949628 topic=pcap partition=0 offset=29101 len=134 + Packet received[25] ^C INFO:root:Clean shutdown process started - INFO:root:Waiting for '2' 
message(s) to flush - INFO:root:'20' packet(s) in, '20' packet(s) out - ``` + Packet delivered[25]: date=2017-05-08 14:49:03.589940 topic=pcap partition=0 offset=0 len=142 + INFO:root:Waiting for '1' message(s) to flush + INFO:root:'27' packet(s) in, '27' packet(s) out -**Example**: Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark for DPI. - ``` - $ pycapa --consumer \ - --kafka-broker localhost:9092 \ - --kafka-topic pcap \ - --max-packets 10 \ - | tshark -i - - Capturing on 'Standard input' - 1 0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1 - 2 0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43 - 3 0.203495 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df - 4 2.031988 192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0 - 5 2.035816 192.30.253.125 → 192.168.0.3 TLSv1.2 97 Application Data - 6 2.035892 192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052 - 7 2.035994 192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data - 8 2.053866 96.27.183.249 → 192.168.0.3 TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381 - 9 2.083872 192.30.253.125 → 192.168.0.3 TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495 - 10 3.173189 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df - 10 packets captured - ``` +``` + +#### Example 4 + +Consume 10 packets and create a libpcap-compliant pcap file. -**Example**: Consume 10 packets and create a libpcap-compliant pcap file. 
``` $ pycapa --consumer \ --kafka-broker localhost:9092 \ @@ -196,6 +189,30 @@ optional arguments: 10 2.494769 192.168.0.3 → 224.0.0.251 MDNS 82 Standard query 0x0000 PTR _googlecast._tcp.local, "QM" question ``` +#### Example 5 + +Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark for DPI. + +``` +$ pycapa --consumer \ + --kafka-broker localhost:9092 \ + --kafka-topic pcap \ + --max-packets 10 \ + | tshark -i - +Capturing on 'Standard input' + 1 0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1 + 2 0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43 + 3 0.203495 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df + 4 2.031988 192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0 + 5 2.035816 192.30.253.125 → 192.168.0.3 TLSv1.2 97 Application Data + 6 2.035892 192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052 + 7 2.035994 192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data + 8 2.053866 96.27.183.249 → 192.168.0.3 TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381 + 9 2.083872 192.30.253.125 → 192.168.0.3 TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495 + 10 3.173189 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df +10 packets captured +``` + ### Kerberos The probe can be used in a Kerberized environment. Follow these additional steps to use Pycapa with Kerberos. The following assumptions have been made. These may need altered to fit your environment. 
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/consumer.py ---------------------------------------------------------------------- diff --git a/metron-sensors/pycapa/pycapa/consumer.py b/metron-sensors/pycapa/pycapa/consumer.py index 7029f25..484ae3c 100644 --- a/metron-sensors/pycapa/pycapa/consumer.py +++ b/metron-sensors/pycapa/pycapa/consumer.py @@ -23,7 +23,7 @@ import random import logging import time import struct -from confluent_kafka import Consumer, KafkaException, KafkaError +from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED from common import to_date, to_hex, unpack_ts @@ -56,16 +56,51 @@ def packet_header(msg): return hdr +def seek_to_end(consumer, partitions): + """ Advance all partitions to the last offset. """ + + # advance to the end, ignoring any committed offsets + for p in partitions: + p.offset = OFFSET_END + consumer.assign(partitions) + + +def seek_to_begin(consumer, partitions): + """ Advance all partitions to the first offset. """ + + # advance to the end, ignoring any committed offsets + for p in partitions: + p.offset = OFFSET_BEGINNING + consumer.assign(partitions) + + +def seek_to_stored(consumer, partitions): + """ Advance all partitions to the stored offset. """ + + # advance to the end, ignoring any committed offsets + for p in partitions: + p.offset = OFFSET_STORED + consumer.assign(partitions) + + def consumer(args, poll_timeout=3.0): """ Consumes packets from a Kafka topic. 
""" # setup the signal handler signal.signal(signal.SIGINT, signal_handler) + # where to start consuming messages from + kafka_offset_options = { + "begin": seek_to_begin, + "end": seek_to_end, + "stored": seek_to_stored + } + on_assign_cb = kafka_offset_options[args.kafka_offset] + # connect to kafka logging.debug("Connecting to Kafka; %s", args.kafka_configs) kafka_consumer = Consumer(args.kafka_configs) - kafka_consumer.subscribe([args.kafka_topic]) + kafka_consumer.subscribe([args.kafka_topic], on_assign=on_assign_cb) # if 'pretty-print' not set, write libpcap global header if args.pretty_print == 0: @@ -85,8 +120,10 @@ def consumer(args, poll_timeout=3.0): elif msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: - logging.debug("reached end of topar: topic=%s, partition=%d, offset=%s", msg.topic(), msg.partition(), msg.offset()) - elif msg.error(): + if args.pretty_print > 0: + print "Reached end of topar: topic=%s, partition=%d, offset=%s" % ( + msg.topic(), msg.partition(), msg.offset()) + else: raise KafkaException(msg.error()) else: @@ -103,9 +140,9 @@ def consumer(args, poll_timeout=3.0): elif pkts_in % args.pretty_print == 0: # pretty print - print 'Packet: count=%s date=%s topic=%s' % ( - pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic) - print to_hex(msg.value()) + print 'Packet[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % ( + pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic, + msg.partition(), msg.offset(), len(msg.value())) finally: sys.stdout.close() http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/producer.py ---------------------------------------------------------------------- diff --git a/metron-sensors/pycapa/pycapa/producer.py b/metron-sensors/pycapa/pycapa/producer.py index 7374522..ec21fdc 100644 --- a/metron-sensors/pycapa/pycapa/producer.py +++ b/metron-sensors/pycapa/pycapa/producer.py @@ -21,11 +21,11 @@ import pcapy import argparse import random import 
logging -from common import to_date, to_hex, pack_ts +from common import to_date, to_hex, pack_ts, unpack_ts from confluent_kafka import Producer finished = threading.Event() - +producer_args = None def signal_handler(signum, frame): """ Initiates a clean shutdown for a SIGINT """ @@ -57,10 +57,18 @@ def delivery_callback(err, msg): if err: logging.error("message delivery failed: error=%s", err) - else: - logging.debug("message delivery succeeded: pkts_out=%d", delivery_callback.pkts_out) + + elif msg is not None: delivery_callback.pkts_out += 1 + pretty_print = 0 + pretty_print = producer_args.pretty_print + + if pretty_print > 0 and delivery_callback.pkts_out % pretty_print == 0: + print 'Packet delivered[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % ( + delivery_callback.pkts_out, to_date(unpack_ts(msg.key())), msg.topic(), + msg.partition(), msg.offset(), len(msg.value())) + def producer(args, sniff_timeout_ms=500, sniff_promisc=True): """ Captures packets from a network interface and sends them to a Kafka topic. 
""" @@ -68,6 +76,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True): # setup the signal handler signal.signal(signal.SIGINT, signal_handler) + global producer_args + producer_args = args + # connect to kafka logging.info("Connecting to Kafka; %s", args.kafka_configs) kafka_producer = Producer(args.kafka_configs) @@ -88,10 +99,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True): pkt_ts = timestamp(pkt_hdr) kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback) - # debug messages, if needed + # pretty print, if needed if args.pretty_print > 0 and pkts_in % args.pretty_print == 0: - print '{} packet(s) received'.format(pkts_in) - print to_hex(pkt_raw) + print 'Packet received[%s]' % (pkts_in) # serve the callback queue kafka_producer.poll(0) http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/pycapa_cli.py ---------------------------------------------------------------------- diff --git a/metron-sensors/pycapa/pycapa/pycapa_cli.py b/metron-sensors/pycapa/pycapa/pycapa_cli.py index f650280..609205a 100644 --- a/metron-sensors/pycapa/pycapa/pycapa_cli.py +++ b/metron-sensors/pycapa/pycapa/pycapa_cli.py @@ -39,13 +39,19 @@ def make_parser(): default=False) parser.add_argument('-k', '--kafka-broker', - help='kafka broker(s)', + help='kafka broker(s) as host:port', dest='kafka_brokers') parser.add_argument('-t', '--kafka-topic', help='kafka topic', dest='kafka_topic') + parser.add_argument('-o', '--kafka-offset', + help='kafka offset to consume from; default=end', + dest='kafka_offset', + choices=['begin','end','stored'], + default='end') + parser.add_argument('-i', '--interface', help='network interface to listen on', dest='interface', @@ -64,7 +70,7 @@ def make_parser(): default=0) parser.add_argument('-ll', '--log-level', - help='set the log level', + help='set the log level; DEBUG, INFO, WARN', dest='log_level', default='INFO') @@ -75,7 +81,7 @@ def 
make_parser(): action='append') parser.add_argument('-s','--snaplen', - help="snapshot length", + help="capture only the first X bytes of each packet; default=65535", dest='snaplen', type=int, default=65535) @@ -96,13 +102,21 @@ def keyval(input, delim="="): def valid_args(args): """ Validates the command-line arguments. """ - if args.producer and args.kafka_brokers and args.kafka_topic and args.interface: - return True - elif args.consumer and args.kafka_brokers and args.kafka_topic: - return True - else: + if not args.producer and not args.consumer: + print "error: expected either --consumer or --producer \n" return False + elif args.producer and not (args.kafka_brokers and args.kafka_topic and args.interface): + print "error: missing required args: expected [--kafka-broker, --kafka-topic, --interface] \n" + return False + + elif args.consumer and not (args.kafka_brokers and args.kafka_topic): + print "error: missing required args: expected [--kafka-broker, --kafka-topic] \n" + return False + + else: + return True + def clean_kafka_configs(args): """ Cleans and transforms the Kafka client configs. """ @@ -116,7 +130,7 @@ def clean_kafka_configs(args): # boostrap servers can be set as a "-X bootstrap.servers=KAFKA:9092" or "-k KAFKA:9092" bootstrap_key = "bootstrap.servers" if(bootstrap_key not in configs): - configs[bootstrap_key] = args.kafka_brokers; + configs[bootstrap_key] = args.kafka_brokers # if no 'group.id', generate a random one group_key = "group.id" @@ -125,7 +139,9 @@ def clean_kafka_configs(args): args.kafka_configs = configs + def main(): + parser = make_parser() args = parser.parse_args()
