METRON-986 Enhance Fastcapa to Support Intel X520 (nickwallen) closes 
apache/metron#608


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/50e521b0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/50e521b0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/50e521b0

Branch: refs/heads/master
Commit: 50e521b0077d098fdc89562d7aa9b95398ce7bb6
Parents: f7a94f2
Author: nickwallen <[email protected]>
Authored: Fri Jul 7 12:27:27 2017 -0400
Committer: nickallen <[email protected]>
Committed: Fri Jul 7 12:27:27 2017 -0400

----------------------------------------------------------------------
 .../roles/fastcapa/defaults/main.yml            |   3 +-
 .../roles/fastcapa/templates/fastcapa           |  88 ++--
 metron-sensors/fastcapa/README.md               |  48 +-
 metron-sensors/fastcapa/src/Makefile            |   6 +-
 metron-sensors/fastcapa/src/args.c              | 206 ++++-----
 metron-sensors/fastcapa/src/args.h              | 104 +----
 metron-sensors/fastcapa/src/kafka.c             | 106 +++--
 metron-sensors/fastcapa/src/kafka.h             |  31 +-
 metron-sensors/fastcapa/src/main.c              | 436 ++-----------------
 metron-sensors/fastcapa/src/main.h              | 106 -----
 metron-sensors/fastcapa/src/nic.c               | 222 ++++++++++
 metron-sensors/fastcapa/src/nic.h               |  50 +++
 metron-sensors/fastcapa/src/types.h             | 120 ++++-
 metron-sensors/fastcapa/src/worker.c            | 269 ++++++++++++
 metron-sensors/fastcapa/src/worker.h            |  51 +++
 15 files changed, 1017 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-deployment/roles/fastcapa/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/defaults/main.yml 
b/metron-deployment/roles/fastcapa/defaults/main.yml
index 19168a8..a7edc08 100644
--- a/metron-deployment/roles/fastcapa/defaults/main.yml
+++ b/metron-deployment/roles/fastcapa/defaults/main.yml
@@ -35,7 +35,8 @@ fastcapa_bin: fastcapa
 fastcapa_portmask: 0x01
 fastcapa_kafka_config: /etc/fastcapa.conf
 fastcapa_topic: pcap
-fastcapa_burst_size: 32
+fastcapa_rx_burst_size: 32
+fastcapa_tx_burst_size: 256
 fastcapa_nb_rx_desc: 1024
 fastcapa_nb_rx_queue: 1
 fastcapa_tx_ring_size: 2048

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-deployment/roles/fastcapa/templates/fastcapa
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/templates/fastcapa 
b/metron-deployment/roles/fastcapa/templates/fastcapa
index 5ee6abb..f34c603 100644
--- a/metron-deployment/roles/fastcapa/templates/fastcapa
+++ b/metron-deployment/roles/fastcapa/templates/fastcapa
@@ -22,49 +22,61 @@
 # processname: fastcapa
 #
 
-export RTE_SDK="{{ dpdk_sdk }}"
-export RTE_TARGET="{{ dpdk_target }}"
 export LD_LIBRARY_PATH="{{ fastcapa_ld_library_path }}"
 
 NAME="fastcapa"
 DESC="Metron network packet capture probe"
 PIDFILE=/var/run/$NAME.pid
 SCRIPTNAME=/etc/init.d/$NAME
-DAEMONLOG=/var/log/$NAME.log
+LOGROOT=/var/log/fastcapa
+DAEMONLOG=$LOGROOT/$NAME-stdout.log
+DAEMONERR=$LOGROOT/$NAME-stderr.log
 NOW=`date`
-DAEMON_PATH="{{ dpdk_sdk }}"
+DAEMON_PATH="/root"
 
 PORT_MASK="{{ fastcapa_portmask }}"
 KAFKA_TOPIC="{{ fastcapa_topic }}"
 KAFKA_CONFIG="{{ fastcapa_kafka_config }}"
-BURST_SIZE="{{ fastcapa_burst_size }}"
+RX_BURST_SIZE="{{ fastcapa_rx_burst_size }}"
+TX_BURST_SIZE="{{ fastcapa_tx_burst_size }}"
 NB_RX_DESC="{{ fastcapa_nb_rx_desc }}"
 NB_RX_QUEUE="{{ fastcapa_nb_rx_queue }}"
 TX_RING_SIZE="{{ fastcapa_tx_ring_size }}"
 
+DAEMON="{{ fastcapa_prefix }}/{{ fastcapa_bin }}"
+DAEMONOPTS+=" "
+DAEMONOPTS+=" -- "
+DAEMONOPTS+="-p $PORT_MASK "
+DAEMONOPTS+="-t $KAFKA_TOPIC "
+DAEMONOPTS+="-c $KAFKA_CONFIG "
+DAEMONOPTS+="-b $RX_BURST_SIZE "
+DAEMONOPTS+="-w $TX_BURST_SIZE "
+DAEMONOPTS+="-d $NB_RX_DESC "
+DAEMONOPTS+="-q $NB_RX_QUEUE "
+DAEMONOPTS+="-x $TX_RING_SIZE "
+
 case "$1" in
   start)
     printf "%-50s" "Starting $NAME..."
     echo "$NOW:  Starting $NAME..." >> $DAEMONLOG
 
-    cd $DAEMON_PATH
-    DAEMON="{{ fastcapa_prefix }}/{{ fastcapa_bin }}"
-    DAEMONOPTS+=" -- "
-    DAEMONOPTS+="-p $PORTMASK "
-    DAEMONOPTS+="-t $KAFKA_TOPIC "
-    DAEMONOPTS+="-c $KAFKA_CONFIG "
-    DAEMONOPTS+="-b $BURST_SIZE "
-    DAEMONOPTS+="-r $NB_RX_DESC "
-    DAEMONOPTS+="-q $NB_RX_QUEUE "
-    DAEMONOPTS+="-x $TX_RING_SIZE "
+    mkdir -p $LOGROOT
+    touch $DAEMONLOG
+    touch $DAEMONERR
 
+    cd $DAEMON_PATH
+    echo "$DAEMON $DAEMONOPTS >> $DAEMONLOG 2> $DAEMONERR" >> $DAEMONLOG
 
-    PID=`$DAEMON $DAEMONOPTS >> $DAEMONLOG 2>&1 & echo $!`
-    if [ -z $PID ]; then
-        printf "%s\n" "Fail"
+    if [ -f $PIDFILE ]; then
+        printf "%s\n" "Already running"
     else
-        echo $PID > $PIDFILE
-        printf "%s\n" "Ok"
+        PID=`$DAEMON $DAEMONOPTS >> $DAEMONLOG 2> $DAEMONERR & echo $!`
+        if [ -z $PID ]; then
+            printf "%s\n" "Fail"
+        else
+            echo $PID > $PIDFILE
+            printf "%s\n" "Ok"
+        fi
     fi
   ;;
 
@@ -87,10 +99,14 @@ case "$1" in
     PID=`cat $PIDFILE`
     cd $DAEMON_PATH
     if [ -f $PIDFILE ]; then
-        echo "$NOW:  Stopping $NAME with pid=$PID" >> $DAEMONLOG
-        kill -HUP $PID
-        printf "%s\n" "Ok"
-        rm -f $PIDFILE
+      while sleep 1
+        echo -n "."
+        kill -0 $PID >/dev/null 2>&1
+      do
+        kill -SIGINT $PID
+      done
+      printf "%s\n" "Ok"
+      rm -f $PIDFILE
     else
         printf "%s\n" "pidfile not found"
     fi
@@ -101,7 +117,29 @@ case "$1" in
     $0 start
   ;;
 
