METRON-986 Enhance Fastcapa to Support Intel X520 (nickwallen) closes apache/metron#608
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/50e521b0 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/50e521b0 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/50e521b0 Branch: refs/heads/master Commit: 50e521b0077d098fdc89562d7aa9b95398ce7bb6 Parents: f7a94f2 Author: nickwallen <[email protected]> Authored: Fri Jul 7 12:27:27 2017 -0400 Committer: nickallen <[email protected]> Committed: Fri Jul 7 12:27:27 2017 -0400 ---------------------------------------------------------------------- .../roles/fastcapa/defaults/main.yml | 3 +- .../roles/fastcapa/templates/fastcapa | 88 ++-- metron-sensors/fastcapa/README.md | 48 +- metron-sensors/fastcapa/src/Makefile | 6 +- metron-sensors/fastcapa/src/args.c | 206 ++++----- metron-sensors/fastcapa/src/args.h | 104 +---- metron-sensors/fastcapa/src/kafka.c | 106 +++-- metron-sensors/fastcapa/src/kafka.h | 31 +- metron-sensors/fastcapa/src/main.c | 436 ++----------------- metron-sensors/fastcapa/src/main.h | 106 ----- metron-sensors/fastcapa/src/nic.c | 222 ++++++++++ metron-sensors/fastcapa/src/nic.h | 50 +++ metron-sensors/fastcapa/src/types.h | 120 ++++- metron-sensors/fastcapa/src/worker.c | 269 ++++++++++++ metron-sensors/fastcapa/src/worker.h | 51 +++ 15 files changed, 1017 insertions(+), 829 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-deployment/roles/fastcapa/defaults/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/fastcapa/defaults/main.yml b/metron-deployment/roles/fastcapa/defaults/main.yml index 19168a8..a7edc08 100644 --- a/metron-deployment/roles/fastcapa/defaults/main.yml +++ b/metron-deployment/roles/fastcapa/defaults/main.yml @@ -35,7 +35,8 @@ fastcapa_bin: fastcapa fastcapa_portmask: 0x01 fastcapa_kafka_config: /etc/fastcapa.conf fastcapa_topic: 
pcap -fastcapa_burst_size: 32 +fastcapa_rx_burst_size: 32 +fastcapa_tx_burst_size: 256 fastcapa_nb_rx_desc: 1024 fastcapa_nb_rx_queue: 1 fastcapa_tx_ring_size: 2048 http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-deployment/roles/fastcapa/templates/fastcapa ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/fastcapa/templates/fastcapa b/metron-deployment/roles/fastcapa/templates/fastcapa index 5ee6abb..f34c603 100644 --- a/metron-deployment/roles/fastcapa/templates/fastcapa +++ b/metron-deployment/roles/fastcapa/templates/fastcapa @@ -22,49 +22,61 @@ # processname: fastcapa # -export RTE_SDK="{{ dpdk_sdk }}" -export RTE_TARGET="{{ dpdk_target }}" export LD_LIBRARY_PATH="{{ fastcapa_ld_library_path }}" NAME="fastcapa" DESC="Metron network packet capture probe" PIDFILE=/var/run/$NAME.pid SCRIPTNAME=/etc/init.d/$NAME -DAEMONLOG=/var/log/$NAME.log +LOGROOT=/var/log/fastcapa +DAEMONLOG=$LOGROOT/$NAME-stdout.log +DAEMONERR=$LOGROOT/$NAME-stderr.log NOW=`date` -DAEMON_PATH="{{ dpdk_sdk }}" +DAEMON_PATH="/root" PORT_MASK="{{ fastcapa_portmask }}" KAFKA_TOPIC="{{ fastcapa_topic }}" KAFKA_CONFIG="{{ fastcapa_kafka_config }}" -BURST_SIZE="{{ fastcapa_burst_size }}" +RX_BURST_SIZE="{{ fastcapa_rx_burst_size }}" +TX_BURST_SIZE="{{ fastcapa_tx_burst_size }}" NB_RX_DESC="{{ fastcapa_nb_rx_desc }}" NB_RX_QUEUE="{{ fastcapa_nb_rx_queue }}" TX_RING_SIZE="{{ fastcapa_tx_ring_size }}" +DAEMON="{{ fastcapa_prefix }}/{{ fastcapa_bin }}" +DAEMONOPTS+=" " +DAEMONOPTS+=" -- " +DAEMONOPTS+="-p $PORT_MASK " +DAEMONOPTS+="-t $KAFKA_TOPIC " +DAEMONOPTS+="-c $KAFKA_CONFIG " +DAEMONOPTS+="-b $RX_BURST_SIZE " +DAEMONOPTS+="-w $TX_BURST_SIZE " +DAEMONOPTS+="-d $NB_RX_DESC " +DAEMONOPTS+="-q $NB_RX_QUEUE " +DAEMONOPTS+="-x $TX_RING_SIZE " + case "$1" in start) printf "%-50s" "Starting $NAME..." echo "$NOW: Starting $NAME..." 
>> $DAEMONLOG - cd $DAEMON_PATH - DAEMON="{{ fastcapa_prefix }}/{{ fastcapa_bin }}" - DAEMONOPTS+=" -- " - DAEMONOPTS+="-p $PORTMASK " - DAEMONOPTS+="-t $KAFKA_TOPIC " - DAEMONOPTS+="-c $KAFKA_CONFIG " - DAEMONOPTS+="-b $BURST_SIZE " - DAEMONOPTS+="-r $NB_RX_DESC " - DAEMONOPTS+="-q $NB_RX_QUEUE " - DAEMONOPTS+="-x $TX_RING_SIZE " + mkdir -p $LOGROOT + touch $DAEMONLOG + touch $DAEMONERR + cd $DAEMON_PATH + echo "$DAEMON $DAEMONOPTS >> $DAEMONLOG 2> $DAEMONERR" >> $DAEMONLOG - PID=`$DAEMON $DAEMONOPTS >> $DAEMONLOG 2>&1 & echo $!` - if [ -z $PID ]; then - printf "%s\n" "Fail" + if [ -f $PIDFILE ]; then + printf "%s\n" "Already running" else - echo $PID > $PIDFILE - printf "%s\n" "Ok" + PID=`$DAEMON $DAEMONOPTS >> $DAEMONLOG 2> $DAEMONERR & echo $!` + if [ -z $PID ]; then + printf "%s\n" "Fail" + else + echo $PID > $PIDFILE + printf "%s\n" "Ok" + fi fi ;; @@ -87,10 +99,14 @@ case "$1" in PID=`cat $PIDFILE` cd $DAEMON_PATH if [ -f $PIDFILE ]; then - echo "$NOW: Stopping $NAME with pid=$PID" >> $DAEMONLOG - kill -HUP $PID - printf "%s\n" "Ok" - rm -f $PIDFILE + while sleep 1 + echo -n "." + kill -0 $PID >/dev/null 2>&1 + do + kill -SIGINT $PID + done + printf "%s\n" "Ok" + rm -f $PIDFILE else printf "%s\n" "pidfile not found" fi @@ -101,7 +117,29 @@ case "$1" in $0 start ;; + tail) + tail -F $LOGROOT/* + ;; + + kill) + printf "%-50s" "Force killing $NAME" + PID=`cat $PIDFILE` + cd $DAEMON_PATH + if [ -f $PIDFILE ]; then + while sleep 1 + echo -n "." 
+ kill -0 $PID >/dev/null 2>&1 + do + kill -SIGTERM $PID + done + printf "%s\n" "Ok" + rm -f $PIDFILE + else + printf "%s\n" "pidfile not found" + fi + ;; + *) - echo "Usage: $0 {status|start|stop|restart}" + echo "Usage: $0 {status|start|stop|restart|kill|tail}" exit 1 esac http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/README.md ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/README.md b/metron-sensors/fastcapa/README.md index 707b491..37c4189 100644 --- a/metron-sensors/fastcapa/README.md +++ b/metron-sensors/fastcapa/README.md @@ -32,7 +32,7 @@ Requirements The following system requirements must be met to run the Fastcapa probe. -* Linux kernel >= 2.6.34 +* Linux kernel >= 2.6.34 * A [DPDK supported ethernet device; NIC](http://dpdk.org/doc/nics). * Port(s) on the ethernet device that can be dedicated for exclusive use by Fastcapa @@ -128,7 +128,7 @@ The size of THPs that are supported will vary based on your CPU. These typicall 09:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2) 0a:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2) ``` - + 5. Bind the device. Replace the device name and PCI address with what is appropriate for your environment. ``` ifdown enp9s0f0 @@ -137,7 +137,7 @@ The size of THPs that are supported will vary based on your CPU. These typicall ``` 6. Ensure that the device was bound. It should be shown as a 'network device using DPDK-compatible driver.' - ``` + ``` $ dpdk-devbind --status Network devices using DPDK-compatible driver ============================================ @@ -151,7 +151,7 @@ The size of THPs that are supported will vary based on your CPU. These typicall The probe has been tested with [Librdkafka 0.9.4](https://github.com/edenhill/librdkafka/releases/tag/v0.9.4). -1. Choose an installation path. 
In this example, the libs will actually be installed at `/usr/local/lib`; note that `lib` is appended to the prefix. +1. Choose an installation path. In this example, the libs will actually be installed at `/usr/local/lib`; note that `lib` is appended to the prefix. ``` export RDK_PREFIX=/usr/local ``` @@ -161,7 +161,7 @@ The probe has been tested with [Librdkafka 0.9.4](https://github.com/edenhill/li wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz -O - | tar -xz cd librdkafka-0.9.4/ ./configure --prefix=$RDK_PREFIX - make + make make install ``` @@ -184,7 +184,7 @@ The probe has been tested with [Librdkafka 0.9.4](https://github.com/edenhill/li cd metron/metron-sensors/fastcapa make ``` - + Usage ----- @@ -196,7 +196,7 @@ Follow these steps to run Fastcapa. [kafka-global] metadata.broker.list = kafka-broker1:9092 ``` - + 2. Bind the capture device. This is only needed if the device is not already bound. In this example, the device `enp9s0f0` with a PCI address of `09:00:0` is bound. Use values specific to your environment. ``` ifdown enp9s0f0 @@ -215,7 +215,7 @@ Follow these steps to run Fastcapa. Fastcapa accepts three sets of parameters. -1. Command-line parameters passed directly to DPDK's Environmental Abstraction Layer (EAL) +1. Command-line parameters passed directly to DPDK's Environmental Abstraction Layer (EAL) 2. Command-line parameters that define how Fastcapa will interact with DPDK. These parametera are separated on the command line by a double-dash (`--`). 3. A configuration file that define how Fastcapa interacts with Librdkafka. @@ -247,8 +247,9 @@ fastcapa -h | Name | Command | Description | Default | |--------------------------|-----------------|---------------------------------------------------------------------------------------------------------------------------|---------| | Port Mask | -p PORT_MASK | A bit mask identifying which ports to bind. 
| 0x01 | -| Burst Size | -b BURST_SIZE | Maximum number of packets to receive at one time. | 32 | -| Receive Descriptors | -r NB_RX_DESC | The number of descriptors for each receive queue (the size of the receive queue.) Limited by the ethernet device in use. | 1024 | +| Receive Burst Size | -b RX_BURST_SIZE | The max number of packets processed by a receive worker. | 32 | +| Transmit Burst Size | -w TX_BURST_SIZE | The max number of packets processed by a transmit worker. | 256 | +| Receive Descriptors | -d NB_RX_DESC | The number of descriptors for each receive queue (the size of the receive queue.) Limited by the ethernet device in use. | 1024 | | Transmission Ring Size | -x TX_RING_SIZE | The size of each transmission ring. This must be a power of 2. | 2048 | | Number Receive Queues | -q NB_RX_QUEUE | Number of receive queues to use for each port. Limited by the ethernet device in use. | 2 | | Kafka Topic | -t KAFKA_TOPIC | The name of the Kafka topic. | pcap | @@ -258,6 +259,18 @@ fastcapa -h To get more information about the Fastcapa specific parameters, run the following. Note that this puts the `-h` after the double-dash `--`. 
``` fastcapa -- -h + +fastcapa [EAL options] -- [APP options] + -p PORT_MASK bitmask of ports to bind [0x01] + -b RX_BURST_SIZE burst size of receive worker [32] + -w TX_BURST_SIZE burst size of transmit worker [256] + -d NB_RX_DESC num of descriptors for receive ring [1024] + -x TX_RING_SIZE size of tx rings (must be a power of 2) [2048] + -q NB_RX_QUEUE num of receive queues for each device [1] + -t KAFKA_TOPIC name of the kafka topic [pcap] + -c KAFKA_CONF file containing configs for kafka client + -s KAFKA_STATS append kafka client stats to a file + -h print this help message ``` #### Fastcapa-Kafka Configuration File @@ -283,7 +296,7 @@ Global configuration values that should be located under the `[kafka-global]` he | queue.buffering.max.messages | Maximum number of messages allowed on the producer queue | 100000 | | queue.buffering.max.ms | Maximum time, in milliseconds, for buffering data on the producer queue | 1000 | | message.copy.max.bytes | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. | 65535 | -| batch.num.messages | Maximum number of messages batched in one MessageSet | 10000 | +| batch.num.messages | Maximum number of messages batched in one MessageSet | 10000 | | statistics.interval.ms | How often statistics are emitted; 0 = never | 0 | | compression.codec | Compression codec to use for compressing message sets; {none, gzip, snappy, lz4 } | none | @@ -314,10 +327,10 @@ When running the probe some basic counters are output to stdout. Of course duri * `[rx]` + `out`: The receive workers have enqueued 8 packets onto the transmission rings. * `[rx]` + `drops`: If the transmission rings become full it will prevent the receive workers from enqueuing additional packets. The excess packets are dropped. This value will never decrease. * `[tx]` + `in`: The transmission workers have consumed 8 packets. 
-* `[tx]` + `out`: The transmission workers have packaged 8 packets into Kafka messages. +* `[tx]` + `out`: The transmission workers have packaged 8 packets into Kafka messages. * `[tx]` + `drops`: If the Kafka client library accepted fewer packets than expected. This value can increase or decrease over time as additional packets are acknowledged by the Kafka client library at a later point in time. * `[kaf]` + `in`: The Kafka client library has received 8 packets. -* `[kaf]` + `out`: A total of 7 packets has successfully reached Kafka. +* `[kaf]` + `out`: A total of 7 packets has successfully reached Kafka. * `[kaf]` + `queued`: There is 1 packet within the `rdkafka` queue waiting to be sent. ### Kerberos @@ -335,7 +348,7 @@ The probe can be used in a Kerberized environment. Follow these additional step wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz -O - | tar -xz cd librdkafka-0.9.4/ ./configure --prefix=$RDK_PREFIX --enable-sasl - make + make make install ``` @@ -368,7 +381,7 @@ The probe can be used in a Kerberized environment. Follow these additional step sasl.kerberos.keytab = /etc/security/keytabs/metron.headless.keytab sasl.kerberos.principal = [email protected] ``` - + 1. Now run Fastcapa as you normally would. It should have no problem landing packets in your kerberized Kafka broker. How It Works @@ -461,13 +474,13 @@ PANIC in rte_eal_init(): Cannot get hugepage information ``` -Solution: This can occur if any process that has been allocated THPs crashes and fails to return the resources. +Solution: This can occur if any process that has been allocated THPs crashes and fails to return the resources. * Delete the THP files that are not in use. ``` rm -f /mnt/huge_1GB/rtemap_* ``` - + * If the first option does not work, re-mount the `hugetlbfs` file system. 
``` umount -a -t hugetlbfs @@ -493,4 +506,3 @@ modprobe uio_pci_generic dpdk-devbind --bind=uio_pci_generic "09:00.0" dpdk-devbind --status ``` - http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/Makefile ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/Makefile b/metron-sensors/fastcapa/src/Makefile index e9bf877..7cf1cee 100644 --- a/metron-sensors/fastcapa/src/Makefile +++ b/metron-sensors/fastcapa/src/Makefile @@ -29,9 +29,9 @@ include $(RTE_SDK)/mk/rte.vars.mk APP = fastcapa # all source are stored in SRCS-y -SRCS-y := main.c args.c kafka.c +SRCS-y := main.c args.c kafka.c nic.c worker.c -KAFKALIB = -L/usr/local/lib -lrdkafka +KAFKALIB = -L/usr/local/lib -lrdkafka KAFKAINC = -I/usr/local/include/librdkafka/ GLIB = $(shell pkg-config --libs glib-2.0) -lgthread-2.0 @@ -47,6 +47,6 @@ CFLAGS_main.o += -Wno-return-type endif EXTRA_CFLAGS += -O3 -Wall -#EXTRA_CFLAGS += -g -Wall +#EXTRA_CFLAGS += -g -Wall -DDEBUG include $(RTE_SDK)/mk/rte.extapp.mk http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/args.c ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/args.c b/metron-sensors/fastcapa/src/args.c index 59d2748..9d88bea 100644 --- a/metron-sensors/fastcapa/src/args.c +++ b/metron-sensors/fastcapa/src/args.c @@ -18,28 +18,25 @@ #include "args.h" -typedef int bool; -#define true 1 -#define false 0 -#define valid(s) (s == NULL ? false : strlen(s) > 1) - /* * Print usage information to the user. 
*/ static void print_usage(void) { printf("fastcapa [EAL options] -- [APP options]\n" - " -p PORT_MASK bitmask of ports to bind [%s]\n" - " -b BURST_SIZE max packets to retrieve at a time [%d]\n" - " -r NB_RX_DESC num of descriptors for receive ring [%d]\n" - " -x TX_RING_SIZE size of tx rings (must be a power of 2) [%d]\n" - " -q NB_RX_QUEUE num of receive queues for each device [%d]\n" - " -t KAFKA_TOPIC name of the kafka topic [%s]\n" - " -c KAFKA_CONF file containing configs for kafka client \n" - " -s KAFKA_STATS append kafka client stats to a file \n" - " -h print this help message \n", - STR(DEFAULT_PORT_MASK), - DEFAULT_BURST_SIZE, + " -p PORT_MASK bitmask of ports to bind [%s]\n" + " -b RX_BURST_SIZE burst size of receive worker [%d]\n" + " -w TX_BURST_SIZE burst size of transmit worker [%d]\n" + " -d NB_RX_DESC num of descriptors for receive ring [%d]\n" + " -x TX_RING_SIZE size of tx rings (must be a power of 2) [%d]\n" + " -q NB_RX_QUEUE num of receive queues for each device [%d]\n" + " -t KAFKA_TOPIC name of the kafka topic [%s]\n" + " -c KAFKA_CONF file containing configs for kafka client \n" + " -s KAFKA_STATS append kafka client stats to a file \n" + " -h print this help message \n", + STR(DEFAULT_PORT_MASK), + DEFAULT_RX_BURST_SIZE, + DEFAULT_TX_BURST_SIZE, DEFAULT_NB_RX_DESC, DEFAULT_TX_RING_SIZE, DEFAULT_NB_RX_QUEUE, @@ -77,25 +74,32 @@ static bool file_exists(const char* filepath) return (stat(filepath, &buf) == 0); } -/** - * Parse the command line arguments passed to the application. +/* + * Parse the command line arguments passed to the application; the arguments + * which do not go directly to DPDK's EAL.
*/ -int parse_args(int argc, char** argv) +static int parse_app_args(int argc, char** argv, app_params* p) { int opt; char** argvopt; int option_index; - int nb_workers; + unsigned int nb_workers; static struct option lgopts[] = { { NULL, 0, 0, 0 } }; - // initialize args - memset(&app, 0, sizeof(struct app_params)); + // set default args + p->enabled_port_mask = parse_portmask(STR(DEFAULT_PORT_MASK)); + p->kafka_topic = STR(DEFAULT_KAFKA_TOPIC); + p->rx_burst_size = DEFAULT_RX_BURST_SIZE; + p->tx_burst_size = DEFAULT_TX_BURST_SIZE; + p->nb_rx_desc = DEFAULT_NB_RX_DESC; + p->nb_rx_queue = DEFAULT_NB_RX_QUEUE; + p->tx_ring_size = DEFAULT_TX_RING_SIZE; // parse arguments to this application argvopt = argv; - while ((opt = getopt_long(argc, argvopt, "hp:b:t:c:r:q:s:x:", lgopts, &option_index)) != EOF) { + while ((opt = getopt_long(argc, argvopt, "b:c:d:hp:q:s:t:w:x:", lgopts, &option_index)) != EOF) { switch (opt) { // help @@ -103,24 +107,30 @@ int parse_args(int argc, char** argv) print_usage(); return -1; - // burst size + // rx burst size case 'b': - app.burst_size = atoi(optarg); - printf("[ -b BURST_SIZE ] defined as %d \n", app.burst_size); + p->rx_burst_size = atoi(optarg); + if(p->rx_burst_size < 1 || p->rx_burst_size > MAX_RX_BURST_SIZE) { + fprintf(stderr, "Invalid burst size; burst=%u must be in [1, %u]. \n", p->rx_burst_size, MAX_RX_BURST_SIZE); + print_usage(); + return -1; + } + break; - if(app.burst_size < 1 || app.burst_size > MAX_BURST_SIZE) { - fprintf(stderr, "Invalid burst size; burst=%u must be in [1, %u]. \n", app.burst_size, MAX_BURST_SIZE); + // tx burst size + case 'w': + p->tx_burst_size = atoi(optarg); + if(p->tx_burst_size < 1) { + fprintf(stderr, "Invalid burst size; burst=%u must be > 0. 
\n", p->tx_burst_size); print_usage(); return -1; } break; // number of receive descriptors - case 'r': - app.nb_rx_desc = atoi(optarg); - printf("[ -r NB_RX_DESC ] defined as %d \n", app.nb_rx_desc); - - if (app.nb_rx_desc < 1) { + case 'd': + p->nb_rx_desc = atoi(optarg); + if (p->nb_rx_desc < 1) { fprintf(stderr, "Invalid num of receive descriptors: '%s' \n", optarg); print_usage(); return -1; @@ -129,23 +139,14 @@ int parse_args(int argc, char** argv) // size of each transmit ring case 'x': - app.tx_ring_size = atoi(optarg); - printf("[ -x TX_RING_SIZE ] defined as %d \n", app.tx_ring_size); + p->tx_ring_size = atoi(optarg); - // must be a power of 2 and not 0 - if (app.tx_ring_size == 0 || (app.tx_ring_size & (app.tx_ring_size - 1)) != 0) { - fprintf(stderr, "Invalid tx ring size (must be power of 2): '%s' \n", optarg); - print_usage(); - return -1; - } break; // number of receive queues for each device case 'q': - app.nb_rx_queue = atoi(optarg); - printf("[ -q NB_RX_QUEUE ] defined as %d \n", app.nb_rx_queue); - - if (app.nb_rx_queue < 1) { + p->nb_rx_queue = atoi(optarg); + if (p->nb_rx_queue < 1) { fprintf(stderr, "Invalid num of receive queues: '%s' \n", optarg); print_usage(); return -1; @@ -154,10 +155,8 @@ int parse_args(int argc, char** argv) // port mask case 'p': - app.enabled_port_mask = parse_portmask(optarg); - printf("[ -p PORT_MASK ] defined as %d \n", app.enabled_port_mask); - - if (app.enabled_port_mask == 0) { + p->enabled_port_mask = parse_portmask(optarg); + if (p->enabled_port_mask == 0) { fprintf(stderr, "Invalid portmask: '%s'\n", optarg); print_usage(); return -1; @@ -166,10 +165,8 @@ int parse_args(int argc, char** argv) // kafka topic case 't': - app.kafka_topic = strdup(optarg); - printf("[ -t KAFKA_TOPIC ] defined as %s \n", app.kafka_topic); - - if (!valid(app.kafka_topic)) { + p->kafka_topic = strdup(optarg); + if (!valid(p->kafka_topic)) { printf("Invalid kafka topic: '%s'\n", optarg); print_usage(); return -1; @@ -178,10 +175,8 
@@ int parse_args(int argc, char** argv) // kafka config path case 'c': - app.kafka_config_path = strdup(optarg); - printf("[ -c KAFKA_CONFIG ] defined as %s \n", app.kafka_config_path); - - if (!valid(app.kafka_config_path) || !file_exists(app.kafka_config_path)) { + p->kafka_config_path = strdup(optarg); + if (!valid(p->kafka_config_path) || !file_exists(p->kafka_config_path)) { fprintf(stderr, "Invalid kafka config: '%s'\n", optarg); print_usage(); return -1; @@ -190,8 +185,7 @@ int parse_args(int argc, char** argv) // kafka stats path case 's': - app.kafka_stats_path = strdup(optarg); - printf("[ -s KAFKA_STATS ] defined as %s \n", app.kafka_stats_path); + p->kafka_stats_path = strdup(optarg); break; default: @@ -200,42 +194,6 @@ int parse_args(int argc, char** argv) } } - // default PORT_MASK - if (app.enabled_port_mask == 0) { - printf("[ -p PORT_MASK ] undefined; defaulting to %s \n", STR(DEFAULT_PORT_MASK)); - app.enabled_port_mask = DEFAULT_PORT_MASK; - } - - // default KAFKA_TOPIC - if (!valid(app.kafka_topic)) { - printf("[ -t KAFKA_TOPIC ] undefined; defaulting to %s \n", STR(DEFAULT_KAFKA_TOPIC)); - app.kafka_topic = STR(DEFAULT_KAFKA_TOPIC); - } - - // default BURST_SIZE - if (app.burst_size == 0) { - printf("[ -b BURST_SIZE ] undefined; defaulting to %d \n", DEFAULT_BURST_SIZE); - app.burst_size = DEFAULT_BURST_SIZE; - } - - // default NB_RX_DESC - if (app.nb_rx_desc == 0) { - printf("[ -r NB_RX_DESC ] undefined; defaulting to %d \n", DEFAULT_NB_RX_DESC); - app.nb_rx_desc = DEFAULT_NB_RX_DESC; - } - - // default NB_RX_QUEUE - if (app.nb_rx_queue == 0) { - printf("[ -q NB_RX_QUEUE ] undefined; defaulting to %d \n", DEFAULT_NB_RX_QUEUE); - app.nb_rx_queue = DEFAULT_NB_RX_QUEUE; - } - - // default TX_RING_SIZE - if (app.tx_ring_size == 0) { - printf("[ -x TX_RING_SIZE ] undefined; defaulting to %u \n", DEFAULT_TX_RING_SIZE); - app.tx_ring_size = DEFAULT_TX_RING_SIZE; - } - // check number of ethernet devices if (rte_eth_dev_count() == 0) { 
rte_exit(EXIT_FAILURE, "No ethernet ports detected.\n"); @@ -243,19 +201,63 @@ int parse_args(int argc, char** argv) // check number of workers nb_workers = rte_lcore_count() - 1; - if (nb_workers < 1) { - rte_exit(EXIT_FAILURE, "Minimum 2 logical cores required. \n"); - } // need at least 1 worker for each receive queue - if(nb_workers < app.nb_rx_queue) { - rte_exit(EXIT_FAILURE, "Minimum 1 worker per receive queue; workers=%u rx_queues=%u. \n", - nb_workers, app.nb_rx_queue); + if(nb_workers < p->nb_rx_queue) { + rte_exit(EXIT_FAILURE, "Minimum 1 worker per receive queue; workers=%u rx_queues=%u. \n", + nb_workers, p->nb_rx_queue); } + p->nb_rx_workers = p->nb_rx_queue; + p->nb_tx_workers = nb_workers - p->nb_rx_workers; + + printf("[ -p PORT_MASK ] defined as %d \n", p->enabled_port_mask); + printf("[ -b RX_BURST_SIZE ] defined as %d \n", p->rx_burst_size); + printf("[ -w TX_BURST_SIZE ] defined as %d \n", p->tx_burst_size); + printf("[ -d NB_RX_DESC ] defined as %d \n", p->nb_rx_desc); + printf("[ -x TX_RING_SIZE ] defined as %d \n", p->tx_ring_size); + printf("[ -q NB_RX_QUEUE ] defined as %d \n", p->nb_rx_queue); + printf("[ -t KAFKA_TOPIC ] defined as %s \n", p->kafka_topic); + printf("[ -c KAFKA_CONFIG ] defined as %s \n", p->kafka_config_path); + printf("[ -s KAFKA_STATS ] defined as %s \n", p->kafka_stats_path); + printf("[ NUM_RX_WORKERS ] defined as %d \n", p->nb_rx_workers); + printf("[ NUM_TX_WORKERS ] defined as %d \n", p->nb_tx_workers); + // reset getopt lib optind = 0; - return 0; } +/* + * Parse the command line arguments passed to the application. 
+ */ +int parse_args(int argc, char** argv, app_params* p) +{ + // initialize the environment + int ret = rte_eal_init(argc, argv); + if (ret < 0) { + rte_exit(EXIT_FAILURE, "Failed to initialize EAL: %i\n", ret); + } + + // advance past the environmental settings + argc -= ret; + argv += ret; + + // parse arguments to the application + ret = parse_app_args(argc, argv, p); + if (ret < 0) { + rte_exit(EXIT_FAILURE, "\n"); + } + + p->nb_ports = rte_eth_dev_count(); + p->nb_rx_workers = p->nb_rx_queue; + p->nb_tx_workers = (rte_lcore_count() - 1) - p->nb_rx_workers; + + // validate the number of workers + if(p->nb_tx_workers < p->nb_rx_workers) { + rte_exit(EXIT_FAILURE, "Additional lcore(s) required; found=%u, required=%u \n", + rte_lcore_count(), (p->nb_rx_queue*2) + 1); + } + + return 0; +} http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/args.h ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/args.h b/metron-sensors/fastcapa/src/args.h index 352b221..542b445 100644 --- a/metron-sensors/fastcapa/src/args.h +++ b/metron-sensors/fastcapa/src/args.h @@ -19,115 +19,27 @@ #ifndef METRON_ARGS_H #define METRON_ARGS_H -#include <stdio.h> -#include <stdlib.h> -#include <stdint.h> -#include <inttypes.h> +#include <unistd.h> #include <string.h> -#include <sys/types.h> -#include <sys/queue.h> -#include <sys/stat.h> -#include <stdarg.h> -#include <errno.h> #include <getopt.h> -#include <glib.h> -#include <rte_common.h> -#include <rte_byteorder.h> -#include <rte_log.h> +#include <sys/stat.h> #include <rte_memory.h> -#include <rte_memcpy.h> -#include <rte_memzone.h> -#include <rte_eal.h> -#include <rte_per_lcore.h> -#include <rte_launch.h> -#include <rte_atomic.h> -#include <rte_cycles.h> -#include <rte_prefetch.h> -#include <rte_lcore.h> -#include <rte_per_lcore.h> -#include <rte_branch_prediction.h> -#include <rte_interrupts.h> -#include <rte_pci.h> -#include <rte_random.h> 
-#include <rte_debug.h> -#include <rte_ether.h> #include <rte_ethdev.h> -#include <rte_ring.h> -#include <rte_mempool.h> -#include <rte_mbuf.h> -#include <rte_ip.h> -#include <rte_tcp.h> -#include <rte_lpm.h> -#include <rte_string_fns.h> +#include "types.h" -#define DEFAULT_BURST_SIZE 32 +#define DEFAULT_RX_BURST_SIZE 32 +#define DEFAULT_TX_BURST_SIZE 256 #define DEFAULT_PORT_MASK 0x01 #define DEFAULT_KAFKA_TOPIC pcap -#define DEFAULT_NB_RX_QUEUE 2 +#define DEFAULT_NB_RX_QUEUE 1 #define DEFAULT_NB_RX_DESC 1024 #define DEFAULT_TX_RING_SIZE 2048 #define DEFAULT_KAFKA_STATS_PATH 0 - -#define MAX_BURST_SIZE 1024 - -#define STR_EXPAND(tok) #tok -#define STR(tok) STR_EXPAND(tok) - -/* - * Logging definitions - */ -#define LOG_ERROR(log_type, fmt, args...) RTE_LOG(ERR, log_type, fmt, ##args); -#define LOG_WARN(log_type, fmt, args...) RTE_LOG(WARNING, log_type, fmt, ##args); -#define LOG_INFO(log_type, fmt, args...) RTE_LOG(INFO, log_type, fmt, ##args); - -#ifdef DEBUG -#define LOG_LEVEL RTE_LOG_DEBUG -#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, ##args); -#else -#define LOG_LEVEL RTE_LOG_INFO -#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) -#endif - -/** - * Application configuration parameters. - */ -struct app_params { - - /* The number of receive descriptors to allocate for the receive ring. */ - uint16_t nb_rx_desc; - - /* The number of receive queues to set up for each ethernet device. */ - uint16_t nb_rx_queue; - - /* The size of the transmit ring (must be a power of 2). */ - uint16_t tx_ring_size; - - /* The maximum number of packets to retrieve at a time. */ - uint16_t burst_size; - - /* Defines which ports packets will be consumed from. */ - uint32_t enabled_port_mask; - - /* The name of the Kafka topic that packet data is sent to. */ - const char* kafka_topic; - - /* A file containing configuration values for the Kafka client. */ - char* kafka_config_path; - - /* A file to which the Kafka stats are appended to. 
*/ - char* kafka_stats_path; - -} __rte_cache_aligned; - -/* - * Contains all application parameters. - */ -struct app_params app; +#define MAX_RX_BURST_SIZE 1024 /** * Parse the command line arguments passed to the application. */ -int parse_args(int argc, char** argv); +int parse_args(int argc, char** argv, app_params* app); #endif - http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/kafka.c ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/kafka.c b/metron-sensors/fastcapa/src/kafka.c index b844f3f..aceed39 100644 --- a/metron-sensors/fastcapa/src/kafka.c +++ b/metron-sensors/fastcapa/src/kafka.c @@ -18,13 +18,11 @@ #include "kafka.h" -#define POLL_TIMEOUT_MS 1000 - /* * Passed to all callback functions to help identify the connection. */ struct opaque { - int conn_id; + int conn_id; }; /* @@ -34,37 +32,51 @@ static rd_kafka_t **kaf_h; static rd_kafka_topic_t **kaf_top_h; static unsigned num_conns; static FILE *stats_fd; -static struct app_stats *kaf_conn_stats; +static app_stats *kaf_conn_stats; static struct opaque *kaf_opaque; static uint64_t *kaf_keys; /* * A callback executed when an error occurs within the kafka client */ -static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, void* UNUSED(opaque)) +static void kaf_error_cb ( + rd_kafka_t *rk, + int err, + const char *reason, + void* UNUSED(opaque)) { - LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s [%s] \n", + + LOG_ERROR(USER1, "kafka client error; conn=%s, error=%s [%s] \n", rd_kafka_name(rk), rd_kafka_err2str(err), reason); } /* * A callback executed when a broker throttles the producer */ -static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void* UNUSED(opaque)) +static void kaf_throttle_cb ( + rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int throttle_time_ms, + void* UNUSED(opaque)) { - 
LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms broker=%s broker_id=%"PRId32" \n", + LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms broker=%s broker_id=%"PRId32" \n", rd_kafka_name(rk), throttle_time_ms, broker_name, broker_id); } /* - * A callback executed on a fixed frequency (defined by `statistics.interval.ms`) + * A callback executed on a fixed frequency (defined by `statistics.interval.ms`) * that provides detailed performance statistics */ -static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), void *opaque) +static int kaf_stats_cb( + rd_kafka_t *rk, + char *json, + size_t UNUSED(json_len), + void *opaque) { int rc; struct opaque *data = (struct opaque*) opaque; - int conn_id = data->conn_id; + int conn_id = data->conn_id; // update queue depth of this kafka connection kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk); @@ -78,7 +90,7 @@ static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), voi } fflush(stats_fd); } - + // 0 ensures the json pointer is immediately freed return 0; } @@ -87,18 +99,34 @@ static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), voi * A callback that is called once for each message when it has been successfully * produced. 
*/ -static void kaf_message_delivered_cb (rd_kafka_t *UNUSED(rk), const rd_kafka_message_t *UNUSED(rkmessage), void *opaque) +static void kaf_message_delivered_cb ( + #ifdef DEBUG + rd_kafka_t *rk, + #else + rd_kafka_t *UNUSED(rk), + #endif + const rd_kafka_message_t *rkmessage, + void *opaque) { struct opaque *data = (struct opaque*) opaque; - int conn_id = data->conn_id; + int conn_id = data->conn_id; - kaf_conn_stats[conn_id].out += 1; + if(RD_KAFKA_RESP_ERR_NO_ERROR == rkmessage->err) { + kaf_conn_stats[conn_id].out += 1; + } else { + kaf_conn_stats[conn_id].drops += 1; + + #ifdef DEBUG + LOG_ERROR(USER1, "delivery failed: conn=%s, error=%s \n", + rd_kafka_name(rk), rd_kafka_err2str(rkmessage->err)); + #endif + } } /* * Opens the file used to persist the stats coming out of the kafka client */ -static int open_stats_file(char *filename) +static int open_stats_file(const char *filename) { int rc; @@ -113,7 +141,7 @@ static int open_stats_file(char *filename) if(rc < 0) { LOG_ERROR(USER1, "Unable to append to stats file \n"); return rc; - } + } fflush(stats_fd); return 0; @@ -122,11 +150,11 @@ static int open_stats_file(char *filename) /* * Closes the file used to persist the kafka client stats. */ -static void close_stats_file(void) +static void close_stats_file(void) { if(NULL != stats_fd) { fclose(stats_fd); - } + } } /** @@ -162,7 +190,7 @@ static void kaf_topic_option(const char* key, const char* val, void* arg) /** * Parses the configuration values from a configuration file. 
*/ -static void parse_kafka_config(char* file_path, const char* group, +static void parse_kafka_config(const char* file_path, const char* group, void (*option_cb)(const char* key, const char* val, void* arg), void* arg) { @@ -187,11 +215,11 @@ static void parse_kafka_config(char* file_path, const char* group, for (i = 0; i < num_keys; i++) { value = g_key_file_get_value(gkf, group, keys[i], errs); if (value) { - LOG_DEBUG(USER1, "config[%s]: %s = %s\n", group, keys[i], value); + LOG_INFO(USER1, "config[%s]: %s = %s\n", group, keys[i], value); option_cb(keys[i], value, arg); } else { - LOG_INFO(USER1, "bad config: %s: %s = %s: %s\n", file_path, keys[i], value, errs[0]->message); + LOG_WARN(USER1, "bad config: %s: %s = %s: %s\n", file_path, keys[i], value, errs[0]->message); } } } @@ -206,16 +234,16 @@ static void parse_kafka_config(char* file_path, const char* group, /** * Initializes a pool of Kafka connections. */ -void kaf_init(int num_of_conns) +void kaf_init(int num_of_conns, const char* kafka_topic, const char* kafka_config_path, const char* kafka_stats_path) { int i; char errstr[512]; // open the file to which the kafka stats are appended - if(NULL != app.kafka_stats_path) { - LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", app.kafka_stats_path); - open_stats_file(app.kafka_stats_path); - } + if(NULL != kafka_stats_path) { + LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", kafka_stats_path); + open_stats_file(kafka_stats_path); + } // the number of connections to maintain num_conns = num_of_conns; @@ -223,10 +251,10 @@ void kaf_init(int num_of_conns) // create kafka resources for each consumer kaf_h = calloc(num_of_conns, sizeof(rd_kafka_t*)); kaf_top_h = calloc(num_of_conns, sizeof(rd_kafka_topic_t*)); - kaf_conn_stats = calloc(num_of_conns, sizeof(struct app_stats)); + kaf_conn_stats = calloc(num_of_conns, sizeof(app_stats)); kaf_opaque = calloc(num_of_conns, sizeof(struct opaque)); kaf_keys = calloc(num_of_conns, sizeof(uint64_t)); - + 
for (i = 0; i < num_of_conns; i++) { // passed to each callback function to identify the kafka connection @@ -240,8 +268,8 @@ void kaf_init(int num_of_conns) rd_kafka_conf_set_dr_msg_cb(kaf_conf, kaf_message_delivered_cb); // configure kafka connection; values parsed from kafka config file - if (NULL != app.kafka_config_path) { - parse_kafka_config(app.kafka_config_path, "kafka-global", kaf_global_option, (void*)kaf_conf); + if (NULL != kafka_config_path) { + parse_kafka_config(kafka_config_path, "kafka-global", kaf_global_option, (void*)kaf_conf); } // create a new kafka connection @@ -252,14 +280,14 @@ void kaf_init(int num_of_conns) // configure kafka topic; values parsed from kafka config file rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new(); - if (NULL != app.kafka_config_path) { - parse_kafka_config(app.kafka_config_path, "kafka-topic", kaf_topic_option, (void*)topic_conf); + if (NULL != kafka_config_path) { + parse_kafka_config(kafka_config_path, "kafka-topic", kaf_topic_option, (void*)topic_conf); } // connect to a kafka topic - kaf_top_h[i] = rd_kafka_topic_new(kaf_h[i], app.kafka_topic, topic_conf); + kaf_top_h[i] = rd_kafka_topic_new(kaf_h[i], kafka_topic, topic_conf); if (!kaf_top_h[i]) { - rte_exit(EXIT_FAILURE, "Cannot init kafka topic: %s", app.kafka_topic); + rte_exit(EXIT_FAILURE, "Cannot init kafka topic: %s", kafka_topic); } } } @@ -268,7 +296,7 @@ void kaf_init(int num_of_conns) * Executes polling across all of the kafka client connections. Ensures that any queued * callbacks are served. */ -void kaf_poll(void) +void kaf_poll(void) { unsigned i; for (i = 0; i < num_conns; i++) { @@ -279,7 +307,7 @@ void kaf_poll(void) /** * Retrieves a summary of statistics across all of the kafka client connections. 
*/ -int kaf_stats(struct app_stats *stats) +int kaf_stats(app_stats *stats) { unsigned i; uint64_t in, out, depth, drops; @@ -356,10 +384,10 @@ int kaf_send(struct rte_mbuf* pkts[], int pkt_count, int conn_id) // find the topic connection based on the conn_id rd_kafka_topic_t* kaf_topic = kaf_top_h[conn_id]; - // current time in epoch microseconds from (big-endian aka network byte order) + // current time in epoch microseconds from (big-endian aka network byte order) // is added as a message key before being sent to kafka kaf_keys[conn_id] = htobe64(current_time()); - + // create the batch message for kafka for (i = 0; i < pkt_count; i++) { kaf_msgs[i].err = 0; http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/kafka.h ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/kafka.h b/metron-sensors/fastcapa/src/kafka.h index ab62ad7..961e78d 100644 --- a/metron-sensors/fastcapa/src/kafka.h +++ b/metron-sensors/fastcapa/src/kafka.h @@ -23,28 +23,31 @@ #include <string.h> #include <sys/time.h> #include <endian.h> +#include <glib.h> #include <librdkafka/rdkafka.h> #include <rte_common.h> #include <rte_mbuf.h> - #include "args.h" #include "types.h" -#ifdef __GNUC__ -# define UNUSED(x) UNUSED_ ## x __attribute__((__unused__)) -#else -# define UNUSED(x) UNUSED_ ## x -#endif +#define POLL_TIMEOUT_MS 1000 -/** +/* * Initializes a pool of Kafka connections. -*/ -void kaf_init(int num_of_conns); + */ +void kaf_init( + int num_of_conns, + const char* kafka_topic, + const char* kafka_config_path, + const char* kafka_stats_path); -/** +/* * Publish a set of packets to a kafka topic. */ -int kaf_send(struct rte_mbuf* data[], int num_to_send, int conn_id); +int kaf_send( + struct rte_mbuf* data[], + int num_to_send, + int conn_id); /* * Executes polling across all of the kafka client connections. 
Ensures that any queued @@ -52,12 +55,12 @@ int kaf_send(struct rte_mbuf* data[], int num_to_send, int conn_id); */ void kaf_poll(void); -/** +/* * Retrieves a summary of statistics across all of the kafka client connections. */ -int kaf_stats(struct app_stats *stats); +int kaf_stats(app_stats *stats); -/** +/* * Closes the pool of Kafka connections. */ void kaf_close(void); http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/main.c ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/main.c b/metron-sensors/fastcapa/src/main.c index c4c5123..f47a51d 100644 --- a/metron-sensors/fastcapa/src/main.c +++ b/metron-sensors/fastcapa/src/main.c @@ -16,428 +16,46 @@ * limitations under the License. */ -#include "main.h" - -/* - * Initialize a port using global settings and with the rx buffers - * coming from the mbuf_pool passed as parameter - */ -static inline int init_port(const uint8_t port, struct rte_mempool* mbuf_pool) -{ - struct rte_eth_conf port_conf = port_conf_default; - int retval; - uint16_t q; - int retry = 5; - const uint16_t tx_queues = 1; - int socket; - struct rte_eth_dev_info dev_info; - - if (port >= rte_eth_dev_count()) { - rte_exit(EXIT_FAILURE, "Port does not exist; port=%u \n", port); - return -1; - } - - // check that the number of RX queues does not exceed what is supported by the device - rte_eth_dev_info_get(port, &dev_info); - if (app.nb_rx_queue > dev_info.max_rx_queues) { - rte_exit(EXIT_FAILURE, "Too many RX queues for device; port=%u, rx_queues=%u, max_queues=%u \n", - port, app.nb_rx_queue, dev_info.max_rx_queues); - return -EINVAL; - } - - // check that the number of TX queues does not exceed what is supported by the device - if (tx_queues > dev_info.max_tx_queues) { - rte_exit(EXIT_FAILURE, "Too many TX queues for device; port=%u, tx_queues=%u, max_queues=%u \n", - port, tx_queues, dev_info.max_tx_queues); - return -EINVAL; - } - - retval = 
rte_eth_dev_configure(port, app.nb_rx_queue, tx_queues, &port_conf); - if (retval != 0) { - rte_exit(EXIT_FAILURE, "Cannot configure device; port=%u, err=%s \n", port, strerror(-retval)); - return retval; - } - - // create the receive queues - socket = rte_eth_dev_socket_id(port); - for (q = 0; q < app.nb_rx_queue; q++) { - retval = rte_eth_rx_queue_setup(port, q, app.nb_rx_desc, socket, NULL, mbuf_pool); - if (retval != 0) { - rte_exit(EXIT_FAILURE, "Cannot setup RX queue; port=%u, err=%s \n", port, strerror(-retval)); - return retval; - } - } - - // create the transmit queues - at least one TX queue must be setup even though we don't use it - for (q = 0; q < tx_queues; q++) { - retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE, socket, NULL); - if (retval != 0) { - rte_exit(EXIT_FAILURE, "Cannot setup TX queue; port=%u, err=%s \n", port, strerror(-retval)); - return retval; - } - } - - // start the receive and transmit units on the device - retval = rte_eth_dev_start(port); - if (retval < 0) { - rte_exit(EXIT_FAILURE, "Cannot start device; port=%u, err=%s \n", port, strerror(-retval)); - return retval; - } - - // retrieve information about the device - struct rte_eth_link link; - do { - rte_eth_link_get_nowait(port, &link); - - } while (retry-- > 0 && !link.link_status && !sleep(1)); - - // if still no link information, must be down - if (!link.link_status) { - rte_exit(EXIT_FAILURE, "Link down; port=%u \n", port); - return 0; - } - - // enable promisc mode - rte_eth_promiscuous_enable(port); - - // print diagnostics - struct ether_addr addr; - rte_eth_macaddr_get(port, &addr); - LOG_INFO(USER1, "Device setup successfully; port=%u, mac=%02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", - (unsigned)port, - addr.addr_bytes[0], addr.addr_bytes[1], - addr.addr_bytes[2], addr.addr_bytes[3], - addr.addr_bytes[4], addr.addr_bytes[5]); - - return 0; -} - -static void print_stats(struct rx_worker_params *rx_params, unsigned 
nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers) + #include "args.h" + #include "kafka.h" + #include "nic.h" + #include "types.h" + #include "worker.h" + + +static void wait_for_workers(void) { - struct rte_eth_stats eth_stats; - unsigned i; - uint64_t in, out, depth, drops; - struct app_stats stats; - - // header - printf("\n\n %15s %15s %15s %15s \n", " ----- in -----", " --- queued ---", "----- out -----", "---- drops ----"); - - // summarize stats from each port - in = 0; - for (i = 0; i < rte_eth_dev_count(); i++) { - rte_eth_stats_get(i, ð_stats); - in += eth_stats.ipackets; - } - printf("[nic] %15" PRIu64 " %15s %15s %15s \n", in, "-", "-", "-"); - - // summarize receive; from network to receive queues - in = out = depth = drops = 0; - for (i = 0; i < nb_rx_workers; i++) { - in += rx_params[i].stats.in; - out += rx_params[i].stats.out; - depth += rx_params[i].stats.depth; - drops += rx_params[i].stats.drops; - } - printf("[rx] %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, drops); - - // summarize transmit; from receive queues to transmit rings - in = out = depth = 0; - for (i = 0; i < nb_tx_workers; i++) { - in += tx_params[i].stats.in; - out += tx_params[i].stats.out; - depth += tx_params[i].stats.depth; - } - printf("[tx] %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, in - out); - - // summarize push to kafka; from transmit rings to librdkafka - kaf_stats(&stats); - printf("[kaf] %15" PRIu64 " %15" PRIu64 " %15" PRIu64 " %15" PRIu64 "\n", stats.in, stats.depth, stats.out, stats.drops); - - // summarize any errors on the ports - for (i = 0; i < rte_eth_dev_count(); i++) { - rte_eth_stats_get(i, ð_stats); - - if(eth_stats.ierrors > 0 || eth_stats.oerrors > 0 || eth_stats.rx_nombuf > 0) { - printf("\nErrors: Port %u \n", i); - printf(" - In Errs: %" PRIu64 "\n", eth_stats.ierrors); - printf(" - Out Errs: %" PRIu64 "\n", eth_stats.oerrors); - printf(" - Mbuf Errs: %" PRIu64 "\n", 
eth_stats.rx_nombuf); - } - } -} - -/* - * Handles interrupt signals. - */ -static void sig_handler(int sig_num) -{ - LOG_INFO(USER1, "Exiting on signal '%d'\n", sig_num); - - // set quit flag for rx thread to exit - quit_signal = 1; -} - -static int monitor_workers(struct rx_worker_params *rx_params, unsigned nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers) -{ - LOG_INFO(USER1, "Starting to monitor workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id()); - while (!quit_signal) { - kaf_poll(); - print_stats(rx_params, nb_rx_workers, tx_params, nb_tx_workers); - sleep(5); - } - - LOG_INFO(USER1, "Finished monitoring workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id()); - return 0; -} - -/** - * Process packets from a single queue. - */ -static int receive_worker(struct rx_worker_params* params) -{ - const uint8_t nb_ports = rte_eth_dev_count(); - const unsigned socket_id = rte_socket_id(); - const uint16_t burst_size = app.burst_size; - const uint16_t queue_id = params->queue_id; - struct rte_ring *ring = params->output_ring; - int i, dev_socket_id; - uint8_t port; - struct rte_mbuf* pkts[MAX_BURST_SIZE]; - const int attempts = MAX_BURST_SIZE / burst_size; - - LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u attempts=%d \n", rte_lcore_id(), socket_id, queue_id, attempts); - - // validate each port - for (port = 0; port < nb_ports; port++) { - - // skip ports that are not enabled - if ((app.enabled_port_mask & (1 << port)) == 0) { - continue; - } - - // check for cross-socket communication - dev_socket_id = rte_eth_dev_socket_id(port); - if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) { - LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", port); - } - } - - port = 0; - while (!quit_signal) { - - // skip to the next enabled port - if ((app.enabled_port_mask & (1 << port)) == 0) { - if (++port == nb_ports) { - port = 0; - } - continue; - 
} - - // receive a 'burst' of packets. if get back the max number requested, then there - // are likely more packets waiting. immediately go back and grab some. - i = 0; - uint16_t nb_in = 0, nb_in_last = 0; - do { - nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], burst_size); - nb_in += nb_in_last; - - } while (++i < attempts && nb_in_last == burst_size); - params->stats.in += nb_in; + unsigned lcore_id; - // add each packet to the ring buffer - if(likely(nb_in) > 0) { - const uint16_t nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in); - params->stats.out += nb_out; - params->stats.drops += (nb_in - nb_out); - } - - // clean-up the packet buffer - for (i = 0; i < nb_in; i++) { - rte_pktmbuf_free(pkts[i]); - } - - // wrap-around to the first port - if (++port == nb_ports) { - port = 0; + // wait for each worker to complete + RTE_LCORE_FOREACH_SLAVE(lcore_id) { + if (rte_eal_wait_lcore(lcore_id) < 0) { + LOG_WARN(USER1, "Failed to wait for worker; lcore=%u \n", lcore_id); } } - - LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id); - return 0; } -/** - * - */ -static int transmit_worker(struct tx_worker_params *params) -{ - unsigned i, nb_in, nb_out; - const uint16_t burst_size = params->burst_size; - struct rte_ring *ring = params->input_ring; - const int kafka_id = params->kafka_id; - - LOG_INFO(USER1, "Transmit worker started; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id()); - while (!quit_signal) { - - // dequeue packets from the ring - struct rte_mbuf* pkts[MAX_BURST_SIZE]; - nb_in = rte_ring_dequeue_burst(ring, (void*) pkts, burst_size); - - if(likely(nb_in > 0)) { - params->stats.in += nb_in; - // prepare the packets to be sent to kafka - nb_out = kaf_send(pkts, nb_in, kafka_id); - params->stats.out += nb_out; - } - - // clean-up the packet buffer - for (i = 0; i < nb_in; i++) { - rte_pktmbuf_free(pkts[i]); - } - } - - LOG_INFO(USER1, "Transmit worker finished; 
core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id()); - return 0; -} - -/** - * Get it going. - */ int main(int argc, char* argv[]) { - unsigned lcore_id; - unsigned nb_workers; - uint8_t port_id; - unsigned nb_ports; - unsigned nb_ports_available; - struct rte_mempool* mbuf_pool; - unsigned n, i; - unsigned nb_rx_workers, nb_tx_workers; - unsigned rx_worker_id = 0, tx_worker_id = 0; - char buf[32]; - - // catch interrupt - signal(SIGINT, sig_handler); - - // initialize the environment - int ret = rte_eal_init(argc, argv); - if (ret < 0) { - rte_exit(EXIT_FAILURE, "Failed to initialize EAL: %i\n", ret); - } - - // advance past the environmental settings - argc -= ret; - argv += ret; + app_params p = {0}; + parse_args(argc, argv, &p); - // parse arguments to the application - ret = parse_args(argc, argv); - if (ret < 0) { - rte_exit(EXIT_FAILURE, "Invalid parameters\n"); - } + struct rte_ring *tx_rings[p.nb_rx_queue]; + rx_worker_params rx_params[p.nb_rx_workers]; + tx_worker_params tx_params[p.nb_tx_workers]; - nb_workers = rte_lcore_count() - 1; - nb_ports_available = nb_ports = rte_eth_dev_count(); - nb_rx_workers = app.nb_rx_queue; - nb_tx_workers = nb_workers - nb_rx_workers; - n = NUM_MBUFS * nb_ports; + // initialize + kaf_init(p.nb_tx_workers, p.kafka_topic, p.kafka_config_path, p.kafka_stats_path); + init_receive(p.enabled_port_mask, p.nb_rx_queue, p.nb_rx_desc); + init_transmit(tx_rings, p.nb_rx_queue, p.tx_ring_size); - // validate the number of workers - if(nb_tx_workers < nb_rx_workers) { - rte_exit(EXIT_FAILURE, "Additional lcore(s) required; found=%u, required=%u \n", - rte_lcore_count(), (app.nb_rx_queue*2) + 1); - } - - // create memory pool - mbuf_pool = rte_pktmbuf_pool_create("mbuf-pool", n, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); - if (mbuf_pool == NULL) { - rte_exit(EXIT_FAILURE, "Unable to create memory pool; n=%u, cache_size=%u, data_room_size=%u, socket=%u \n", - n, MBUF_CACHE_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, 
rte_socket_id()); - } - - // initialize each specified ethernet ports - for (port_id = 0; port_id < nb_ports; port_id++) { - - // skip over ports that are not enabled - if ((app.enabled_port_mask & (1 << port_id)) == 0) { - LOG_INFO(USER1, "Skipping over disabled port '%d'\n", port_id); - nb_ports_available--; - continue; - } - - // initialize the port - creates one receive queue for each worker - LOG_INFO(USER1, "Initializing port %u\n", (unsigned)port_id); - if (init_port(port_id, mbuf_pool) != 0) { - rte_exit(EXIT_FAILURE, "Cannot initialize port %" PRIu8 "\n", port_id); - } - } - - // ensure that we were able to initialize enough ports - if (nb_ports_available < 1) { - rte_exit(EXIT_FAILURE, "Error: No available enabled ports. Portmask set?\n"); - } - - // each transmit worker has their own kafka client connection - kaf_init(nb_tx_workers); - - struct rx_worker_params rx_params[nb_rx_workers]; - struct tx_worker_params tx_params[nb_tx_workers]; - - // create the transmit rings - 1 for each receive queue - struct rte_ring *tx_rings[app.nb_rx_queue]; - for(i = 0; i < app.nb_rx_queue; i++) { - sprintf(buf, "tx-ring-%d", i); - tx_rings[i] = rte_ring_create(buf, app.tx_ring_size, rte_socket_id(), 0); - if(NULL == tx_rings[i]) { - rte_exit(EXIT_FAILURE, "Unable to create transmit ring: %s \n", rte_strerror(rte_errno)); - } - } - - // launch the workers - RTE_LCORE_FOREACH_SLAVE(lcore_id) { - - if(rx_worker_id < nb_rx_workers) { - - LOG_INFO(USER1, "Launching receive worker; worker=%u, core=%u, queue=%u\n", rx_worker_id, lcore_id, rx_worker_id); - rx_params[rx_worker_id] = (struct rx_worker_params) { - .worker_id = rx_worker_id, - .queue_id = rx_worker_id, - .burst_size = app.burst_size, - .output_ring = tx_rings[rx_worker_id], - .stats = {0} - }; - rte_eal_remote_launch((lcore_function_t*) receive_worker, &rx_params[rx_worker_id], lcore_id); - rx_worker_id++; - - } else { - - unsigned ring_id = tx_worker_id % app.nb_rx_queue; - LOG_INFO(USER1, "Launching transmit 
worker; worker=%u, core=%u ring=%u \n", tx_worker_id, lcore_id, ring_id); - tx_params[tx_worker_id] = (struct tx_worker_params) { - .worker_id = tx_worker_id, - .burst_size = app.burst_size, - .input_ring = tx_rings[ring_id], - .kafka_id = tx_worker_id, - .stats = {0} - }; - rte_eal_remote_launch((lcore_function_t*) transmit_worker, &tx_params[tx_worker_id], lcore_id); - tx_worker_id++; - } - - } - - // allow the master to monitor each of the workers - monitor_workers(rx_params, nb_rx_workers, tx_params, nb_tx_workers); - - // wait for each worker to complete - RTE_LCORE_FOREACH_SLAVE(lcore_id) { - if (rte_eal_wait_lcore(lcore_id) < 0) { - LOG_WARN(USER1, "Failed to wait for worker; lcore=%u \n", lcore_id); - return -1; - } - } + // start receive and transmit workers + start_workers(rx_params, tx_params, tx_rings, &p); + monitor_workers(rx_params, p.nb_rx_workers, tx_params, p.nb_tx_workers); + wait_for_workers(); + // clean up kaf_close(); return 0; } - http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/main.h ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/main.h b/metron-sensors/fastcapa/src/main.h deleted file mode 100644 index 0a230a5..0000000 --- a/metron-sensors/fastcapa/src/main.h +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef METRON_MAIN_H -#define METRON_MAIN_H - -#include <stdio.h> -#include <stdlib.h> -#include <stdint.h> -#include <inttypes.h> -#include <sys/types.h> -#include <string.h> -#include <sys/queue.h> -#include <stdarg.h> -#include <errno.h> -#include <getopt.h> -#include <unistd.h> -#include <signal.h> -#include <rte_common.h> -#include <rte_byteorder.h> -#include <rte_log.h> -#include <rte_memory.h> -#include <rte_memcpy.h> -#include <rte_memzone.h> -#include <rte_eal.h> -#include <rte_per_lcore.h> -#include <rte_launch.h> -#include <rte_atomic.h> -#include <rte_cycles.h> -#include <rte_prefetch.h> -#include <rte_lcore.h> -#include <rte_per_lcore.h> -#include <rte_branch_prediction.h> -#include <rte_interrupts.h> -#include <rte_pci.h> -#include <rte_random.h> -#include <rte_debug.h> -#include <rte_ether.h> -#include <rte_ethdev.h> -#include <rte_ring.h> -#include <rte_mempool.h> -#include <rte_mbuf.h> -#include <rte_ip.h> -#include <rte_tcp.h> -#include <rte_lpm.h> -#include <rte_string_fns.h> -#include <rte_distributor.h> -#include <rte_malloc.h> -#include <rte_errno.h> - -#include "args.h" -#include "kafka.h" -#include "types.h" - -/* - * the number of receive queue descriptors is a multiple of the packet burst size - */ -#define RX_QUEUE_MULT 4 -#define TX_QUEUE_SIZE 32 -#define NUM_MBUFS ((64 * 1024) - 1) -#define MBUF_CACHE_SIZE 250 - -// uncomment below line to enable debug logs -//#define DEBUG - -volatile uint8_t quit_signal; - -/** - * Default port configuration settings. 
- */ -const struct rte_eth_conf port_conf_default = { - .rxmode = { - .mq_mode = ETH_MQ_RX_RSS, - .max_rx_pkt_len = ETHER_MAX_LEN, - .enable_scatter = 1, - .enable_lro = 1 - }, - .txmode = { - .mq_mode = ETH_MQ_TX_NONE, - }, - .rx_adv_conf = { - .rss_conf = { - .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | ETH_RSS_SCTP, - } - }, -}; - -int main(int argc, char* argv[]); - -#endif - http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/nic.c ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/nic.c b/metron-sensors/fastcapa/src/nic.c new file mode 100644 index 0000000..5c68430 --- /dev/null +++ b/metron-sensors/fastcapa/src/nic.c @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "nic.h" + +/** + * Default receive queue settings. 
+ */ +static const struct rte_eth_conf rx_conf_default = { + .rxmode = { + .mq_mode = ETH_MQ_RX_RSS, + .max_rx_pkt_len = ETHER_MAX_LEN, + .enable_scatter = 0, + .enable_lro = 0 + }, + .txmode = { + .mq_mode = ETH_MQ_TX_NONE, + }, + .rx_adv_conf = { + .rss_conf = { + .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | ETH_RSS_SCTP, + } + }, +}; + +/** + * Default transmit queue settings. + */ +static const struct rte_eth_txconf tx_conf_default = { + .tx_thresh = { + .pthresh = 0, + .hthresh = 0, + .wthresh = 0 + }, + .tx_rs_thresh = 0, + .tx_free_thresh = 0, + .txq_flags = 0, + .tx_deferred_start = 0 +}; + +/* + * Initialize a NIC port. + */ +static int init_port( + const uint8_t port_id, + struct rte_mempool* mem_pool, + const uint16_t nb_rx_queue, + const uint16_t nb_rx_desc) +{ + struct rte_eth_conf rx_conf = rx_conf_default; + struct rte_eth_txconf tx_conf = tx_conf_default; + int retval; + uint16_t q; + int retry = 5; + const uint16_t tx_queues = 1; + int socket; + struct rte_eth_dev_info dev_info; + + if (port_id >= rte_eth_dev_count()) { + rte_exit(EXIT_FAILURE, "Port does not exist; port=%u \n", port_id); + return -1; + } + + // check that the number of RX queues does not exceed what is supported by the device + rte_eth_dev_info_get(port_id, &dev_info); + if (nb_rx_queue > dev_info.max_rx_queues) { + rte_exit(EXIT_FAILURE, "Too many RX queues for device; port=%u, rx_queues=%u, max_queues=%u \n", + port_id, nb_rx_queue, dev_info.max_rx_queues); + return -EINVAL; + } + + // check that the number of TX queues does not exceed what is supported by the device + if (tx_queues > dev_info.max_tx_queues) { + rte_exit(EXIT_FAILURE, "Too many TX queues for device; port=%u, tx_queues=%u, max_queues=%u \n", + port_id, tx_queues, dev_info.max_tx_queues); + return -EINVAL; + } + + retval = rte_eth_dev_configure(port_id, nb_rx_queue, tx_queues, &rx_conf); + if (retval != 0) { + rte_exit(EXIT_FAILURE, "Cannot configure device; port=%u, err=%s \n", port_id, strerror(-retval)); 
+ return retval; + } + + // create the receive queues + socket = rte_eth_dev_socket_id(port_id); + for (q = 0; q < nb_rx_queue; q++) { + retval = rte_eth_rx_queue_setup(port_id, q, nb_rx_desc, socket, NULL, mem_pool); + if (retval != 0) { + rte_exit(EXIT_FAILURE, "Cannot setup RX queue; port=%u, err=%s \n", port_id, strerror(-retval)); + return retval; + } + } + + // create the transmit queues - at least one TX queue must be setup even though we don't use it + for (q = 0; q < tx_queues; q++) { + retval = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE, socket, &tx_conf); + if (retval != 0) { + rte_exit(EXIT_FAILURE, "Cannot setup TX queue; port=%u, err=%s \n", port_id, strerror(-retval)); + return retval; + } + } + + // start the receive and transmit units on the device + retval = rte_eth_dev_start(port_id); + if (retval < 0) { + rte_exit(EXIT_FAILURE, "Cannot start device; port=%u, err=%s \n", port_id, strerror(-retval)); + return retval; + } + + // retrieve information about the device + struct rte_eth_link link; + do { + rte_eth_link_get_nowait(port_id, &link); + + } while (retry-- > 0 && !link.link_status && !sleep(1)); + + // if still no link information, must be down + if (!link.link_status) { + rte_exit(EXIT_FAILURE, "Link down; port=%u \n", port_id); + return 0; + } + + // enable promisc mode + rte_eth_promiscuous_enable(port_id); + + // print diagnostics + struct ether_addr addr; + rte_eth_macaddr_get(port_id, &addr); + LOG_INFO(USER1, "Device setup successfully; port=%u, mac=%02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n", + (unsigned int) port_id, + addr.addr_bytes[0], addr.addr_bytes[1], + addr.addr_bytes[2], addr.addr_bytes[3], + addr.addr_bytes[4], addr.addr_bytes[5]); + + return 0; +} + +/* + * Preparation for receiving and processing packets. 
+ */ +int init_receive( + const uint8_t enabled_port_mask, + const uint16_t nb_rx_queue, + const uint16_t nb_rx_desc) +{ + unsigned int nb_ports_available; + unsigned int nb_ports; + unsigned int port_id; + unsigned int size; + + nb_ports_available = nb_ports = rte_eth_dev_count(); + + // create memory pool + size = NUM_MBUFS * nb_ports; + struct rte_mempool* mem_pool = rte_pktmbuf_pool_create("mbuf-pool", size, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + if (mem_pool == NULL) { + rte_exit(EXIT_FAILURE, "Unable to create memory pool; n=%u, cache_size=%u, data_room_size=%u, socket=%u \n", + size, MBUF_CACHE_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + } + + // initialize each specified ethernet ports + for (port_id = 0; port_id < nb_ports; port_id++) { + + // skip over ports that are not enabled + if ((enabled_port_mask & (1 << port_id)) == 0) { + LOG_INFO(USER1, "Skipping over disabled port '%d'\n", port_id); + nb_ports_available--; + continue; + } + + // initialize the port - creates one receive queue for each worker + LOG_INFO(USER1, "Initializing port %u\n", (unsigned)port_id); + if (init_port(port_id, mem_pool, nb_rx_queue, nb_rx_desc) != 0) { + rte_exit(EXIT_FAILURE, "Cannot initialize port %" PRIu8 "\n", port_id); + } + } + + // ensure that we were able to initialize enough ports + if (nb_ports_available < 1) { + rte_exit(EXIT_FAILURE, "Error: No available enabled ports. Portmask set?\n"); + } + + return 0; +} + +/* + * Preparation for transmitting packets. 
+ */ +int init_transmit( + struct rte_ring **tx_rings, + const unsigned int count, + const unsigned int size) +{ + unsigned int i; + char buf[32]; + + for(i = 0; i < count; i++) { + sprintf(buf, "tx-ring-%d", i); + tx_rings[i] = rte_ring_create(buf, size, rte_socket_id(), RING_F_SP_ENQ); + if(NULL == tx_rings[i]) { + rte_exit(EXIT_FAILURE, "Unable to create transmit ring: %s \n", rte_strerror(rte_errno)); + } + } + + return 0; +} http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/nic.h ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/nic.h b/metron-sensors/fastcapa/src/nic.h new file mode 100644 index 0000000..42e52b7 --- /dev/null +++ b/metron-sensors/fastcapa/src/nic.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef METRON_NIC_H +#define METRON_NIC_H + +#include <signal.h> +#include <unistd.h> +#include <rte_ethdev.h> +#include <rte_mempool.h> +#include <rte_mbuf.h> +#include <rte_errno.h> +#include "types.h" + +#define MBUF_CACHE_SIZE 250 +#define TX_QUEUE_SIZE 64 +#define NUM_MBUFS ((64 * 1024) - 1) + +/* + * Preparation for receiving and processing packets. 
+ */ +int init_receive( + const uint8_t enabled_port_mask, /* one bit per port; at 8 bits wide only ports 0-7 can be selected */ + const uint16_t nb_rx_queue, + const uint16_t nb_rx_desc); + +/* + * Preparation for transmitting packets: creates 'count' rings named + * "tx-ring-<i>" via rte_ring_create(..., size, ..., RING_F_SP_ENQ) and + * stores them in tx_rings[0..count-1]. Exits via rte_exit() if any ring + * cannot be created. + */ +int init_transmit( + struct rte_ring **tx_rings, + const unsigned int count, + const unsigned int size); + +#endif http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/types.h ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/types.h b/metron-sensors/fastcapa/src/types.h index 0210eda..e687651 100644 --- a/metron-sensors/fastcapa/src/types.h +++ b/metron-sensors/fastcapa/src/types.h @@ -19,20 +19,103 @@ #ifndef METRON_TYPES_H #define METRON_TYPES_H -/** - * Tracks packet processing stats. +#include <rte_log.h> +#include <rte_memory.h> + +/* + * Allow strings to be used for preprocessor #define's + */ +#define STR_EXPAND(tok) #tok +#define STR(tok) STR_EXPAND(tok) + +/* + * NOTE(review): this 'bool' clashes with <stdbool.h> in any file that also + * includes it (and with C23, where bool/true/false are keywords). + */ +typedef int bool; +#define true 1 +#define false 0 + +/* + * NOTE(review): valid() only accepts strings of 2 or more characters + * (strlen(s) > 1) and does not parenthesize its argument; confirm the + * threshold is intended. Callers must have <string.h> for strlen. + */ +#define valid(s) (s == NULL ? false : strlen(s) > 1) + +/* + * Allows unused function parameters to be marked as unused to + * avoid unnecessary compile-time warnings. + */ +#ifdef __GNUC__ +# define UNUSED(x) UNUSED_ ## x __attribute__((__unused__)) +#else +# define UNUSED(x) UNUSED_ ## x +#endif + +/* + * Logging definitions + * + * NOTE(review): LOG_ERROR, LOG_WARN, LOG_INFO and the DEBUG variant of + * LOG_DEBUG expand with a trailing ';', so 'if (c) LOG_INFO(...); else ...' + * does not compile; the non-DEBUG LOG_DEBUG has no trailing ';'. */ -struct app_stats { +#define LOG_ERROR(log_type, fmt, args...) RTE_LOG(ERR, log_type, fmt, ##args); +#define LOG_WARN(log_type, fmt, args...) RTE_LOG(WARNING, log_type, fmt, ##args); +#define LOG_INFO(log_type, fmt, args...) RTE_LOG(INFO, log_type, fmt, ##args); + +#ifdef DEBUG +#define LOG_LEVEL RTE_LOG_DEBUG +#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, ##args); +#else +#define LOG_LEVEL RTE_LOG_INFO +#define LOG_DEBUG(log_type, fmt, args...) do {} while (0) +#endif + +/* + * Application configuration parameters. + */ +typedef struct { + + /* The number of receive descriptors to allocate for the receive ring.
*/ + uint16_t nb_rx_desc; + + /* The number of receive queues to set up for each ethernet device. */ + uint16_t nb_rx_queue; + + /* The size of the transmit ring (must be a power of 2). */ + unsigned int tx_ring_size; + + /* The maximum number of packets retrieved by the receive worker. */ + uint16_t rx_burst_size; + + /* The maximum number of packets retrieved by the transmit worker. */ + unsigned int tx_burst_size; + + /* Defines which ports packets will be consumed from. */ + unsigned int enabled_port_mask; /* NOTE(review): init_receive() declares its mask parameter as uint8_t, so ports above 7 are never initialized */ + + /* The name of the Kafka topic that packet data is sent to. */ + const char* kafka_topic; + + /* A file containing configuration values for the Kafka client. */ + char* kafka_config_path; + + /* A file to which the Kafka stats are appended to. */ + char* kafka_stats_path; + + /* The number of receive workers. */ + unsigned int nb_rx_workers; + + /* The number of transmit workers. */ + unsigned int nb_tx_workers; + + /* The number of NIC ports from which packets will be consumed. */ + unsigned int nb_ports; + +} app_params __rte_cache_aligned; + +/* + * Tracks packet processing metrics. + */ +typedef struct { uint64_t in; uint64_t out; uint64_t depth; uint64_t drops; -} __rte_cache_aligned; +} app_stats __rte_cache_aligned; -/** +/* * The parameters required by a receive worker. */ -struct rx_worker_params { +typedef struct { /* worker identifier */ uint16_t worker_id; @@ -41,38 +124,43 @@ struct rx_worker_params { uint16_t queue_id; /* how many packets are pulled off the queue at a time */ - uint16_t burst_size; + uint16_t rx_burst_size; + + /* Defines which ports packets will be consumed from. */ + unsigned int enabled_port_mask; /* the ring onto which the packets are enqueued */ struct rte_ring *output_ring; /* metrics */ - struct app_stats stats; + app_stats stats; -} __rte_cache_aligned; +} rx_worker_params __rte_cache_aligned; -/** +/* * The parameters required by a transmit worker.
+ */ -struct tx_worker_params { +typedef struct { /* worker identifier */ uint16_t worker_id; /* how many packets are pulled off the ring at a time */ - uint16_t burst_size; + unsigned int tx_burst_size; + + /* The size of the transmit ring (must be a power of 2). */ + unsigned int tx_ring_size; /* the ring from which packets are dequeued */ struct rte_ring *input_ring; /* identifies the kafka client connection used by the worker */ int kafka_id; - + /* worker metrics */ - struct app_stats stats; + app_stats stats; -} __rte_cache_aligned; +} tx_worker_params __rte_cache_aligned; #endif - http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/worker.c ---------------------------------------------------------------------- diff --git a/metron-sensors/fastcapa/src/worker.c b/metron-sensors/fastcapa/src/worker.c new file mode 100644 index 0000000..aa6a7c3 --- /dev/null +++ b/metron-sensors/fastcapa/src/worker.c @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "worker.h" + +/* + * Handles interrupt signals.
+ */
+static void sig_handler(int sig_num)
+{
+    // NOTE(review): RTE_LOG is not async-signal-safe. This mirrors the DPDK
+    // sample applications, but could deadlock if the signal arrives while
+    // the interrupted thread is inside stdio (e.g. in print_stats).
+    LOG_INFO(USER1, "Exiting on signal '%d'\n", sig_num);
+
+    // the receive, transmit and monitor loops all exit once this is set
+    quit_signal = 1;
+}
+
+/*
+ * Prints packet processing metrics to stdout. Each row ([nic], [rx], [tx],
+ * [kaf]) is summed over all ports or workers of that stage; the columns are
+ * in, queued, out and drops.
+ */
+static void print_stats(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers)
+{
+    const unsigned nb_ports = rte_eth_dev_count();
+    struct rte_eth_stats eth_stats;
+    unsigned i;
+    uint64_t in, out, depth, drops;
+    app_stats stats;
+
+    // header
+    printf("\n\n %15s %15s %15s %15s \n", " ----- in -----", " --- queued ---", "----- out -----", "---- drops ----");
+
+    // summarize stats from each port. since DPDK 16.04 'imissed' (packets the
+    // NIC dropped because no receive descriptor was free) is no longer folded
+    // into 'ierrors', so it has to be added explicitly.
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_ports; i++) {
+        rte_eth_stats_get(i, &eth_stats);
+        in += eth_stats.ipackets;
+        drops += eth_stats.imissed + eth_stats.ierrors + eth_stats.oerrors + eth_stats.rx_nombuf;
+    }
+    printf("[nic] %15" PRIu64 " %15" PRIu64 " %15s %15" PRIu64 "\n", in, depth, "-", drops);
+
+    // summarize receive; from network to receive queues
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_rx_workers; i++) {
+        in += rx_params[i].stats.in;
+        out += rx_params[i].stats.out;
+        depth += rx_params[i].stats.depth;
+        drops += rx_params[i].stats.drops;
+    }
+    printf("[rx] %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, drops);
+
+    // summarize transmit; from receive queues to transmit rings
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_tx_workers; i++) {
+        in += tx_params[i].stats.in;
+        out += tx_params[i].stats.out;
+        depth += tx_params[i].stats.depth;
+        drops += tx_params[i].stats.drops;
+    }
+    printf("[tx] %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, drops);
+
+    // summarize push to kafka; from transmit rings to librdkafka
+    kaf_stats(&stats);
+    printf("[kaf] %15" PRIu64 " %15" PRIu64 " %15" PRIu64 " %15" PRIu64 "\n", stats.in, stats.depth, stats.out, stats.drops);
+    fflush(stdout);
+}
+
+/*
+ * Process packets from a single queue.
+ */
+static int receive_worker(rx_worker_params* params)
+{
+    const uint8_t nb_ports = rte_eth_dev_count();
+    const unsigned socket_id = rte_socket_id();
+    const uint16_t queue_id = params->queue_id;
+    struct rte_ring *ring = params->output_ring;
+    struct rte_mbuf* pkts[MAX_RX_BURST_SIZE];
+
+    // never ask the NIC for more packets than 'pkts' can hold
+    const uint16_t rx_burst_size = (params->rx_burst_size < MAX_RX_BURST_SIZE) ?
+        params->rx_burst_size : MAX_RX_BURST_SIZE;
+
+    // the enabled ports, in polling order. the port mask has one bit per
+    // port, so no more ports than that can ever be enabled.
+    uint8_t ports[sizeof(params->enabled_port_mask) * 8];
+    unsigned nb_enabled = 0;
+    unsigned next = 0;
+    unsigned i;
+    int dev_socket_id;
+    uint8_t port;
+
+    LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id);
+
+    // collect and validate each enabled port
+    for (port = 0; port < nb_ports && port < sizeof(ports); port++) {
+
+        // skip ports that are not enabled
+        if ((params->enabled_port_mask & (1u << port)) == 0) {
+            continue;
+        }
+
+        // check for cross-socket communication
+        dev_socket_id = rte_eth_dev_socket_id(port);
+        if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
+            LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", (unsigned) port);
+        }
+
+        ports[nb_enabled++] = port;
+    }
+
+    if (nb_enabled == 0) {
+        LOG_WARN(USER1, "Warning: No enabled ports for receive worker; queue=%u \n", queue_id);
+    }
+
+    while (nb_enabled > 0 && !quit_signal) {
+
+        // poll the enabled ports in round-robin order
+        port = ports[next];
+        if (++next == nb_enabled) {
+            next = 0;
+        }
+
+        // receive a 'burst' of packets
+        const uint16_t nb_in = rte_eth_rx_burst(port, queue_id, pkts, rx_burst_size);
+        params->stats.in += nb_in;
+
+        if (likely(nb_in > 0)) {
+
+            // add each packet to the ring buffer. ownership of every enqueued
+            // mbuf passes to the transmit worker consuming this ring, which frees
+            // it after handing it to kafka; freeing it here as well would recycle
+            // the buffer while it is still in use and free it twice.
+            const unsigned nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in);
+            params->stats.out += nb_out;
+            params->stats.drops += (nb_in - nb_out);
+
+            // only the packets that did not fit on the ring are released here
+            for (i = nb_out; i < nb_in; i++) {
+                rte_pktmbuf_free(pkts[i]);
+            }
+        }
+    }
+
+    LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id);
+    return 0;
+}
+
+/*
+ * The transmit worker is responsible for consuming packets from the transmit
+ * rings and queueing the packets for bulk delivery to kafka.
+ */
+static int transmit_worker(tx_worker_params *params)
+{
+    struct rte_ring *ring = params->input_ring;
+    const int kafka_id = params->kafka_id;
+
+    // a ring never holds more than tx_ring_size packets, so a larger burst
+    // could never be filled; the smaller bound keeps the stack buffer small
+    const unsigned int tx_burst_size = (params->tx_burst_size < params->tx_ring_size) ?
+        params->tx_burst_size : params->tx_ring_size;
+    unsigned i, nb_in, nb_out;
+
+    LOG_INFO(USER1, "Transmit worker started; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+
+    // a zero-length buffer is not allowed (and could never receive packets)
+    if (tx_burst_size == 0) {
+        LOG_ERROR(USER1, "Transmit worker has nothing to do; burst size is zero; core=%u \n", rte_lcore_id());
+        return -1;
+    }
+
+    // reused for every burst rather than re-declared on each iteration
+    struct rte_mbuf* pkts[tx_burst_size];
+
+    while (!quit_signal) {
+
+        // dequeue packets from the ring
+        nb_in = rte_ring_dequeue_burst(ring, (void*) pkts, tx_burst_size);
+
+        if (likely(nb_in > 0)) {
+            params->stats.in += nb_in;
+
+            // prepare the packets to be sent to kafka
+            nb_out = kaf_send(pkts, nb_in, kafka_id);
+            params->stats.out += nb_out;
+
+            // anything kaf_send did not accept is reported as a drop
+            if (nb_out < nb_in) {
+                params->stats.drops += (nb_in - nb_out);
+            }
+
+            // this worker owns the dequeued packets; release them
+            for (i = 0; i < nb_in; i++) {
+                rte_pktmbuf_free(pkts[i]);
+            }
+        }
+    }
+
+    LOG_INFO(USER1, "Transmit worker finished; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    return 0;
+}
+
+/*
+ * Start the receive and transmit workers.
+ */
+int start_workers(
+    rx_worker_params* rx_params,
+    tx_worker_params* tx_params,
+    struct rte_ring **tx_rings,
+    app_params *p)
+{
+    unsigned lcore_id;
+    unsigned rx_worker_id = 0;
+    unsigned tx_worker_id = 0;
+
+    // every transmit worker consumes a ring fed by a receive worker; without
+    // a receive worker there is nothing to consume, and the ring index below
+    // would be computed modulo zero
+    if (p->nb_rx_workers == 0) {
+        rte_exit(EXIT_FAILURE, "Error: At least one receive worker is required\n");
+    }
+
+    // shut down gracefully on an interactive interrupt and on a plain 'kill'
+    signal(SIGINT, sig_handler);
+    signal(SIGTERM, sig_handler);
+
+    // launch the workers; the first nb_rx_workers cores receive, the next
+    // nb_tx_workers cores transmit
+    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+
+        if (rx_worker_id < p->nb_rx_workers) {
+
+            LOG_INFO(USER1, "Launching receive worker; worker=%u, core=%u, queue=%u\n", rx_worker_id, lcore_id, rx_worker_id);
+            rx_params[rx_worker_id] = (rx_worker_params) {
+                .worker_id = rx_worker_id,
+                .queue_id = rx_worker_id,
+                .rx_burst_size = p->rx_burst_size,
+                .enabled_port_mask = p->enabled_port_mask,
+                .output_ring = tx_rings[rx_worker_id],
+                .stats = {0}
+            };
+            rte_eal_remote_launch((lcore_function_t*) receive_worker, &rx_params[rx_worker_id], lcore_id);
+            rx_worker_id++;
+
+        } else if (tx_worker_id < p->nb_tx_workers) {
+
+            // receive worker 'n' writes to tx_rings[n], so only the rings below
+            // nb_rx_workers ever carry packets
+            unsigned ring_id = tx_worker_id % p->nb_rx_workers;
+            LOG_INFO(USER1, "Launching transmit worker; worker=%u, core=%u ring=%u \n", tx_worker_id, lcore_id, ring_id);
+            tx_params[tx_worker_id] = (tx_worker_params) {
+                .worker_id = tx_worker_id,
+                .tx_burst_size = p->tx_burst_size,
+                .tx_ring_size = p->tx_ring_size,
+                .input_ring = tx_rings[ring_id],
+                .kafka_id = tx_worker_id,
+                .stats = {0}
+            };
+            rte_eal_remote_launch((lcore_function_t*) transmit_worker, &tx_params[tx_worker_id], lcore_id);
+            tx_worker_id++;
+
+        } else {
+
+            // no more than nb_tx_workers transmit workers may be started;
+            // tx_params is presumably sized to match (see caller)
+            LOG_WARN(USER1, "Warning: No worker launched on core %u; nb_rx_workers=%u, nb_tx_workers=%u\n",
+                lcore_id, p->nb_rx_workers, p->nb_tx_workers);
+        }
+    }
+
+    if (rx_worker_id < p->nb_rx_workers) {
+        LOG_WARN(USER1, "Warning: Only %u of %u receive workers launched; not enough cores\n", rx_worker_id, p->nb_rx_workers);
+    }
+
+    return 0;
+}
+
+/*
+ * Monitors the receive and transmit workers. Executed by the main thread, while
+ * other threads are created to perform the actual packet processing.
+ */
+int monitor_workers(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers)
+{
+    LOG_INFO(USER1, "Starting to monitor workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    while (!quit_signal) {
+
+        // service the kafka client, then report metrics every 5 seconds
+        kaf_poll();
+        print_stats(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
+        sleep(5);
+    }
+
+    LOG_INFO(USER1, "Finished monitoring workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    return 0;
+}
+
+/*
+ * Set by the signal handler to ask all workers and the monitor to stop.
+ * Declared 'extern' in worker.h and defined exactly once here, so that every
+ * translation unit including the header refers to this single object.
+ */
+volatile sig_atomic_t quit_signal = 0;
http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/worker.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/worker.h b/metron-sensors/fastcapa/src/worker.h
new file mode 100644
index 0000000..243e842
--- /dev/null
+++ b/metron-sensors/fastcapa/src/worker.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef METRON_WORKER_H
+#define METRON_WORKER_H
+
+#include <signal.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include <rte_ethdev.h>
+
+#include "types.h"
+#include "kafka.h"
+
+/*
+ * Non-zero once the workers have been asked to stop. This is only a
+ * declaration: a definition in a header creates one object per including
+ * translation unit, which fails to link under -fno-common (GCC 10+ default).
+ * The single definition lives in worker.c.
+ */
+extern volatile sig_atomic_t quit_signal;
+
+/*
+ * Start the receive and transmit workers.
+ */
+int start_workers(
+    rx_worker_params* rx_params,
+    tx_worker_params* tx_params,
+    struct rte_ring **tx_rings,
+    app_params *p);
+
+/*
+ * Monitors the receive and transmit workers. Executed by the main thread, while
+ * other threads are created to perform the actual packet processing.
+ */
+int monitor_workers(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers);
+
+#endif