+  tail)
+    tail -F $LOGROOT/*
+  ;;
+
+  kill)
+    printf "%-50s" "Force killing $NAME"
+    PID=`cat $PIDFILE`
+    cd $DAEMON_PATH
+    if [ -f $PIDFILE ]; then
+      while sleep 1
+        echo -n "."
+        kill -0 $PID >/dev/null 2>&1
+      do
+        kill -SIGTERM $PID
+      done
+      printf "%s\n" "Ok"
+      rm -f $PIDFILE
+    else
+        printf "%s\n" "pidfile not found"
+    fi
+  ;;
+
   *)
-    echo "Usage: $0 {status|start|stop|restart}"
+    echo "Usage: $0 {status|start|stop|restart|kill|tail}"
     exit 1
 esac

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/README.md 
b/metron-sensors/fastcapa/README.md
index 707b491..37c4189 100644
--- a/metron-sensors/fastcapa/README.md
+++ b/metron-sensors/fastcapa/README.md
@@ -32,7 +32,7 @@ Requirements
 
 The following system requirements must be met to run the Fastcapa probe.
 
-* Linux kernel >= 2.6.34 
+* Linux kernel >= 2.6.34
 * A [DPDK supported ethernet device; NIC](http://dpdk.org/doc/nics).
 * Port(s) on the ethernet device that can be dedicated for exclusive use by 
Fastcapa
 
@@ -128,7 +128,7 @@ The size of THPs that are supported will vary based on your 
CPU.  These typicall
     09:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2)
     0a:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2)
     ```
-    
+
 5. Bind the device.  Replace the device name and PCI address with what is 
appropriate for your environment.
     ```
     ifdown enp9s0f0
@@ -137,7 +137,7 @@ The size of THPs that are supported will vary based on your 
CPU.  These typicall
     ```
 
 6. Ensure that the device was bound. It should be shown as a 'network device 
using DPDK-compatible driver.'
-    ``` 
+    ```
     $ dpdk-devbind --status
     Network devices using DPDK-compatible driver
     ============================================
@@ -151,7 +151,7 @@ The size of THPs that are supported will vary based on your 
CPU.  These typicall
 
 The probe has been tested with [Librdkafka 
0.9.4](https://github.com/edenhill/librdkafka/releases/tag/v0.9.4).
 
-1. Choose an installation path.  In this example, the libs will actually be 
installed at `/usr/local/lib`; note that `lib` is appended to the prefix. 
+1. Choose an installation path.  In this example, the libs will actually be 
installed at `/usr/local/lib`; note that `lib` is appended to the prefix.
     ```
     export RDK_PREFIX=/usr/local
     ```
@@ -161,7 +161,7 @@ The probe has been tested with [Librdkafka 
0.9.4](https://github.com/edenhill/li
     wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz  -O - | 
tar -xz
     cd librdkafka-0.9.4/
     ./configure --prefix=$RDK_PREFIX
-    make 
+    make
     make install
     ```
 
@@ -184,7 +184,7 @@ The probe has been tested with [Librdkafka 
0.9.4](https://github.com/edenhill/li
     cd metron/metron-sensors/fastcapa
     make
     ```
-    
+
 
 Usage
 -----
@@ -196,7 +196,7 @@ Follow these steps to run Fastcapa.
     [kafka-global]
     metadata.broker.list = kafka-broker1:9092
     ```
-    
+
 2. Bind the capture device.  This is only needed if the device is not already 
bound.  In this example, the device `enp9s0f0` with a PCI address of `09:00:0` 
is bound.  Use values specific to your environment.
     ```
     ifdown enp9s0f0
@@ -215,7 +215,7 @@ Follow these steps to run Fastcapa.
 
 Fastcapa accepts three sets of parameters.  
 
-1. Command-line parameters passed directly to DPDK's Environmental Abstraction 
Layer (EAL) 
+1. Command-line parameters passed directly to DPDK's Environmental Abstraction 
Layer (EAL)
 2. Command-line parameters that define how Fastcapa will interact with DPDK.  
These parameters are separated on the command line by a  double-dash (`--`).
 3. A configuration file that defines how Fastcapa interacts with Librdkafka.
 
@@ -247,8 +247,9 @@ fastcapa -h
 | Name | Command | Description | Default |
 
|--------------------------|-----------------|---------------------------------------------------------------------------------------------------------------------------|---------|
 | Port Mask | -p PORT_MASK | A bit mask identifying which ports to bind. | 
0x01 |
-| Burst Size | -b BURST_SIZE | Maximum number of packets to receive at one 
time. | 32 |
-| Receive Descriptors | -r NB_RX_DESC | The number of descriptors for each 
receive queue (the size of the receive queue.)  Limited by the ethernet device 
in use. | 1024 |
+| Receive Burst Size | -b RX_BURST_SIZE | The max number of packets processed 
by a receive worker. | 32 |
+| Transmit Burst Size | -w TX_BURST_SIZE | The max number of packets processed 
by a transmit worker.  | 256 |
+| Receive Descriptors | -d NB_RX_DESC | The number of descriptors for each 
receive queue (the size of the receive queue.)  Limited by the ethernet device 
in use. | 1024 |
 | Transmission Ring Size | -x TX_RING_SIZE | The size of each transmission 
ring.  This must be a power of 2. | 2048 |
 | Number Receive Queues | -q NB_RX_QUEUE | Number of receive queues to use for 
each port.  Limited by the ethernet device in use. | 2 |
 | Kafka Topic | -t KAFKA_TOPIC | The name of the Kafka topic. | pcap |
@@ -258,6 +259,18 @@ fastcapa -h
 To get more information about the Fastcapa specific parameters, run the 
following.  Note that this puts the `-h` after the double-dash `--`.
 ```
 fastcapa -- -h
+
+fastcapa [EAL options] -- [APP options]
+  -p PORT_MASK        bitmask of ports to bind                     [0x01]
+  -b RX_BURST_SIZE    burst size of receive worker                 [32]
+  -w TX_BURST_SIZE    burst size of transmit worker                [256]
+  -d NB_RX_DESC       num of descriptors for receive ring          [1024]
+  -x TX_RING_SIZE     size of tx rings (must be a power of 2)      [2048]
+  -q NB_RX_QUEUE      num of receive queues for each device        [1]
+  -t KAFKA_TOPIC      name of the kafka topic                      [pcap]
+  -c KAFKA_CONF       file containing configs for kafka client
+  -s KAFKA_STATS      append kafka client stats to a file
+  -h                  print this help message
 ```
 
 #### Fastcapa-Kafka Configuration File
@@ -283,7 +296,7 @@ Global configuration values that should be located under 
the `[kafka-global]` he
 | queue.buffering.max.messages | Maximum number of messages allowed on the 
producer queue | 100000 |
 | queue.buffering.max.ms | Maximum time, in milliseconds, for buffering data 
on the producer queue | 1000 |
 | message.copy.max.bytes | Maximum size for message to be copied to buffer. 
Messages larger than this will be passed by reference (zero-copy) at the 
expense of larger iovecs. | 65535 |
-| batch.num.messages | Maximum number of messages batched in one MessageSet | 
10000 | 
+| batch.num.messages | Maximum number of messages batched in one MessageSet | 
10000 |
 | statistics.interval.ms | How often statistics are emitted; 0 = never | 0 |
 | compression.codec | Compression codec to use for compressing message sets; 
{none, gzip, snappy, lz4 } | none |
 
@@ -314,10 +327,10 @@ When running the probe some basic counters are output to 
stdout.  Of course duri
 * `[rx]` + `out`: The receive workers have enqueued 8 packets onto the 
transmission rings.
 * `[rx]` + `drops`: If the transmission rings become full it will prevent the 
receive workers from enqueuing additional packets.  The excess packets are 
dropped.  This value will never decrease.
 * `[tx]` + `in`: The transmission workers have consumed 8 packets.
-* `[tx]` + `out`: The transmission workers have packaged 8 packets into Kafka 
messages. 
+* `[tx]` + `out`: The transmission workers have packaged 8 packets into Kafka 
messages.
 * `[tx]` + `drops`: If the Kafka client library accepted fewer packets than 
expected.  This value can increase or decrease over time as additional packets 
are acknowledged by the Kafka client library at a later point in time.
 * `[kaf]` + `in`: The Kafka client library has received 8 packets.
-* `[kaf]` + `out`: A total of 7 packets has successfully reached Kafka. 
+* `[kaf]` + `out`: A total of 7 packets has successfully reached Kafka.
 * `[kaf]` + `queued`: There is 1 packet within the `rdkafka` queue waiting to 
be sent.
 
 ### Kerberos
@@ -335,7 +348,7 @@ The probe can be used in a Kerberized environment.  Follow 
these additional step
     wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz  -O - | 
tar -xz
     cd librdkafka-0.9.4/
     ./configure --prefix=$RDK_PREFIX --enable-sasl
-    make 
+    make
     make install
     ```
 
@@ -368,7 +381,7 @@ The probe can be used in a Kerberized environment.  Follow 
these additional step
     sasl.kerberos.keytab = /etc/security/keytabs/metron.headless.keytab
     sasl.kerberos.principal = [email protected]
     ```
-    
+
 1. Now run Fastcapa as you normally would.  It should have no problem landing 
packets in your kerberized Kafka broker.
 
 How It Works
@@ -461,13 +474,13 @@ PANIC in rte_eal_init():
 Cannot get hugepage information
 ```
 
-Solution: This can occur if any process that has been allocated THPs crashes 
and fails to return the resources. 
+Solution: This can occur if any process that has been allocated THPs crashes 
and fails to return the resources.
 
 * Delete the THP files that are not in use.
     ```
     rm -f /mnt/huge_1GB/rtemap_*
     ```
-      
+
 * If the first option does not work, re-mount the `hugetlbfs` file system.
     ```
     umount -a -t hugetlbfs
@@ -493,4 +506,3 @@ modprobe uio_pci_generic
  dpdk-devbind --bind=uio_pci_generic "09:00.0"
  dpdk-devbind --status
 ```
-

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/Makefile
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/Makefile 
b/metron-sensors/fastcapa/src/Makefile
index e9bf877..7cf1cee 100644
--- a/metron-sensors/fastcapa/src/Makefile
+++ b/metron-sensors/fastcapa/src/Makefile
@@ -29,9 +29,9 @@ include $(RTE_SDK)/mk/rte.vars.mk
 APP = fastcapa
 
 # all source are stored in SRCS-y
-SRCS-y := main.c args.c kafka.c
+SRCS-y := main.c args.c kafka.c nic.c worker.c
 
-KAFKALIB = -L/usr/local/lib -lrdkafka 
+KAFKALIB = -L/usr/local/lib -lrdkafka
 KAFKAINC = -I/usr/local/include/librdkafka/
 
 GLIB     = $(shell pkg-config --libs glib-2.0) -lgthread-2.0
@@ -47,6 +47,6 @@ CFLAGS_main.o += -Wno-return-type
 endif
 
 EXTRA_CFLAGS += -O3 -Wall
-#EXTRA_CFLAGS += -g -Wall
+#EXTRA_CFLAGS += -g -Wall -DDEBUG
 
 include $(RTE_SDK)/mk/rte.extapp.mk

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/args.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/args.c 
b/metron-sensors/fastcapa/src/args.c
index 59d2748..9d88bea 100644
--- a/metron-sensors/fastcapa/src/args.c
+++ b/metron-sensors/fastcapa/src/args.c
@@ -18,28 +18,25 @@
 
 #include "args.h"
 
-typedef int bool;
-#define true 1
-#define false 0
-#define valid(s) (s == NULL ? false : strlen(s) > 1)
-
 /*
  * Print usage information to the user.
  */
 static void print_usage(void)
 {
     printf("fastcapa [EAL options] -- [APP options]\n"
-           "  -p PORT_MASK     bitmask of ports to bind                     
[%s]\n"
-           "  -b BURST_SIZE    max packets to retrieve at a time            
[%d]\n"
-           "  -r NB_RX_DESC    num of descriptors for receive ring          
[%d]\n"
-           "  -x TX_RING_SIZE  size of tx rings (must be a power of 2)      
[%d]\n"
-           "  -q NB_RX_QUEUE   num of receive queues for each device        
[%d]\n"
-           "  -t KAFKA_TOPIC   name of the kafka topic                      
[%s]\n"
-           "  -c KAFKA_CONF    file containing configs for kafka client        
 \n"
-           "  -s KAFKA_STATS   append kafka client stats to a file             
 \n"
-           "  -h               print this help message                         
 \n",
-        STR(DEFAULT_PORT_MASK), 
-        DEFAULT_BURST_SIZE, 
+           "  -p PORT_MASK        bitmask of ports to bind                     
[%s]\n"
+           "  -b RX_BURST_SIZE    burst size of receive worker                 
[%d]\n"
+           "  -w TX_BURST_SIZE    burst size of transmit worker                
[%d]\n"
+           "  -d NB_RX_DESC       num of descriptors for receive ring          
[%d]\n"
+           "  -x TX_RING_SIZE     size of tx rings (must be a power of 2)      
[%d]\n"
+           "  -q NB_RX_QUEUE      num of receive queues for each device        
[%d]\n"
+           "  -t KAFKA_TOPIC      name of the kafka topic                      
[%s]\n"
+           "  -c KAFKA_CONF       file containing configs for kafka client     
    \n"
+           "  -s KAFKA_STATS      append kafka client stats to a file          
    \n"
+           "  -h                  print this help message                      
    \n",
+        STR(DEFAULT_PORT_MASK),
+        DEFAULT_RX_BURST_SIZE,
+        DEFAULT_TX_BURST_SIZE,
         DEFAULT_NB_RX_DESC,
         DEFAULT_TX_RING_SIZE,
         DEFAULT_NB_RX_QUEUE,
@@ -77,25 +74,32 @@ static bool file_exists(const char* filepath)
     return (stat(filepath, &buf) == 0);
 }
 
-/**
- * Parse the command line arguments passed to the application.
+/*
+ * Parse the command line arguments passed to the application; the arguments
+ * which do not go directly to DPDK's EAL.
  */
-int parse_args(int argc, char** argv)
+static int parse_app_args(int argc, char** argv, app_params* p)
 {
     int opt;
     char** argvopt;
     int option_index;
-    int nb_workers;
+    unsigned int nb_workers;
     static struct option lgopts[] = {
         { NULL, 0, 0, 0 }
     };
 
-    // initialize args
-    memset(&app, 0, sizeof(struct app_params));
+    // set default args
+    p->enabled_port_mask = parse_portmask(STR(DEFAULT_PORT_MASK));
+    p->kafka_topic = STR(DEFAULT_KAFKA_TOPIC);
+    p->rx_burst_size = DEFAULT_RX_BURST_SIZE;
+    p->tx_burst_size = DEFAULT_TX_BURST_SIZE;
+    p->nb_rx_desc = DEFAULT_NB_RX_DESC;
+    p->nb_rx_queue = DEFAULT_NB_RX_QUEUE;
+    p->tx_ring_size = DEFAULT_TX_RING_SIZE;
 
     // parse arguments to this application
     argvopt = argv;
-    while ((opt = getopt_long(argc, argvopt, "hp:b:t:c:r:q:s:x:", lgopts, 
&option_index)) != EOF) {
+    while ((opt = getopt_long(argc, argvopt, "b:c:d:hp:q:s:t:w:x:", lgopts, 
&option_index)) != EOF) {
         switch (opt) {
 
             // help
@@ -103,24 +107,30 @@ int parse_args(int argc, char** argv)
                 print_usage();
                 return -1;
 
-            // burst size
+            // rx burst size
             case 'b':
-                app.burst_size = atoi(optarg);
-                printf("[ -b BURST_SIZE ] defined as %d \n", app.burst_size);
+                p->rx_burst_size = atoi(optarg);
+                if(p->rx_burst_size < 1 || p->rx_burst_size > 
MAX_RX_BURST_SIZE) {
+                    fprintf(stderr, "Invalid burst size; burst=%u must be in 
[1, %u]. \n", p->rx_burst_size, MAX_RX_BURST_SIZE);
+                    print_usage();
+                    return -1;
+                }
+                break;
 
-                if(app.burst_size < 1 || app.burst_size > MAX_BURST_SIZE) {
-                    fprintf(stderr, "Invalid burst size; burst=%u must be in 
[1, %u]. \n", app.burst_size, MAX_BURST_SIZE);
+            // tx burst size
+            case 'w':
+                p->tx_burst_size = atoi(optarg);
+                if(p->tx_burst_size < 1) {
+                    fprintf(stderr, "Invalid burst size; burst=%u must be > 0. 
\n", p->tx_burst_size);
                     print_usage();
                     return -1;
                 }
                 break;
 
             // number of receive descriptors
-            case 'r':
-                app.nb_rx_desc = atoi(optarg);
-                printf("[ -r NB_RX_DESC ] defined as %d \n", app.nb_rx_desc);
-
-                if (app.nb_rx_desc < 1) {
+            case 'd':
+                p->nb_rx_desc = atoi(optarg);
+                if (p->nb_rx_desc < 1) {
                     fprintf(stderr, "Invalid num of receive descriptors: '%s' 
\n", optarg);
                     print_usage();
                     return -1;
@@ -129,23 +139,14 @@ int parse_args(int argc, char** argv)
 
             // size of each transmit ring
             case 'x':
-                app.tx_ring_size = atoi(optarg);
-                printf("[ -x TX_RING_SIZE ] defined as %d \n", 
app.tx_ring_size);
+                p->tx_ring_size = atoi(optarg);
 
-                // must be a power of 2 and not 0
-                if (app.tx_ring_size == 0 || (app.tx_ring_size & 
(app.tx_ring_size - 1)) != 0) {
-                    fprintf(stderr, "Invalid tx ring size (must be power of 
2): '%s' \n", optarg);
-                    print_usage();
-                    return -1;
-                }
                 break;
 
             // number of receive queues for each device
             case 'q':
-                app.nb_rx_queue = atoi(optarg);
-                printf("[ -q NB_RX_QUEUE ] defined as %d \n", app.nb_rx_queue);
-
-                if (app.nb_rx_queue < 1) {
+                p->nb_rx_queue = atoi(optarg);
+                if (p->nb_rx_queue < 1) {
                     fprintf(stderr, "Invalid num of receive queues: '%s' \n", 
optarg);
                     print_usage();
                     return -1;
@@ -154,10 +155,8 @@ int parse_args(int argc, char** argv)
 
           // port mask
           case 'p':
-              app.enabled_port_mask = parse_portmask(optarg);
-              printf("[ -p PORT_MASK ] defined as %d \n", 
app.enabled_port_mask);
-
-              if (app.enabled_port_mask == 0) {
+              p->enabled_port_mask = parse_portmask(optarg);
+              if (p->enabled_port_mask == 0) {
                   fprintf(stderr, "Invalid portmask: '%s'\n", optarg);
                   print_usage();
                   return -1;
@@ -166,10 +165,8 @@ int parse_args(int argc, char** argv)
 
           // kafka topic
           case 't':
-              app.kafka_topic = strdup(optarg);
-              printf("[ -t KAFKA_TOPIC ] defined as %s \n", app.kafka_topic);
-
-              if (!valid(app.kafka_topic)) {
+              p->kafka_topic = strdup(optarg);
+              if (!valid(p->kafka_topic)) {
                   printf("Invalid kafka topic: '%s'\n", optarg);
                   print_usage();
                   return -1;
@@ -178,10 +175,8 @@ int parse_args(int argc, char** argv)
 
           // kafka config path
           case 'c':
-              app.kafka_config_path = strdup(optarg);
-              printf("[ -c KAFKA_CONFIG ] defined as %s \n", 
app.kafka_config_path);
-
-              if (!valid(app.kafka_config_path) || 
!file_exists(app.kafka_config_path)) {
+              p->kafka_config_path = strdup(optarg);
+              if (!valid(p->kafka_config_path) || 
!file_exists(p->kafka_config_path)) {
                   fprintf(stderr, "Invalid kafka config: '%s'\n", optarg);
                   print_usage();
                   return -1;
@@ -190,8 +185,7 @@ int parse_args(int argc, char** argv)
 
           // kafka stats path
           case 's':
-              app.kafka_stats_path = strdup(optarg);
-              printf("[ -s KAFKA_STATS ] defined as %s \n", 
app.kafka_stats_path);
+              p->kafka_stats_path = strdup(optarg);
               break;
 
           default:
@@ -200,42 +194,6 @@ int parse_args(int argc, char** argv)
           }
     }
 
-    // default PORT_MASK
-    if (app.enabled_port_mask == 0) {
-        printf("[ -p PORT_MASK ] undefined; defaulting to %s \n", 
STR(DEFAULT_PORT_MASK));
-        app.enabled_port_mask = DEFAULT_PORT_MASK;
-    }
-
-    // default KAFKA_TOPIC
-    if (!valid(app.kafka_topic)) {
-        printf("[ -t KAFKA_TOPIC ] undefined; defaulting to %s \n", 
STR(DEFAULT_KAFKA_TOPIC));
-        app.kafka_topic = STR(DEFAULT_KAFKA_TOPIC);
-    }
-
-    // default BURST_SIZE 
-    if (app.burst_size == 0) {
-        printf("[ -b BURST_SIZE ] undefined; defaulting to %d \n", 
DEFAULT_BURST_SIZE);
-        app.burst_size = DEFAULT_BURST_SIZE;
-    }
-
-    // default NB_RX_DESC
-    if (app.nb_rx_desc == 0) {
-        printf("[ -r NB_RX_DESC ] undefined; defaulting to %d \n", 
DEFAULT_NB_RX_DESC);
-        app.nb_rx_desc = DEFAULT_NB_RX_DESC;
-    }
-
-    // default NB_RX_QUEUE
-    if (app.nb_rx_queue == 0) {
-        printf("[ -q NB_RX_QUEUE ] undefined; defaulting to %d \n", 
DEFAULT_NB_RX_QUEUE);
-        app.nb_rx_queue = DEFAULT_NB_RX_QUEUE;
-    }
-
-    // default TX_RING_SIZE
-    if (app.tx_ring_size == 0) {
-        printf("[ -x TX_RING_SIZE ] undefined; defaulting to %u \n", 
DEFAULT_TX_RING_SIZE);
-        app.tx_ring_size = DEFAULT_TX_RING_SIZE;
-    }
-
     // check number of ethernet devices
     if (rte_eth_dev_count() == 0) {
          rte_exit(EXIT_FAILURE, "No ethernet ports detected.\n");
@@ -243,19 +201,63 @@ int parse_args(int argc, char** argv)
 
     // check number of workers
     nb_workers = rte_lcore_count() - 1;
-    if (nb_workers < 1) {
-        rte_exit(EXIT_FAILURE, "Minimum 2 logical cores required. \n");
-    }
 
     // need at least 1 worker for each receive queue
-    if(nb_workers < app.nb_rx_queue) {
-        rte_exit(EXIT_FAILURE, "Minimum 1 worker per receive queue; workers=%u 
rx_queues=%u. \n", 
-            nb_workers, app.nb_rx_queue);
+    if(nb_workers < p->nb_rx_queue) {
+        rte_exit(EXIT_FAILURE, "Minimum 1 worker per receive queue; workers=%u 
rx_queues=%u. \n",
+            nb_workers, p->nb_rx_queue);
     }
 
+    p->nb_rx_workers = p->nb_rx_queue;
+    p->nb_tx_workers = nb_workers - p->nb_rx_workers;
+
+    printf("[ -p PORT_MASK ] defined as %d \n", p->enabled_port_mask);
+    printf("[ -b RX_BURST_SIZE ] defined as %d \n", p->rx_burst_size);
+    printf("[ -w TX_BURST_SIZE ] defined as %d \n", p->tx_burst_size);
+    printf("[ -d NB_RX_DESC ] defined as %d \n", p->nb_rx_desc);
+    printf("[ -x TX_RING_SIZE ] defined as %d \n", p->tx_ring_size);
+    printf("[ -q NB_RX_QUEUE ] defined as %d \n", p->nb_rx_queue);
+    printf("[ -t KAFKA_TOPIC ] defined as %s \n", p->kafka_topic);
+    printf("[ -c KAFKA_CONFIG ] defined as %s \n", p->kafka_config_path);
+    printf("[ -s KAFKA_STATS ] defined as %s \n", p->kafka_stats_path);
+    printf("[ NUM_RX_WORKERS ] defined as %d \n", p->nb_rx_workers);
+    printf("[ NUM_TX_WORKERS ] defined as %d \n", p->nb_tx_workers);
+
     // reset getopt lib
     optind = 0;
-
     return 0;
 }
 
+/*
+ * Parse the command line arguments passed to the application.
+ */
+int parse_args(int argc, char** argv, app_params* p)
+{
+  // initialize the environment
+  int ret = rte_eal_init(argc, argv);
+  if (ret < 0) {
+      rte_exit(EXIT_FAILURE, "Failed to initialize EAL: %i\n", ret);
+  }
+
+  // advance past the environmental settings
+  argc -= ret;
+  argv += ret;
+
+  // parse arguments to the application
+  ret = parse_app_args(argc, argv, p);
+  if (ret < 0) {
+      rte_exit(EXIT_FAILURE, "\n");
+  }
+
+  p->nb_ports = rte_eth_dev_count();
+  p->nb_rx_workers = p->nb_rx_queue;
+  p->nb_tx_workers = (rte_lcore_count() - 1) - p->nb_rx_workers;
+
+  // validate the number of workers
+  if(p->nb_tx_workers < p->nb_rx_workers) {
+      rte_exit(EXIT_FAILURE, "Additional lcore(s) required; found=%u, 
required=%u \n",
+          rte_lcore_count(), (p->nb_rx_queue*2) + 1);
+  }
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/args.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/args.h 
b/metron-sensors/fastcapa/src/args.h
index 352b221..542b445 100644
--- a/metron-sensors/fastcapa/src/args.h
+++ b/metron-sensors/fastcapa/src/args.h
@@ -19,115 +19,27 @@
 #ifndef METRON_ARGS_H
 #define METRON_ARGS_H
 
-#include <stdio.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <inttypes.h>
+#include <unistd.h>
 #include <string.h>
-#include <sys/types.h>
-#include <sys/queue.h>
-#include <sys/stat.h>
-#include <stdarg.h>
-#include <errno.h>
 #include <getopt.h>
-#include <glib.h>
-#include <rte_common.h>
-#include <rte_byteorder.h>
-#include <rte_log.h>
+#include <sys/stat.h>
 #include <rte_memory.h>
-#include <rte_memcpy.h>
-#include <rte_memzone.h>
-#include <rte_eal.h>
-#include <rte_per_lcore.h>
-#include <rte_launch.h>
-#include <rte_atomic.h>
-#include <rte_cycles.h>
-#include <rte_prefetch.h>
-#include <rte_lcore.h>
-#include <rte_per_lcore.h>
-#include <rte_branch_prediction.h>
-#include <rte_interrupts.h>
-#include <rte_pci.h>
-#include <rte_random.h>
-#include <rte_debug.h>
-#include <rte_ether.h>
 #include <rte_ethdev.h>
-#include <rte_ring.h>
-#include <rte_mempool.h>
-#include <rte_mbuf.h>
-#include <rte_ip.h>
-#include <rte_tcp.h>
-#include <rte_lpm.h>
-#include <rte_string_fns.h>
+#include "types.h"
 
-#define DEFAULT_BURST_SIZE 32
+#define DEFAULT_RX_BURST_SIZE 32
+#define DEFAULT_TX_BURST_SIZE 256
 #define DEFAULT_PORT_MASK 0x01
 #define DEFAULT_KAFKA_TOPIC pcap
-#define DEFAULT_NB_RX_QUEUE 2
+#define DEFAULT_NB_RX_QUEUE 1
 #define DEFAULT_NB_RX_DESC 1024
 #define DEFAULT_TX_RING_SIZE 2048
 #define DEFAULT_KAFKA_STATS_PATH 0
-
-#define MAX_BURST_SIZE 1024
-
-#define STR_EXPAND(tok) #tok
-#define STR(tok) STR_EXPAND(tok)
-
-/*
- * Logging definitions
- */
-#define LOG_ERROR(log_type, fmt, args...) RTE_LOG(ERR, log_type, fmt, ##args);
-#define LOG_WARN(log_type, fmt, args...) RTE_LOG(WARNING, log_type, fmt, 
##args);
-#define LOG_INFO(log_type, fmt, args...) RTE_LOG(INFO, log_type, fmt, ##args);
-
-#ifdef DEBUG
-#define LOG_LEVEL RTE_LOG_DEBUG
-#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, 
##args);
-#else
-#define LOG_LEVEL RTE_LOG_INFO
-#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
-#endif
-
-/**
- * Application configuration parameters.
- */
-struct app_params {
-
-    /* The number of receive descriptors to allocate for the receive ring. */
-    uint16_t nb_rx_desc;
-
-    /* The number of receive queues to set up for each ethernet device. */
-    uint16_t nb_rx_queue;
-
-    /* The size of the transmit ring (must be a power of 2). */
-    uint16_t tx_ring_size;
-
-    /* The maximum number of packets to retrieve at a time. */
-    uint16_t burst_size;
-
-    /* Defines which ports packets will be consumed from. */
-    uint32_t enabled_port_mask;
-
-    /* The name of the Kafka topic that packet data is sent to. */
-    const char* kafka_topic;
-
-    /* A file containing configuration values for the Kafka client. */
-    char* kafka_config_path;
-
-    /* A file to which the Kafka stats are appended to. */
-    char* kafka_stats_path;
-
-} __rte_cache_aligned;
-
-/*
- * Contains all application parameters.
- */
-struct app_params app;
+#define MAX_RX_BURST_SIZE 1024
 
 /**
  * Parse the command line arguments passed to the application.
  */
-int parse_args(int argc, char** argv);
+int parse_args(int argc, char** argv, app_params* app);
 
 #endif
-

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/kafka.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/kafka.c 
b/metron-sensors/fastcapa/src/kafka.c
index b844f3f..aceed39 100644
--- a/metron-sensors/fastcapa/src/kafka.c
+++ b/metron-sensors/fastcapa/src/kafka.c
@@ -18,13 +18,11 @@
 
 #include "kafka.h"
 
-#define POLL_TIMEOUT_MS 1000
-
 /*
  * Passed to all callback functions to help identify the connection.
  */
 struct opaque {
-    int conn_id;  
+    int conn_id;
 };
 
 /*
@@ -34,37 +32,51 @@ static rd_kafka_t **kaf_h;
 static rd_kafka_topic_t **kaf_top_h;
 static unsigned num_conns;
 static FILE *stats_fd;
-static struct app_stats *kaf_conn_stats;
+static app_stats *kaf_conn_stats;
 static struct opaque *kaf_opaque;
 static uint64_t *kaf_keys;
 
 /*
  * A callback executed when an error occurs within the kafka client
  */
-static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, void* 
UNUSED(opaque))
+static void kaf_error_cb (
+    rd_kafka_t *rk,
+    int err,
+    const char *reason,
+    void* UNUSED(opaque))
 {
-    LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s [%s] 
\n", 
+
+    LOG_ERROR(USER1, "kafka client error; conn=%s, error=%s [%s] \n",
         rd_kafka_name(rk), rd_kafka_err2str(err), reason);
 }
 
 /*
  * A callback executed when a broker throttles the producer
  */
-static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t 
broker_id, int throttle_time_ms, void* UNUSED(opaque))
+static void kaf_throttle_cb (
+    rd_kafka_t *rk,
+    const char *broker_name,
+    int32_t broker_id,
+    int throttle_time_ms,
+    void* UNUSED(opaque))
 {
-    LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms 
broker=%s broker_id=%"PRId32" \n", 
+    LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms 
broker=%s broker_id=%"PRId32" \n",
         rd_kafka_name(rk), throttle_time_ms, broker_name, broker_id);
 }
 
 /*
- * A callback executed on a fixed frequency (defined by 
`statistics.interval.ms`) 
+ * A callback executed on a fixed frequency (defined by 
`statistics.interval.ms`)
  * that provides detailed performance statistics
  */
-static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), 
void *opaque) 
+static int kaf_stats_cb(
+    rd_kafka_t *rk,
+    char *json,
+    size_t UNUSED(json_len),
+    void *opaque)
 {
     int rc;
     struct opaque *data = (struct opaque*) opaque;
-    int conn_id = data->conn_id;   
+    int conn_id = data->conn_id;
 
     // update queue depth of this kafka connection
     kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk);
@@ -78,7 +90,7 @@ static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t 
UNUSED(json_len), voi
         }
         fflush(stats_fd);
     }
-    
+
     // 0 ensures the json pointer is immediately freed
     return 0;
 }
@@ -87,18 +99,34 @@ static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t 
UNUSED(json_len), voi
  * A callback that is called once for each message when it has been 
successfully
  * produced.
  */
-static void kaf_message_delivered_cb (rd_kafka_t *UNUSED(rk), const 
rd_kafka_message_t *UNUSED(rkmessage), void *opaque) 
+static void kaf_message_delivered_cb (
+    #ifdef DEBUG
+    rd_kafka_t *rk,
+    #else
+    rd_kafka_t *UNUSED(rk),
+    #endif
+    const rd_kafka_message_t *rkmessage,
+    void *opaque)
 {
     struct opaque *data = (struct opaque*) opaque;
-    int conn_id = data->conn_id;   
+    int conn_id = data->conn_id;
 
-    kaf_conn_stats[conn_id].out += 1;
+    if(RD_KAFKA_RESP_ERR_NO_ERROR == rkmessage->err) {
+        kaf_conn_stats[conn_id].out += 1;
+    } else {
+        kaf_conn_stats[conn_id].drops += 1;
+
+        #ifdef DEBUG
+        LOG_ERROR(USER1, "delivery failed: conn=%s, error=%s \n",
+          rd_kafka_name(rk), rd_kafka_err2str(rkmessage->err));
+        #endif
+    }
 }
 
 /*
  * Opens the file used to persist the stats coming out of the kafka client
  */
-static int open_stats_file(char *filename)
+static int open_stats_file(const char *filename)
 {
     int rc;
 
@@ -113,7 +141,7 @@ static int open_stats_file(char *filename)
     if(rc < 0) {
        LOG_ERROR(USER1, "Unable to append to stats file \n");
        return rc;
-    } 
+    }
 
     fflush(stats_fd);
     return 0;
@@ -122,11 +150,11 @@ static int open_stats_file(char *filename)
 /*
  * Closes the file used to persist the kafka client stats.
  */
-static void close_stats_file(void) 
+static void close_stats_file(void)
 {
     if(NULL != stats_fd) {
         fclose(stats_fd);
-    }  
+    }
 }
 
 /**
@@ -162,7 +190,7 @@ static void kaf_topic_option(const char* key, const char* 
val, void* arg)
 /**
  * Parses the configuration values from a configuration file.
  */
-static void parse_kafka_config(char* file_path, const char* group,
+static void parse_kafka_config(const char* file_path, const char* group,
     void (*option_cb)(const char* key, const char* val, void* arg), void* arg)
 {
 
@@ -187,11 +215,11 @@ static void parse_kafka_config(char* file_path, const 
char* group,
         for (i = 0; i < num_keys; i++) {
             value = g_key_file_get_value(gkf, group, keys[i], errs);
             if (value) {
-                LOG_DEBUG(USER1, "config[%s]: %s = %s\n", group, keys[i], 
value);
+                LOG_INFO(USER1, "config[%s]: %s = %s\n", group, keys[i], 
value);
                 option_cb(keys[i], value, arg);
             }
             else {
-                LOG_INFO(USER1, "bad config: %s: %s = %s: %s\n", file_path, 
keys[i], value, errs[0]->message);
+                LOG_WARN(USER1, "bad config: %s: %s = %s: %s\n", file_path, 
keys[i], value, errs[0]->message);
             }
         }
     }
@@ -206,16 +234,16 @@ static void parse_kafka_config(char* file_path, const 
char* group,
 /**
  * Initializes a pool of Kafka connections.
  */
-void kaf_init(int num_of_conns)
+void kaf_init(int num_of_conns, const char* kafka_topic, const char* 
kafka_config_path, const char* kafka_stats_path)
 {
     int i;
     char errstr[512];
 
     // open the file to which the kafka stats are appended
-    if(NULL != app.kafka_stats_path) {
-        LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", 
app.kafka_stats_path);
-        open_stats_file(app.kafka_stats_path);
-    }  
+    if(NULL != kafka_stats_path) {
+        LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", 
kafka_stats_path);
+        open_stats_file(kafka_stats_path);
+    }
 
     // the number of connections to maintain
     num_conns = num_of_conns;
@@ -223,10 +251,10 @@ void kaf_init(int num_of_conns)
     // create kafka resources for each consumer
     kaf_h = calloc(num_of_conns, sizeof(rd_kafka_t*));
     kaf_top_h = calloc(num_of_conns, sizeof(rd_kafka_topic_t*));
-    kaf_conn_stats = calloc(num_of_conns, sizeof(struct app_stats));
+    kaf_conn_stats = calloc(num_of_conns, sizeof(app_stats));
     kaf_opaque = calloc(num_of_conns, sizeof(struct opaque));
     kaf_keys = calloc(num_of_conns, sizeof(uint64_t));
-    
+
     for (i = 0; i < num_of_conns; i++) {
 
         // passed to each callback function to identify the kafka connection
@@ -240,8 +268,8 @@ void kaf_init(int num_of_conns)
         rd_kafka_conf_set_dr_msg_cb(kaf_conf, kaf_message_delivered_cb);
 
         // configure kafka connection; values parsed from kafka config file
-        if (NULL != app.kafka_config_path) {
-            parse_kafka_config(app.kafka_config_path, "kafka-global", 
kaf_global_option, (void*)kaf_conf);
+        if (NULL != kafka_config_path) {
+            parse_kafka_config(kafka_config_path, "kafka-global", 
kaf_global_option, (void*)kaf_conf);
         }
 
         // create a new kafka connection
@@ -252,14 +280,14 @@ void kaf_init(int num_of_conns)
 
         // configure kafka topic; values parsed from kafka config file
         rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
-        if (NULL != app.kafka_config_path) {
-            parse_kafka_config(app.kafka_config_path, "kafka-topic", 
kaf_topic_option, (void*)topic_conf);
+        if (NULL != kafka_config_path) {
+            parse_kafka_config(kafka_config_path, "kafka-topic", 
kaf_topic_option, (void*)topic_conf);
         }
 
         // connect to a kafka topic
-        kaf_top_h[i] = rd_kafka_topic_new(kaf_h[i], app.kafka_topic, 
topic_conf);
+        kaf_top_h[i] = rd_kafka_topic_new(kaf_h[i], kafka_topic, topic_conf);
         if (!kaf_top_h[i]) {
-            rte_exit(EXIT_FAILURE, "Cannot init kafka topic: %s", 
app.kafka_topic);
+            rte_exit(EXIT_FAILURE, "Cannot init kafka topic: %s", kafka_topic);
         }
     }
 }
@@ -268,7 +296,7 @@ void kaf_init(int num_of_conns)
  * Executes polling across all of the kafka client connections.  Ensures that 
any queued
  * callbacks are served.
  */
-void kaf_poll(void) 
+void kaf_poll(void)
 {
     unsigned i;
     for (i = 0; i < num_conns; i++) {
@@ -279,7 +307,7 @@ void kaf_poll(void)
 /**
  * Retrieves a summary of statistics across all of the kafka client 
connections.
  */
-int kaf_stats(struct app_stats *stats)
+int kaf_stats(app_stats *stats)
 {
     unsigned i;
     uint64_t in, out, depth, drops;
@@ -356,10 +384,10 @@ int kaf_send(struct rte_mbuf* pkts[], int pkt_count, int 
conn_id)
     // find the topic connection based on the conn_id
     rd_kafka_topic_t* kaf_topic = kaf_top_h[conn_id];
 
-    // current time in epoch microseconds from (big-endian aka network byte 
order) 
+    // current time in epoch microseconds (big-endian, aka network byte 
order)
     // is added as a message key before being sent to kafka
     kaf_keys[conn_id] = htobe64(current_time());
-    
+
     // create the batch message for kafka
     for (i = 0; i < pkt_count; i++) {
         kaf_msgs[i].err = 0;

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/kafka.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/kafka.h 
b/metron-sensors/fastcapa/src/kafka.h
index ab62ad7..961e78d 100644
--- a/metron-sensors/fastcapa/src/kafka.h
+++ b/metron-sensors/fastcapa/src/kafka.h
@@ -23,28 +23,31 @@
 #include <string.h>
 #include <sys/time.h>
 #include <endian.h>
+#include <glib.h>
 #include <librdkafka/rdkafka.h>
 #include <rte_common.h>
 #include <rte_mbuf.h>
-
 #include "args.h"
 #include "types.h"
 
-#ifdef __GNUC__
-#  define UNUSED(x) UNUSED_ ## x __attribute__((__unused__))
-#else
-#  define UNUSED(x) UNUSED_ ## x
-#endif
+#define POLL_TIMEOUT_MS 1000
 
-/**
+/*
  * Initializes a pool of Kafka connections.
-*/
-void kaf_init(int num_of_conns);
+ */
+void kaf_init(
+    int num_of_conns,
+    const char* kafka_topic,
+    const char* kafka_config_path,
+    const char* kafka_stats_path);
 
-/**
+/*
  * Publish a set of packets to a kafka topic.
  */
-int kaf_send(struct rte_mbuf* data[], int num_to_send, int conn_id);
+int kaf_send(
+    struct rte_mbuf* data[],
+    int num_to_send,
+    int conn_id);
 
 /*
  * Executes polling across all of the kafka client connections.  Ensures that 
any queued
@@ -52,12 +55,12 @@ int kaf_send(struct rte_mbuf* data[], int num_to_send, int 
conn_id);
  */
 void kaf_poll(void);
 
-/**
+/*
  * Retrieves a summary of statistics across all of the kafka client 
connections.
  */
-int kaf_stats(struct app_stats *stats);
+int kaf_stats(app_stats *stats);
 
-/**
+/*
  * Closes the pool of Kafka connections.
  */
 void kaf_close(void);

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/main.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/main.c 
b/metron-sensors/fastcapa/src/main.c
index c4c5123..f47a51d 100644
--- a/metron-sensors/fastcapa/src/main.c
+++ b/metron-sensors/fastcapa/src/main.c
@@ -16,428 +16,46 @@
  * limitations under the License.
  */
 
-#include "main.h"
-
-/*
- * Initialize a port using global settings and with the rx buffers
- * coming from the mbuf_pool passed as parameter
- */
-static inline int init_port(const uint8_t port, struct rte_mempool* mbuf_pool)
-{
-    struct rte_eth_conf port_conf = port_conf_default;
-    int retval;
-    uint16_t q;
-    int retry = 5;
-    const uint16_t tx_queues = 1;
-    int socket;
-    struct rte_eth_dev_info dev_info;
-
-    if (port >= rte_eth_dev_count()) {
-        rte_exit(EXIT_FAILURE, "Port does not exist; port=%u \n", port);
-        return -1;
-    }
-
-    // check that the number of RX queues does not exceed what is supported by 
the device
-    rte_eth_dev_info_get(port, &dev_info);
-    if (app.nb_rx_queue > dev_info.max_rx_queues) {
-        rte_exit(EXIT_FAILURE, "Too many RX queues for device; port=%u, 
rx_queues=%u, max_queues=%u \n", 
-            port, app.nb_rx_queue, dev_info.max_rx_queues);
-        return -EINVAL;
-    }
-
-    // check that the number of TX queues does not exceed what is supported by 
the device
-    if (tx_queues > dev_info.max_tx_queues) {
-        rte_exit(EXIT_FAILURE, "Too many TX queues for device; port=%u, 
tx_queues=%u, max_queues=%u \n", 
-            port, tx_queues, dev_info.max_tx_queues);
-        return -EINVAL;
-    }
-
-    retval = rte_eth_dev_configure(port, app.nb_rx_queue, tx_queues, 
&port_conf);
-    if (retval != 0) {
-        rte_exit(EXIT_FAILURE, "Cannot configure device; port=%u, err=%s \n", 
port, strerror(-retval));
-        return retval;
-    }
-
-    // create the receive queues
-    socket = rte_eth_dev_socket_id(port);
-    for (q = 0; q < app.nb_rx_queue; q++) {
-        retval = rte_eth_rx_queue_setup(port, q, app.nb_rx_desc, socket, NULL, 
mbuf_pool);
-        if (retval != 0) {
-            rte_exit(EXIT_FAILURE, "Cannot setup RX queue; port=%u, err=%s 
\n", port, strerror(-retval));
-            return retval;
-        }
-    }
-
-    // create the transmit queues - at least one TX queue must be setup even 
though we don't use it
-    for (q = 0; q < tx_queues; q++) {
-        retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE, socket, NULL);
-        if (retval != 0) {
-            rte_exit(EXIT_FAILURE, "Cannot setup TX queue; port=%u, err=%s 
\n", port, strerror(-retval));
-            return retval;
-        }
-    }
-
-    // start the receive and transmit units on the device
-    retval = rte_eth_dev_start(port);
-    if (retval < 0) {
-        rte_exit(EXIT_FAILURE, "Cannot start device; port=%u, err=%s \n", 
port, strerror(-retval));
-        return retval;
-    }
-
-    // retrieve information about the device
-    struct rte_eth_link link;
-    do {
-        rte_eth_link_get_nowait(port, &link);
-
-    } while (retry-- > 0 && !link.link_status && !sleep(1));
-
-    // if still no link information, must be down
-    if (!link.link_status) {
-        rte_exit(EXIT_FAILURE, "Link down; port=%u \n", port);
-        return 0;
-    }
-
-    // enable promisc mode
-    rte_eth_promiscuous_enable(port);
-
-    // print diagnostics
-    struct ether_addr addr;
-    rte_eth_macaddr_get(port, &addr);
-    LOG_INFO(USER1, "Device setup successfully; port=%u, mac=%02" PRIx8 " %02" 
PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
-        (unsigned)port,
-        addr.addr_bytes[0], addr.addr_bytes[1],
-        addr.addr_bytes[2], addr.addr_bytes[3],
-        addr.addr_bytes[4], addr.addr_bytes[5]);
-
-    return 0;
-}
-
-static void print_stats(struct rx_worker_params *rx_params, unsigned 
nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers)
+ #include "args.h"
+ #include "kafka.h"
+ #include "nic.h"
+ #include "types.h"
+ #include "worker.h"
+ 
+
+static void wait_for_workers(void)
 {
-    struct rte_eth_stats eth_stats;
-    unsigned i;
-    uint64_t in, out, depth, drops;
-    struct app_stats stats;
-
-    // header
-    printf("\n\n     %15s %15s %15s %15s \n", " ----- in -----", " --- queued 
---", "----- out -----", "---- drops ----");
-
-    // summarize stats from each port
-    in = 0;
-    for (i = 0; i < rte_eth_dev_count(); i++) {
-        rte_eth_stats_get(i, &eth_stats);
-        in += eth_stats.ipackets;
-    }
-    printf("[nic] %15" PRIu64 " %15s %15s %15s \n", in, "-", "-", "-");
-
-    // summarize receive; from network to receive queues
-    in = out = depth = drops = 0;
-    for (i = 0; i < nb_rx_workers; i++) {
-        in += rx_params[i].stats.in;
-        out += rx_params[i].stats.out;
-        depth += rx_params[i].stats.depth;
-        drops += rx_params[i].stats.drops;
-    }
-    printf("[rx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", 
out, drops);
-
-    // summarize transmit; from receive queues to transmit rings
-    in = out = depth = 0;
-    for (i = 0; i < nb_tx_workers; i++) {
-        in += tx_params[i].stats.in;
-        out += tx_params[i].stats.out;
-        depth += tx_params[i].stats.depth;
-    }
-    printf("[tx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", 
out, in - out);
-
-    // summarize push to kafka; from transmit rings to librdkafka
-    kaf_stats(&stats);
-    printf("[kaf] %15" PRIu64 " %15" PRIu64 " %15" PRIu64 " %15" PRIu64 "\n", 
stats.in, stats.depth, stats.out, stats.drops);
-
-    // summarize any errors on the ports
-    for (i = 0; i < rte_eth_dev_count(); i++) {
-        rte_eth_stats_get(i, &eth_stats);
-
-        if(eth_stats.ierrors > 0 || eth_stats.oerrors > 0 || 
eth_stats.rx_nombuf > 0) {
-            printf("\nErrors: Port %u \n", i);
-            printf(" - In Errs:   %" PRIu64 "\n", eth_stats.ierrors);
-            printf(" - Out Errs:  %" PRIu64 "\n", eth_stats.oerrors);
-            printf(" - Mbuf Errs: %" PRIu64 "\n", eth_stats.rx_nombuf);
-        }
-    }
-}
-
-/*
- * Handles interrupt signals.
- */
-static void sig_handler(int sig_num)
-{
-    LOG_INFO(USER1, "Exiting on signal '%d'\n", sig_num);
-
-    // set quit flag for rx thread to exit
-    quit_signal = 1;
-}
-
-static int monitor_workers(struct rx_worker_params *rx_params, unsigned 
nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers)
-{
-    LOG_INFO(USER1, "Starting to monitor workers; core=%u, socket=%u \n", 
rte_lcore_id(), rte_socket_id());
-    while (!quit_signal) {
-       kaf_poll();
-        print_stats(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
-        sleep(5);
-    }
-
-    LOG_INFO(USER1, "Finished monitoring workers; core=%u, socket=%u \n", 
rte_lcore_id(), rte_socket_id());
-    return 0;
-}
-
-/**
- * Process packets from a single queue.
- */
-static int receive_worker(struct rx_worker_params* params)
-{
-    const uint8_t nb_ports = rte_eth_dev_count();
-    const unsigned socket_id = rte_socket_id();
-    const uint16_t burst_size = app.burst_size;
-    const uint16_t queue_id = params->queue_id;
-    struct rte_ring *ring = params->output_ring;
-    int i, dev_socket_id;
-    uint8_t port;
-    struct rte_mbuf* pkts[MAX_BURST_SIZE];
-    const int attempts = MAX_BURST_SIZE / burst_size;
-
-    LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u 
attempts=%d \n", rte_lcore_id(), socket_id, queue_id, attempts);
-
-    // validate each port
-    for (port = 0; port < nb_ports; port++) {
-
-        // skip ports that are not enabled
-        if ((app.enabled_port_mask & (1 << port)) == 0) {
-            continue;
-        }
-
-        // check for cross-socket communication
-        dev_socket_id = rte_eth_dev_socket_id(port);
-        if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
-            LOG_WARN(USER1, "Warning: Port %u on different socket from worker; 
performance will suffer\n", port);
-        }
-    }
-
-    port = 0;
-    while (!quit_signal) {
-
-        // skip to the next enabled port
-        if ((app.enabled_port_mask & (1 << port)) == 0) {
-            if (++port == nb_ports) {
-                port = 0;
-            }
-            continue;
-        }
-
-        // receive a 'burst' of packets. if get back the max number requested, 
then there 
-        // are likely more packets waiting. immediately go back and grab some.
-        i = 0;
-        uint16_t nb_in = 0, nb_in_last = 0;
-        do {
-            nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], 
burst_size);
-            nb_in += nb_in_last;
-
-        } while (++i < attempts && nb_in_last == burst_size);
-        params->stats.in += nb_in;
+    unsigned lcore_id;
 
-        // add each packet to the ring buffer
-        if(likely(nb_in) > 0) {
-          const uint16_t nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, 
nb_in);
-          params->stats.out += nb_out;
-          params->stats.drops += (nb_in - nb_out);
-        }
-       
-        // clean-up the packet buffer 
-        for (i = 0; i < nb_in; i++) {
-          rte_pktmbuf_free(pkts[i]);
-        }
-        
-        // wrap-around to the first port
-        if (++port == nb_ports) {
-            port = 0;
+    // wait for each worker to complete
+    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+        if (rte_eal_wait_lcore(lcore_id) < 0) {
+            LOG_WARN(USER1, "Failed to wait for worker; lcore=%u \n", 
lcore_id);
         }
     }
-
-    LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u 
\n", rte_lcore_id(), socket_id, queue_id);
-    return 0;
 }
 
-/**
- *
- */
-static int transmit_worker(struct tx_worker_params *params)
-{
-    unsigned i, nb_in, nb_out;
-    const uint16_t burst_size = params->burst_size;
-    struct rte_ring *ring = params->input_ring;
-    const int kafka_id = params->kafka_id;
-
-    LOG_INFO(USER1, "Transmit worker started; core=%u, socket=%u \n", 
rte_lcore_id(), rte_socket_id());
-    while (!quit_signal) {
-
-        // dequeue packets from the ring
-        struct rte_mbuf* pkts[MAX_BURST_SIZE];
-        nb_in = rte_ring_dequeue_burst(ring, (void*) pkts, burst_size);
-        
-        if(likely(nb_in > 0)) {
-            params->stats.in += nb_in;
 
-            // prepare the packets to be sent to kafka
-            nb_out = kaf_send(pkts, nb_in, kafka_id);
-            params->stats.out += nb_out;
-        }
-
-        // clean-up the packet buffer    
-        for (i = 0; i < nb_in; i++) {
-            rte_pktmbuf_free(pkts[i]);
-        }
-    }
-
-    LOG_INFO(USER1, "Transmit worker finished; core=%u, socket=%u \n", 
rte_lcore_id(), rte_socket_id());
-    return 0;
-}
-
-/**
- * Get it going.
- */
 int main(int argc, char* argv[])
 {
-    unsigned lcore_id;
-    unsigned nb_workers;
-    uint8_t port_id;
-    unsigned nb_ports;
-    unsigned nb_ports_available;
-    struct rte_mempool* mbuf_pool;
-    unsigned n, i;
-    unsigned nb_rx_workers, nb_tx_workers;
-    unsigned rx_worker_id = 0, tx_worker_id = 0;
-    char buf[32];
-
-    // catch interrupt
-    signal(SIGINT, sig_handler);
-
-    // initialize the environment
-    int ret = rte_eal_init(argc, argv);
-    if (ret < 0) {
-        rte_exit(EXIT_FAILURE, "Failed to initialize EAL: %i\n", ret);
-    }
-
-    // advance past the environmental settings
-    argc -= ret;
-    argv += ret;
+    app_params p = {0};
+    parse_args(argc, argv, &p);
 
-    // parse arguments to the application
-    ret = parse_args(argc, argv);
-    if (ret < 0) {
-        rte_exit(EXIT_FAILURE, "Invalid parameters\n");
-    }
+    struct rte_ring *tx_rings[p.nb_rx_queue];
+    rx_worker_params rx_params[p.nb_rx_workers];
+    tx_worker_params tx_params[p.nb_tx_workers];
 
-    nb_workers = rte_lcore_count() - 1;
-    nb_ports_available = nb_ports = rte_eth_dev_count();
-    nb_rx_workers = app.nb_rx_queue;
-    nb_tx_workers = nb_workers - nb_rx_workers;
-    n = NUM_MBUFS * nb_ports;
+    // initialize
+    kaf_init(p.nb_tx_workers, p.kafka_topic, p.kafka_config_path, 
p.kafka_stats_path);
+    init_receive(p.enabled_port_mask, p.nb_rx_queue, p.nb_rx_desc);
+    init_transmit(tx_rings, p.nb_rx_queue, p.tx_ring_size);
 
-    // validate the number of workers
-    if(nb_tx_workers < nb_rx_workers) {
-        rte_exit(EXIT_FAILURE, "Additional lcore(s) required; found=%u, 
required=%u \n", 
-            rte_lcore_count(), (app.nb_rx_queue*2) + 1);
-    }
-
-    // create memory pool
-    mbuf_pool = rte_pktmbuf_pool_create("mbuf-pool", n, MBUF_CACHE_SIZE, 0, 
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
-    if (mbuf_pool == NULL) {
-        rte_exit(EXIT_FAILURE, "Unable to create memory pool; n=%u, 
cache_size=%u, data_room_size=%u, socket=%u \n", 
-            n, MBUF_CACHE_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
-    }
-
-    // initialize each specified ethernet ports
-    for (port_id = 0; port_id < nb_ports; port_id++) {
-
-        // skip over ports that are not enabled
-        if ((app.enabled_port_mask & (1 << port_id)) == 0) {
-            LOG_INFO(USER1, "Skipping over disabled port '%d'\n", port_id);
-            nb_ports_available--;
-            continue;
-        }
-
-        // initialize the port - creates one receive queue for each worker
-        LOG_INFO(USER1, "Initializing port %u\n", (unsigned)port_id);
-        if (init_port(port_id, mbuf_pool) != 0) {
-            rte_exit(EXIT_FAILURE, "Cannot initialize port %" PRIu8 "\n", 
port_id);
-        }
-    }
-
-    // ensure that we were able to initialize enough ports
-    if (nb_ports_available < 1) {
-        rte_exit(EXIT_FAILURE, "Error: No available enabled ports. Portmask 
set?\n");
-    }
-
-    // each transmit worker has their own kafka client connection
-    kaf_init(nb_tx_workers);
-
-    struct rx_worker_params rx_params[nb_rx_workers];
-    struct tx_worker_params tx_params[nb_tx_workers];
-
-    // create the transmit rings - 1 for each receive queue
-    struct rte_ring *tx_rings[app.nb_rx_queue]; 
-    for(i = 0; i < app.nb_rx_queue; i++) {
-        sprintf(buf, "tx-ring-%d", i);
-        tx_rings[i] = rte_ring_create(buf, app.tx_ring_size, rte_socket_id(), 
0);
-        if(NULL == tx_rings[i]) {
-            rte_exit(EXIT_FAILURE, "Unable to create transmit ring: %s \n", 
rte_strerror(rte_errno));
-        }
-    }
-
-    // launch the workers
-    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-
-        if(rx_worker_id < nb_rx_workers) {
-
-            LOG_INFO(USER1, "Launching receive worker; worker=%u, core=%u, 
queue=%u\n", rx_worker_id, lcore_id, rx_worker_id);
-            rx_params[rx_worker_id] = (struct rx_worker_params) { 
-                .worker_id = rx_worker_id, 
-                .queue_id = rx_worker_id, 
-                .burst_size = app.burst_size, 
-                .output_ring = tx_rings[rx_worker_id], 
-                .stats = {0} 
-            };
-            rte_eal_remote_launch((lcore_function_t*) receive_worker, 
&rx_params[rx_worker_id], lcore_id);
-            rx_worker_id++;
-
-        } else {
-
-            unsigned ring_id = tx_worker_id % app.nb_rx_queue;
-            LOG_INFO(USER1, "Launching transmit worker; worker=%u, core=%u 
ring=%u \n", tx_worker_id, lcore_id, ring_id);
-            tx_params[tx_worker_id] = (struct tx_worker_params) { 
-                .worker_id = tx_worker_id, 
-                .burst_size = app.burst_size, 
-                .input_ring = tx_rings[ring_id], 
-                .kafka_id = tx_worker_id,
-                .stats = {0} 
-            };
-            rte_eal_remote_launch((lcore_function_t*) transmit_worker, 
&tx_params[tx_worker_id], lcore_id);
-            tx_worker_id++;
-        }
-
-    }
-
-    // allow the master to monitor each of the workers
-    monitor_workers(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
-
-    // wait for each worker to complete
-    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-        if (rte_eal_wait_lcore(lcore_id) < 0) {
-            LOG_WARN(USER1, "Failed to wait for worker; lcore=%u \n", 
lcore_id);
-            return -1;
-        }
-    }
+    // start receive and transmit workers
+    start_workers(rx_params, tx_params, tx_rings, &p);
+    monitor_workers(rx_params, p.nb_rx_workers, tx_params, p.nb_tx_workers);
+    wait_for_workers();
 
+    // clean up
     kaf_close();
     return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/main.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/main.h 
b/metron-sensors/fastcapa/src/main.h
deleted file mode 100644
index 0a230a5..0000000
--- a/metron-sensors/fastcapa/src/main.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef METRON_MAIN_H
-#define METRON_MAIN_H
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <inttypes.h>
-#include <sys/types.h>
-#include <string.h>
-#include <sys/queue.h>
-#include <stdarg.h>
-#include <errno.h>
-#include <getopt.h>
-#include <unistd.h>
-#include <signal.h>
-#include <rte_common.h>
-#include <rte_byteorder.h>
-#include <rte_log.h>
-#include <rte_memory.h>
-#include <rte_memcpy.h>
-#include <rte_memzone.h>
-#include <rte_eal.h>
-#include <rte_per_lcore.h>
-#include <rte_launch.h>
-#include <rte_atomic.h>
-#include <rte_cycles.h>
-#include <rte_prefetch.h>
-#include <rte_lcore.h>
-#include <rte_per_lcore.h>
-#include <rte_branch_prediction.h>
-#include <rte_interrupts.h>
-#include <rte_pci.h>
-#include <rte_random.h>
-#include <rte_debug.h>
-#include <rte_ether.h>
-#include <rte_ethdev.h>
-#include <rte_ring.h>
-#include <rte_mempool.h>
-#include <rte_mbuf.h>
-#include <rte_ip.h>
-#include <rte_tcp.h>
-#include <rte_lpm.h>
-#include <rte_string_fns.h>
-#include <rte_distributor.h>
-#include <rte_malloc.h>
-#include <rte_errno.h>
-
-#include "args.h"
-#include "kafka.h"
-#include "types.h"
-
-/*
- * the number of receive queue descriptors is a multiple of the packet burst 
size
- */
-#define RX_QUEUE_MULT 4 
-#define TX_QUEUE_SIZE 32
-#define NUM_MBUFS ((64 * 1024) - 1)
-#define MBUF_CACHE_SIZE 250
-
-// uncomment below line to enable debug logs
-//#define DEBUG 
-
-volatile uint8_t quit_signal;
-
-/**
- * Default port configuration settings.
- */
-const struct rte_eth_conf port_conf_default = {
-    .rxmode = {
-        .mq_mode = ETH_MQ_RX_RSS,
-        .max_rx_pkt_len = ETHER_MAX_LEN,
-        .enable_scatter = 1,
-        .enable_lro = 1
-    },
-    .txmode = {
-        .mq_mode = ETH_MQ_TX_NONE,
-    },
-    .rx_adv_conf = {
-      .rss_conf = {
-        .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | ETH_RSS_SCTP,
-      }
-    },
-};
-
-int main(int argc, char* argv[]);
-
-#endif
-

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/nic.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/nic.c 
b/metron-sensors/fastcapa/src/nic.c
new file mode 100644
index 0000000..5c68430
--- /dev/null
+++ b/metron-sensors/fastcapa/src/nic.c
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "nic.h"
+
+/**
+ * Default device configuration; init_port() copies it and hands the copy to
+ * rte_eth_dev_configure().
+ *
+ * Despite the "rx" in the name this is a whole-port rte_eth_conf: it sets the
+ * receive mode (RSS multi-queue, frames up to ETHER_MAX_LEN, scatter and LRO
+ * switched off), the transmit mode (no multi-queue), and the RSS hash inputs
+ * (IP, UDP, TCP and SCTP).
+ */
+static const struct rte_eth_conf rx_conf_default = {
+    .rxmode = {
+        .mq_mode = ETH_MQ_RX_RSS,
+        .max_rx_pkt_len = ETHER_MAX_LEN,
+        .enable_scatter = 0,
+        .enable_lro = 0
+    },
+    .txmode = {
+        .mq_mode = ETH_MQ_TX_NONE,
+    },
+    .rx_adv_conf = {
+      .rss_conf = {
+        .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | ETH_RSS_SCTP,
+      }
+    },
+};
+
+/**
+ * Default transmit queue settings; init_port() copies this and passes the
+ * copy to rte_eth_tx_queue_setup().
+ *
+ * Every field is zero.
+ * NOTE(review): presumably zero asks the driver to fall back to its own
+ * defaults for the thresholds - confirm against the DPDK release in use.
+ */
+static const struct rte_eth_txconf tx_conf_default = {
+    .tx_thresh = {
+        .pthresh = 0,
+        .hthresh = 0,
+        .wthresh = 0
+    },
+    .tx_rs_thresh = 0,
+    .tx_free_thresh = 0,
+    .txq_flags = 0,
+    .tx_deferred_start = 0
+};
+
+/*
+ * Initialize a NIC port.
+ *
+ * Validates the port id and the requested queue counts against the device
+ * limits, configures the device with nb_rx_queue receive queues and a single
+ * transmit queue, starts it, polls for link-up (up to six checks, sleeping
+ * one second between them), and finally enables promiscuous mode and logs
+ * the MAC address.
+ *
+ *   port_id     - index of the ethernet device to initialize
+ *   mem_pool    - mbuf pool handed to every receive queue
+ *   nb_rx_queue - number of receive queues to create
+ *   nb_rx_desc  - number of descriptors for each receive queue
+ *
+ * Returns 0 on success.  Every failure path calls rte_exit().
+ * NOTE(review): rte_exit() is documented as not returning, which would make
+ * the return statements placed after it unreachable - confirm.
+ */
+static int init_port(
+    const uint8_t port_id,
+    struct rte_mempool* mem_pool,
+    const uint16_t nb_rx_queue,
+    const uint16_t nb_rx_desc)
+{
+    struct rte_eth_conf rx_conf = rx_conf_default;
+    struct rte_eth_txconf tx_conf = tx_conf_default;
+    int retval;
+    uint16_t q;
+    int retry = 5;
+    const uint16_t tx_queues = 1;
+    int socket;
+    struct rte_eth_dev_info dev_info;
+
+    if (port_id >= rte_eth_dev_count()) {
+        rte_exit(EXIT_FAILURE, "Port does not exist; port=%u \n", port_id);
+        return -1;
+    }
+
+    // check that the number of RX queues does not exceed what is supported by 
the device
+    rte_eth_dev_info_get(port_id, &dev_info);
+    if (nb_rx_queue > dev_info.max_rx_queues) {
+        rte_exit(EXIT_FAILURE, "Too many RX queues for device; port=%u, 
rx_queues=%u, max_queues=%u \n",
+            port_id, nb_rx_queue, dev_info.max_rx_queues);
+        return -EINVAL;
+    }
+
+    // check that the number of TX queues does not exceed what is supported by 
the device
+    if (tx_queues > dev_info.max_tx_queues) {
+        rte_exit(EXIT_FAILURE, "Too many TX queues for device; port=%u, 
tx_queues=%u, max_queues=%u \n",
+            port_id, tx_queues, dev_info.max_tx_queues);
+        return -EINVAL;
+    }
+
+    retval = rte_eth_dev_configure(port_id, nb_rx_queue, tx_queues, &rx_conf);
+    if (retval != 0) {
+        rte_exit(EXIT_FAILURE, "Cannot configure device; port=%u, err=%s \n", 
port_id, strerror(-retval));
+        return retval;
+    }
+
+    // create the receive queues
+    socket = rte_eth_dev_socket_id(port_id);
+    for (q = 0; q < nb_rx_queue; q++) {
+        retval = rte_eth_rx_queue_setup(port_id, q, nb_rx_desc, socket, NULL, 
mem_pool);
+        if (retval != 0) {
+            rte_exit(EXIT_FAILURE, "Cannot setup RX queue; port=%u, err=%s 
\n", port_id, strerror(-retval));
+            return retval;
+        }
+    }
+
+    // create the transmit queues - at least one TX queue must be setup even 
though we don't use it
+    for (q = 0; q < tx_queues; q++) {
+        retval = rte_eth_tx_queue_setup(port_id, q, TX_QUEUE_SIZE, socket, 
&tx_conf);
+        if (retval != 0) {
+            rte_exit(EXIT_FAILURE, "Cannot setup TX queue; port=%u, err=%s 
\n", port_id, strerror(-retval));
+            return retval;
+        }
+    }
+
+    // start the receive and transmit units on the device
+    retval = rte_eth_dev_start(port_id);
+    if (retval < 0) {
+        rte_exit(EXIT_FAILURE, "Cannot start device; port=%u, err=%s \n", 
port_id, strerror(-retval));
+        return retval;
+    }
+
+    // retrieve information about the device
+    struct rte_eth_link link;
+    do {
+        rte_eth_link_get_nowait(port_id, &link);
+
+    } while (retry-- > 0 && !link.link_status && !sleep(1));
+
+    // if still no link information, must be down
+    if (!link.link_status) {
+        rte_exit(EXIT_FAILURE, "Link down; port=%u \n", port_id);
+        return 0;
+    }
+
+    // enable promisc mode
+    rte_eth_promiscuous_enable(port_id);
+
+    // print diagnostics
+    struct ether_addr addr;
+    rte_eth_macaddr_get(port_id, &addr);
+    LOG_INFO(USER1, "Device setup successfully; port=%u, mac=%02" PRIx8 " %02" 
PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+        (unsigned int) port_id,
+        addr.addr_bytes[0], addr.addr_bytes[1],
+        addr.addr_bytes[2], addr.addr_bytes[3],
+        addr.addr_bytes[4], addr.addr_bytes[5]);
+
+    return 0;
+}
+
+/*
+ * Preparation for receiving and processing packets.
+ *
+ * Creates one mbuf pool sized for every detected ethernet device, then
+ * initializes each device whose bit is set in enabled_port_mask.
+ *
+ *   enabled_port_mask - bit N enables port N; being 8 bits wide, only ports
+ *                       0-7 can be selected
+ *   nb_rx_queue       - number of receive queues to create on each port
+ *   nb_rx_desc        - number of descriptors for each receive queue
+ *
+ * Returns 0 on success; any failure terminates the process via rte_exit().
+ */
+int init_receive(
+    const uint8_t enabled_port_mask,
+    const uint16_t nb_rx_queue,
+    const uint16_t nb_rx_desc)
+{
+    // highest port id (exclusive) that the mask is able to describe
+    const unsigned int mask_bits = 8 * sizeof(enabled_port_mask);
+    const unsigned int nb_ports = rte_eth_dev_count();
+    unsigned int nb_ports_available = nb_ports;
+    unsigned int port_id;
+
+    // create memory pool
+    const unsigned int size = NUM_MBUFS * nb_ports;
+    struct rte_mempool* mem_pool = rte_pktmbuf_pool_create(
+        "mbuf-pool", size, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
+        rte_socket_id());
+    if (mem_pool == NULL) {
+        rte_exit(EXIT_FAILURE,
+            "Unable to create memory pool; n=%u, cache_size=%u, data_room_size=%u, socket=%u \n",
+            size, MBUF_CACHE_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+    }
+
+    // initialize each of the specified ethernet ports
+    for (port_id = 0; port_id < nb_ports; port_id++) {
+
+        // skip over ports that are not enabled; a port beyond the width of
+        // the mask can never be enabled and must not be used as a shift count
+        if (port_id >= mask_bits || ((enabled_port_mask >> port_id) & 1) == 0) {
+            LOG_INFO(USER1, "Skipping over disabled port '%u'\n", port_id);
+            nb_ports_available--;
+            continue;
+        }
+
+        // initialize the port - creates one receive queue for each worker
+        LOG_INFO(USER1, "Initializing port %u\n", port_id);
+        if (init_port(port_id, mem_pool, nb_rx_queue, nb_rx_desc) != 0) {
+            rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n", port_id);
+        }
+    }
+
+    // ensure that we were able to initialize enough ports
+    if (nb_ports_available < 1) {
+        rte_exit(EXIT_FAILURE, "Error: No available enabled ports. Portmask set?\n");
+    }
+
+    return 0;
+}
+
+/*
+ * Preparation for transmitting packets.
+ *
+ * Creates `count` rings named "tx-ring-<index>", each created with `size`
+ * slots and the RING_F_SP_ENQ flag, and stores them in tx_rings[0..count-1].
+ *
+ * Returns 0 on success; a ring that cannot be created terminates the process
+ * via rte_exit().
+ */
+int init_transmit(
+    struct rte_ring **tx_rings,
+    const unsigned int count,
+    const unsigned int size)
+{
+    unsigned int i;
+    char name[32];
+
+    for (i = 0; i < count; i++) {
+        // bounded write; %u because the ring index is unsigned
+        snprintf(name, sizeof(name), "tx-ring-%u", i);
+        tx_rings[i] = rte_ring_create(name, size, rte_socket_id(), RING_F_SP_ENQ);
+        if (NULL == tx_rings[i]) {
+            rte_exit(EXIT_FAILURE, "Unable to create transmit ring: %s \n",
+                rte_strerror(rte_errno));
+        }
+    }
+
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/nic.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/nic.h 
b/metron-sensors/fastcapa/src/nic.h
new file mode 100644
index 0000000..42e52b7
--- /dev/null
+++ b/metron-sensors/fastcapa/src/nic.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef METRON_NIC_H
+#define METRON_NIC_H
+
+#include <signal.h>
+#include <unistd.h>
+#include <rte_ethdev.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_errno.h>
+#include "types.h"
+
+#define MBUF_CACHE_SIZE 250
+#define TX_QUEUE_SIZE 64
+#define NUM_MBUFS ((64 * 1024) - 1)
+
+/*
+ * Preparation for receiving and processing packets.
+ *
+ *   enabled_port_mask - bit N set means port N is initialized
+ *   nb_rx_queue       - number of receive queues to create on each port
+ *   nb_rx_desc        - number of descriptors for each receive queue
+ */
+int init_receive(
+    const uint8_t enabled_port_mask,
+    const uint16_t nb_rx_queue,
+    const uint16_t nb_rx_desc);
+
+/*
+ * Preparation for transmitting packets.
+ *
+ *   tx_rings - array that receives the created rings; needs `count` slots
+ *   count    - number of rings to create
+ *   size     - number of slots requested for each ring
+ */
+int init_transmit(
+    struct rte_ring **tx_rings,
+    const unsigned int count,
+    const unsigned int size);
+
+#endif

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/types.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/types.h 
b/metron-sensors/fastcapa/src/types.h
index 0210eda..e687651 100644
--- a/metron-sensors/fastcapa/src/types.h
+++ b/metron-sensors/fastcapa/src/types.h
@@ -19,20 +19,103 @@
 #ifndef METRON_TYPES_H
 #define METRON_TYPES_H
 
-/**
- * Tracks packet processing stats.
+#include <rte_log.h>
+#include <rte_memory.h>
+
+/*
+ * Allow strings to be used for preprocessor #define's
+ */
+#define STR_EXPAND(tok) #tok
+#define STR(tok) STR_EXPAND(tok)
+
+typedef int bool;
+#define true 1
+#define false 0
+
+/*
+ * True when s is non-NULL and longer than one character.  The argument is
+ * parenthesized so that an expression can be passed safely; note that it is
+ * still evaluated twice.
+ */
+#define valid(s) ((s) == NULL ? false : strlen(s) > 1)
+
+/*
+ * Allows unused function parameters to be marked as unused to
+ * avoid unnecessary compile-time warnings.
+ */
+#ifdef __GNUC__
+#  define UNUSED(x) UNUSED_ ## x __attribute__((__unused__))
+#else
+#  define UNUSED(x) UNUSED_ ## x
+#endif
+
+/*
+ * Logging definitions
+ *
+ * LOG_ERROR, LOG_WARN and LOG_INFO (and LOG_DEBUG when DEBUG is defined)
+ * forward to RTE_LOG and already end in a semicolon, so `LOG_INFO(...);`
+ * expands to two statements; take care when using them as the unbraced body
+ * of an if/else.  Without DEBUG, LOG_DEBUG expands to an empty
+ * `do {} while (0)` that carries no semicolon of its own.
+ * LOG_LEVEL is RTE_LOG_DEBUG when DEBUG is defined and RTE_LOG_INFO otherwise.
  */
-struct app_stats {
+#define LOG_ERROR(log_type, fmt, args...) RTE_LOG(ERR, log_type, fmt, ##args);
+#define LOG_WARN(log_type, fmt, args...) RTE_LOG(WARNING, log_type, fmt, 
##args);
+#define LOG_INFO(log_type, fmt, args...) RTE_LOG(INFO, log_type, fmt, ##args);
+
+#ifdef DEBUG
+#define LOG_LEVEL RTE_LOG_DEBUG
+#define LOG_DEBUG(log_type, fmt, args...) RTE_LOG(DEBUG, log_type, fmt, 
##args);
+#else
+#define LOG_LEVEL RTE_LOG_INFO
+#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
+#endif
+
+/*
+ * Application configuration parameters.
+ *
+ * start_workers() reads this struct and copies the burst sizes, the transmit
+ * ring size and the port mask into the per-worker parameter structs.
+ *
+ * NOTE(review): enabled_port_mask is an unsigned int here, while
+ * init_receive() accepts the mask as a uint8_t - confirm that no enabled
+ * port above 7 is expected to survive that narrowing.
+ */
+typedef struct {
+
+    /* The number of receive descriptors to allocate for the receive ring. */
+    uint16_t nb_rx_desc;
+
+    /* The number of receive queues to set up for each ethernet device. */
+    uint16_t nb_rx_queue;
+
+    /* The size of the transmit ring (must be a power of 2). */
+    unsigned int tx_ring_size;
+
+    /* The maximum number of packets retrieved by the receive worker. */
+    uint16_t rx_burst_size;
+
+    /* The maximum number of packets retrieved by the transmit worker. */
+    unsigned int tx_burst_size;
+
+    /* Defines which ports packets will be consumed from. */
+    unsigned int enabled_port_mask;
+
+    /* The name of the Kafka topic that packet data is sent to. */
+    const char* kafka_topic;
+
+    /* A file containing configuration values for the Kafka client. */
+    char* kafka_config_path;
+
+    /* A file to which the Kafka stats are appended to. */
+    char* kafka_stats_path;
+
+    /* The number of receive workers. */
+    unsigned int nb_rx_workers;
+
+    /* The number of transmit workers. */
+    unsigned int nb_tx_workers;
+
+    /* The number of NIC ports from which packets will be consumed. */
+    unsigned int nb_ports;
+
+} app_params __rte_cache_aligned;
+
+/*
+ * Tracks packet processing metrics.
+ */
+typedef struct {
     uint64_t in;
     uint64_t out;
     uint64_t depth;
     uint64_t drops;
-} __rte_cache_aligned;
+} app_stats __rte_cache_aligned;
 
-/**
+/*
  * The parameters required by a receive worker.
  */
-struct rx_worker_params {
+typedef struct {
 
     /* worker identifier */
     uint16_t worker_id;
@@ -41,38 +124,43 @@ struct rx_worker_params {
     uint16_t queue_id;
 
     /* how many packets are pulled off the queue at a time */
-    uint16_t burst_size;
+    uint16_t rx_burst_size;
+
+    /* Defines which ports packets will be consumed from. */
+    unsigned int enabled_port_mask;
 
     /* the ring onto which the packets are enqueued */
     struct rte_ring *output_ring;
 
     /* metrics */
-    struct app_stats stats;
+    app_stats stats;
 
-} __rte_cache_aligned;
+} rx_worker_params __rte_cache_aligned;
 
-/**
+/*
  * The parameters required by a transmit worker.
  */
-struct tx_worker_params {
+typedef struct {
 
     /* worker identifier */
     uint16_t worker_id;
 
     /* how many packets are pulled off the ring at a time */
-    uint16_t burst_size;
+    unsigned int tx_burst_size;
+
+    /* The size of the transmit ring (must be a power of 2). */
+    unsigned int tx_ring_size;
 
     /* the ring from which packets are dequeued */
     struct rte_ring *input_ring;
 
     /* identifies the kafka client connection used by the worker */
     int kafka_id;
- 
+
     /* worker metrics */
-    struct app_stats stats;
+    app_stats stats;
 
-} __rte_cache_aligned;
+} tx_worker_params __rte_cache_aligned;
 
 
 #endif
-

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/worker.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/worker.c 
b/metron-sensors/fastcapa/src/worker.c
new file mode 100644
index 0000000..aa6a7c3
--- /dev/null
+++ b/metron-sensors/fastcapa/src/worker.c
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "worker.h"
+
+/*
+ * Handles interrupt signals.
+ *
+ * Only raises the quit flag that the worker and monitor loops poll.  Nothing
+ * is logged from here because the stdio-based logging call is not
+ * async-signal-safe; the loops log their own shutdown message once they
+ * observe the flag.
+ */
+static void sig_handler(int sig_num)
+{
+    (void) sig_num;
+
+    // set quit flag so that every polling loop exits
+    quit_signal = 1;
+}
+
+/*
+ * Prints packet processing metrics to stdout.
+ *
+ * Emits a header followed by four rows:
+ *   [nic] - ipackets summed over every ethernet device, with drops taken as
+ *           ierrors + oerrors + rx_nombuf; the queued column prints `depth`,
+ *           which is never accumulated for this row and so is always 0
+ *   [rx]  - in/out/drops summed over the nb_rx_workers receive workers
+ *   [tx]  - in/out/drops summed over the nb_tx_workers transmit workers
+ *   [kaf] - the in/depth/out/drops counters filled in by kaf_stats()
+ *
+ * stdout is flushed after the last row.
+ */
+static void print_stats(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers)
+{
+    struct rte_eth_stats eth_stats;
+    unsigned i;
+    uint64_t in, out, depth, drops;
+    app_stats stats;
+
+    // header
+    printf("\n\n     %15s %15s %15s %15s \n", " ----- in -----", " --- queued 
---", "----- out -----", "---- drops  ----");
+
+    // summarize stats from each port
+    in = out = depth = drops = 0;
+    for (i = 0; i < rte_eth_dev_count(); i++) {
+        rte_eth_stats_get(i, &eth_stats);
+        in += eth_stats.ipackets;
+        drops += eth_stats.ierrors + eth_stats.oerrors + eth_stats.rx_nombuf;
+    }
+    printf("[nic] %15" PRIu64 " %15" PRIu64 " %15s %15" PRIu64 "\n", in, 
depth, "-", drops);
+
+    // summarize receive; from network to receive queues
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_rx_workers; i++) {
+        in += rx_params[i].stats.in;
+        out += rx_params[i].stats.out;
+        depth += rx_params[i].stats.depth;
+        drops += rx_params[i].stats.drops;
+    }
+    printf("[rx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", 
out, drops);
+
+    // summarize transmit; from receive queues to transmit rings
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_tx_workers; i++) {
+        in += tx_params[i].stats.in;
+        out += tx_params[i].stats.out;
+        depth += tx_params[i].stats.depth;
+        drops += tx_params[i].stats.drops;
+    }
+    printf("[tx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", 
out, drops);
+
+    // summarize push to kafka; from transmit rings to librdkafka
+    kaf_stats(&stats);
+    printf("[kaf] %15" PRIu64 " %15" PRIu64 " %15" PRIu64 " %15" PRIu64 "\n", 
stats.in, stats.depth, stats.out, stats.drops);
+    fflush(stdout);
+}
+
+/*
+ * Process packets from a single queue.
+ *
+ * Polls queue `queue_id` of every enabled port in round-robin order, pushes
+ * each burst onto the worker's output ring and keeps the in/out/drops
+ * counters in params->stats up to date.  Runs until quit_signal is raised
+ * and then returns 0.
+ *
+ * Ownership: a packet accepted by the ring now belongs to whoever dequeues
+ * it (transmit_worker frees every packet it dequeues), so only the packets
+ * that the ring rejected are freed here.
+ *
+ * NOTE(review): pkts holds MAX_RX_BURST_SIZE entries; rx_burst_size is
+ * assumed not to exceed that - confirm where the argument is validated.
+ */
+static int receive_worker(rx_worker_params* params)
+{
+    const uint8_t nb_ports = rte_eth_dev_count();
+    const unsigned socket_id = rte_socket_id();
+    const uint16_t rx_burst_size = params->rx_burst_size;
+    const uint16_t queue_id = params->queue_id;
+    const unsigned int port_mask = params->enabled_port_mask;
+    // ports at or beyond this index cannot be expressed by the mask
+    const unsigned int mask_bits = 8 * sizeof(port_mask);
+    struct rte_ring *ring = params->output_ring;
+    int dev_socket_id;
+    uint16_t i;
+    uint8_t port;
+    struct rte_mbuf* pkts[MAX_RX_BURST_SIZE];
+
+    // upper bound on back-to-back bursts taken from a queue that keeps
+    // returning full bursts; with 0 exactly one burst is taken per visit
+    const int attempts = 0;
+
+    LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u attempts=%d \n",
+        rte_lcore_id(), socket_id, queue_id, attempts);
+
+    // validate each port
+    for (port = 0; port < nb_ports; port++) {
+
+        // skip ports that are not enabled
+        if (port >= mask_bits || ((port_mask >> port) & 1u) == 0) {
+            continue;
+        }
+
+        // check for cross-socket communication
+        dev_socket_id = rte_eth_dev_socket_id(port);
+        if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
+            LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", port);
+        }
+    }
+
+    port = 0;
+    while (!quit_signal) {
+
+        // skip to the next enabled port
+        if (port >= mask_bits || ((port_mask >> port) & 1u) == 0) {
+            if (++port == nb_ports) {
+                port = 0;
+            }
+            continue;
+        }
+
+        // receive a 'burst' of packets. if get back the max number requested,
+        // then there are likely more packets waiting. immediately go back and
+        // grab some.
+        int tries = 0;
+        uint16_t nb_in = 0, nb_in_last = 0;
+        do {
+            nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], rx_burst_size);
+            nb_in += nb_in_last;
+
+        } while (++tries < attempts && nb_in_last == rx_burst_size);
+        params->stats.in += nb_in;
+
+        if (likely(nb_in > 0)) {
+
+            // add each packet to the ring buffer
+            const uint16_t nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in);
+            params->stats.out += nb_out;
+            params->stats.drops += (nb_in - nb_out);
+
+            // release only the packets that did not fit on the ring; the
+            // first nb_out packets are now owned by the ring's consumer
+            for (i = nb_out; i < nb_in; i++) {
+                rte_pktmbuf_free(pkts[i]);
+            }
+        }
+
+        // wrap-around to the first port
+        if (++port == nb_ports) {
+            port = 0;
+        }
+    }
+
+    LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n",
+        rte_lcore_id(), socket_id, queue_id);
+    return 0;
+}
+
+/*
+ * The transmit worker is responsible for consuming packets from the transmit
+ * rings and queueing the packets for bulk delivery to kafka.
+ *
+ * Dequeues up to tx_burst_size packets at a time from params->input_ring,
+ * hands them to kaf_send() and then frees every dequeued packet.  The in/out
+ * counters in params->stats are updated as it goes.  Runs until quit_signal
+ * is raised and then returns 0.
+ */
+static int transmit_worker(tx_worker_params *params)
+{
+    unsigned i, nb_in, nb_out;
+    const unsigned int tx_burst_size = params->tx_burst_size;
+    struct rte_ring *ring = params->input_ring;
+    const int kafka_id = params->kafka_id;
+
+    // a dequeue never hands back more than tx_burst_size packets, so that is
+    // the capacity required; sized once here instead of on every iteration
+    struct rte_mbuf* pkts[tx_burst_size > 0 ? tx_burst_size : 1];
+
+    LOG_INFO(USER1, "Transmit worker started; core=%u, socket=%u \n",
+        rte_lcore_id(), rte_socket_id());
+    while (!quit_signal) {
+
+        // dequeue packets from the ring
+        nb_in = rte_ring_dequeue_burst(ring, (void*) pkts, tx_burst_size);
+
+        if (likely(nb_in > 0)) {
+            params->stats.in += nb_in;
+
+            // prepare the packets to be sent to kafka
+            nb_out = kaf_send(pkts, nb_in, kafka_id);
+            params->stats.out += nb_out;
+
+            // clean-up the packet buffer
+            for (i = 0; i < nb_in; i++) {
+                rte_pktmbuf_free(pkts[i]);
+            }
+        }
+    }
+
+    LOG_INFO(USER1, "Transmit worker finished; core=%u, socket=%u \n",
+        rte_lcore_id(), rte_socket_id());
+    return 0;
+}
+
+/*
+ * Start the receive and transmit workers.
+ *
+ * Installs the SIGINT handler, then walks the slave lcores: the first
+ * p->nb_rx_workers cores each run a receive worker bound to the queue and
+ * ring that share the worker's index; every remaining core runs a transmit
+ * worker that drains ring (tx_worker_id % p->nb_rx_queue).
+ *
+ * NOTE(review): transmit workers are launched for all remaining cores
+ * without comparing tx_worker_id to p->nb_tx_workers - confirm that
+ * tx_params always has room for one entry per remaining core.
+ * NOTE(review): the result of rte_eal_remote_launch() is not checked.
+ *
+ * Always returns 0.
+ */
+int start_workers(
+    rx_worker_params* rx_params,
+    tx_worker_params* tx_params,
+    struct rte_ring **tx_rings,
+    app_params *p)
+{
+    unsigned lcore_id;
+    unsigned rx_worker_id = 0;
+    unsigned tx_worker_id = 0;
+
+    signal(SIGINT, sig_handler);
+
+    // launch the workers
+    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+
+        if(rx_worker_id < p->nb_rx_workers) {
+
+            LOG_INFO(USER1, "Launching receive worker; worker=%u, core=%u, 
queue=%u\n", rx_worker_id, lcore_id, rx_worker_id);
+            rx_params[rx_worker_id] = (rx_worker_params) {
+                .worker_id = rx_worker_id,
+                .queue_id = rx_worker_id,
+                .rx_burst_size = p->rx_burst_size,
+                .enabled_port_mask = p->enabled_port_mask,
+                .output_ring = tx_rings[rx_worker_id],
+                .stats = {0}
+            };
+            rte_eal_remote_launch((lcore_function_t*) receive_worker, 
&rx_params[rx_worker_id], lcore_id);
+            rx_worker_id++;
+
+        } else {
+
+            unsigned ring_id = tx_worker_id % p->nb_rx_queue;
+            LOG_INFO(USER1, "Launching transmit worker; worker=%u, core=%u 
ring=%u \n", tx_worker_id, lcore_id, ring_id);
+            tx_params[tx_worker_id] = (tx_worker_params) {
+                .worker_id = tx_worker_id,
+                .tx_burst_size = p->tx_burst_size,
+                .tx_ring_size = p->tx_ring_size,
+                .input_ring = tx_rings[ring_id],
+                .kafka_id = tx_worker_id,
+                .stats = {0}
+            };
+            rte_eal_remote_launch((lcore_function_t*) transmit_worker, 
&tx_params[tx_worker_id], lcore_id);
+            tx_worker_id++;
+        }
+
+    }
+
+    return 0;
+}
+
+/*
+ * Watches over the receive and transmit workers from the main thread while
+ * the worker threads do the actual packet processing.  Until the quit flag
+ * is raised it polls kafka, prints the current metrics and then sleeps for
+ * five seconds.  Always returns 0.
+ */
+int monitor_workers(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers)
+{
+    LOG_INFO(USER1, "Starting to monitor workers; core=%u, socket=%u \n",
+        rte_lcore_id(), rte_socket_id());
+
+    for (;;) {
+        // the flag is checked once per cycle, before any work is done
+        if (quit_signal) {
+            break;
+        }
+
+        kaf_poll();
+        print_stats(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
+        sleep(5);
+    }
+
+    LOG_INFO(USER1, "Finished monitoring workers; core=%u, socket=%u \n",
+        rte_lcore_id(), rte_socket_id());
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/50e521b0/metron-sensors/fastcapa/src/worker.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/worker.h 
b/metron-sensors/fastcapa/src/worker.h
new file mode 100644
index 0000000..243e842
--- /dev/null
+++ b/metron-sensors/fastcapa/src/worker.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef METRON_WORKER_H
+#define METRON_WORKER_H
+
+#include <stdint.h>
+#include <unistd.h>
+
+#include <rte_ethdev.h>
+
+#include "types.h"
+#include "kafka.h"
+
+volatile uint8_t quit_signal;
+
+/*
+ * Start the receive and transmit workers.
+ *
+ *   rx_params - array filled in with one entry per launched receive worker
+ *   tx_params - array filled in with one entry per launched transmit worker
+ *   tx_rings  - rings that connect the receive workers to the transmit workers
+ *   p         - application parameters that are copied into the worker entries
+ */
+int start_workers(
+    rx_worker_params* rx_params,
+    tx_worker_params* tx_params,
+    struct rte_ring **tx_rings,
+    app_params *p);
+
+/*
+ * Monitors the receive and transmit workers.  Executed by the main thread,
+ * while other threads are created to perform the actual packet processing.
+ *
+ *   rx_params / nb_rx_workers - receive worker entries whose metrics are reported
+ *   tx_params / nb_tx_workers - transmit worker entries whose metrics are reported
+ */
+int monitor_workers(
+    const rx_worker_params *rx_params,
+    const unsigned nb_rx_workers,
+    const tx_worker_params *tx_params,
+    const unsigned nb_tx_workers);
+
+#endif

Reply via email to