Ermal Luçi escreveu:
On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <[EMAIL PROTECTED]> wrote:
Mike Makonnen wrote:
Patrick Tracanelli wrote:
To let you know of my current (real world) tests:

- Wireless Internet Provider 1:
   - 4Mbit/s of Internet Traffic
   - Classifying default protocols + soulseek + ssh
   - Classifying 100Mbit/s of dump over ssh

Results in:
   No latency added, very low CPU usage, no packets dropping.

- Wireless ISP 2:
   - 21 Mbit/s of Internet Traffic
   - Classifying default protocols + soulseek + ssh

Results in:
   No tcp or udp traffic at all; everything that gets diverted never
comes out of the divert socket, and ipfw-classifyd logs

Aug  1 12:07:35 ourofino last message repeated 58 times
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
(rule 50000)
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
50000)
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
50000)
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
1000)
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
50000)
Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh   (rule
50000)
Aug  1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
socket: Operation not permitted
Aug  1 12:18:50 ourofino last message repeated 90 times
Hmmm... this part means that the call to sendto(2) to write the packet
back into network stack failed.  This explains why you are not seein g any
traffic comming back out of the divert socket, but I don't see why it would
suddenly fail with a permission error. Could this be a kernel bug?
Aug  1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
Aug  1 12:19:11 ourofino last message repeated 94 times

Raised queue len a lot (up to 40960), when the application starts it uses
up to 25% CPU and a second after that, CPU usage gets lower the 0.1%.
This looks like a deadlock. If it weren't able to process packets fast
enough the cpu usage should be high even as it's spewing "packet dropped"
messages. Can you send me some more information like memory usage and the
firewall script you are using? How much of the 21Mbits/s of traffic is P2P?
If you reduce the number of protocols you are trying to match against does
the behavior change? Using netstat -w1 -I<interface> can you tell me how
many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also,
the timestamps from the log file seem to show that the daemon is running for
approx. 34 sec. before the first "unable to write to write to divert socket"
message. Is it passing traffic during this time? Thanks.

I've uploaded a newer version. Can you try that also please. It includes:
 o SIGHUP forces it to re-read its configuration file
 o rc.d script
 o minor optimization (calls pthread_cond_signal with the mutex unlocked)
 o code cleanup

Also, for your convenience I have attached a patch against the earlier
version that removes a debugging printf that should remove spammage to your
log files (the current version has it removed already).

Ooops, a few minutes after I sent this email I found a couple of bugs (one
major, and one minor). They were in the original tarball as well as the
newer one I uploaded earlier today. I've uploaded a fixed version of the
code. Can you try that instead please.

Also, to help track down performance issues I've modified the Makefile to
build a profiled version of the application so you can use gprof(1) to
figure out where any problems lie.


Does this sound about right for implementing the GC and implementing syntax as
$protocol = dnpipe 20
$protocol2 = dnqueue 30
it has some extra goos for pf(4) and altq(4)
$protocol3 = queue $queue name
$protocol4 = tag TAGNAME
$protocol5 = action block

It adds 2 new options -e seconds for seconds before a flow is
considered expired and -n #packets proccessed before kicking the GC.

--- classifyd_old.c     2008-08-09 00:33:04.000000000 +0000
+++ classifyd.c 2008-08-09 00:33:34.000000000 +0000
@@ -28,13 +28,17 @@

 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>

+#include <net/if.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <netinet/in_systm.h>
 #include <netinet/ip.h>
 #include <netinet/tcp.h>
 #include <netinet/udp.h>
+#include <net/pfvar.h>

 #include <assert.h>
 #include <err.h>
@@ -53,6 +57,7 @@
 #include <unistd.h>

 #include "hashtable.h"
+#include "hashtable_private.h"
 #include "pathnames.h"
 #include "protocols.h"

@@ -94,6 +99,7 @@
        uint32_t if_datalen;    /* length in bytes of if_data */
        uint16_t if_pktcount;   /* number of packets concatenated */
        uint16_t if_fwrule;     /* ipfw(4) rule associated with flow */
+       time_t   expire;        /* flow expire time */
 };

 /*
@@ -126,7 +132,7 @@
 static struct ic_queue outQ;

 /* divert(4) socket */
-static int dvtS;
+static int dvtS = 0;

 /* config file path */
 static const char *conf = IC_CONFIG_PATH;
@@ -137,12 +143,25 @@
 /* List of protocols available to the system */
 struct ic_protocols *fp;

+/* Our hashtables */
+struct hashtable *sh = NULL,
+               *th = NULL,
+               *uh = NULL;
+
+/* signaled to kick garbage collector */
+static pthread_cond_t  gq_condvar;
+
+/* number of packets before kicking garbage collector */
+static unsigned int npackets = 250;
+
+static time_t time_expire = 40; /* 40 seconds */
 /*
  * Forward function declarations.
  */
 void           *classify_pthread(void *);
 void           *read_pthread(void *);
 void           *write_pthread(void *);
+void           *garbage_pthread(void *);
 static int     equalkeys(void *, void *);
 static unsigned int hashfromkey(void *);
 static void    test_re(void);
@@ -155,7 +174,7 @@
 {
        struct sockaddr_in addr;
        struct sigaction sa;
-       pthread_t  classifytd, readtd, writetd;
+       pthread_t  classifytd, readtd, writetd, garbagectd;
        const char *errstr;
        long long  num;
        uint16_t   port, qmaxsz;
@@ -164,13 +183,27 @@
        tflag = 0;
        port = IC_DPORT;
        qmaxsz = IC_QMAXSZ;
-       while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
+       while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
                switch(ch) {
                case 'c':
                        conf = strdup(optarg);
                        if (conf == NULL)
                                err(EX_TEMPFAIL, "config file path");
                        break;
+               case 'e':
+                       num = strtonum((const char *)optarg, 1, 400, &errstr);
+                       if (num == 0 && errstr != NULL) {
+                               errx(EX_USAGE, "invalud expire seconds: %s", 
errstr); 
+                       }
+                       time_expire = (time_t)num;
+                       break;
+               case 'n':
+                        num = strtonum((const char *)optarg, 1,
65535, &errstr);
+                        if (num == 0 && errstr != NULL) {
+                                errx(EX_USAGE, "invalud expire
seconds: %s", errstr);
+                        }
+                        npackets = (unsigned int)num;
+                       break;
                case 'P':
                        protoDir = strdup(optarg);
                        if (protoDir == NULL)
@@ -230,6 +263,9 @@
        error = pthread_cond_init(&outQ.fq_condvar, NULL);
        if (error != 0)
                err(EX_OSERR, "unable to initialize output queue condvar");
+        error = pthread_cond_init(&gq_condvar, NULL);
+        if (error != 0)
+                err(EX_OSERR, "unable to initialize garbage collector
condvar");

        /*
         * Create and bind the divert(4) socket.
@@ -276,32 +312,80 @@
        if (error == -1)
                err(EX_OSERR, "unable to set signal handler");

+        /*
+         * There are 3 tables: udp, tcp, and tcp syn.
+         * The tcp syn table tracks connections for which a
+         * SYN packet has been sent but no reply has been returned
+         * yet. Once the SYN ACK reply is detected it is moved to
+         * the regular tcp connection tracking table.
+         */
+        sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (sh == NULL) {
+                syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
+               error = EX_SOFTWARE;
+               goto cleanup;
+        }
+        th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (th == NULL) {
+                syslog(LOG_ERR, "unable to create TCP tracking table");
+               error = EX_SOFTWARE;
+                goto cleanup;
+        }
+        uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (uh == NULL) {
+                syslog(LOG_ERR, "unable to create UDP tracking table");
+               error = EX_SOFTWARE;
+                goto cleanup;
+        }
+
        /*
         * Create the various threads.
         */
        error = pthread_create(&readtd, NULL, read_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create reader thread");
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create reader thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create classifier thread");
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create classifier thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        error = pthread_create(&writetd, NULL, write_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create writer thread");
-
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create writer thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
+        error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
+        if (error != 0) {
+                syslog(LOG_ERR, "unable to create garbage collect thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        /*
         * Wait for our threads to exit.
         */
        pthread_join(readtd, NULL);
        pthread_join(classifytd, NULL);
        pthread_join(writetd, NULL);
-
+       pthread_join(garbagectd, NULL);
        /*
         * Cleanup
         */
-       close(dvtS);
+cleanup:
+       if (dvtS > 0)
+               close(dvtS);
+       if (sh != NULL)
+               hashtable_destroy(sh, 1);
+       if (th != NULL)
+               hashtable_destroy(th, 1);
+       if (uh != NULL)
+               hashtable_destroy(uh, 1);
        
-       return (0);
+       return (error);
 }

 void *
@@ -310,6 +394,7 @@
        struct ic_pkt      *pkt;
        struct ip *ipp;
        int       len;
+       unsigned int pcktcnt = 0;

        while (1) {
                pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
@@ -353,6 +438,10 @@
                STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
                inQ.fq_size++;
                pthread_mutex_unlock(&inQ.fq_mtx);
+               if (++pcktcnt > npackets) {
+                       pcktcnt = 0;
+                       pthread_cond_signal(&gq_condvar);
+               }
                pthread_cond_signal(&inQ.fq_condvar);
        }

@@ -420,39 +509,19 @@
        struct tcphdr    *tcp;
        struct udphdr    *udp;
        struct ic_pkt    *pkt;
-       struct hashtable *sh, *th, *uh;
        struct protocol  *proto;
+       struct timeval   tv;
        regmatch_t       pmatch;
        u_char           *data, *payload;
        uint16_t         trycount;
        int              datalen, error;

-       /*
-        * There are 3 tables: udp, tcp, and tcp syn.
-        * The tcp syn table tracks connections for which a
-        * SYN packet has been sent but no reply has been returned
-        * yet. Once the SYN ACK reply is detected it is moved to
-        * the regular tcp connection tracking table.
-        */
-       sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (sh == NULL) {
-               syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
-               exit(EX_SOFTWARE);
-       }
-       th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (th == NULL) {
-               syslog(LOG_ERR, "unable to create TCP tracking table");
-               exit(EX_SOFTWARE);
-       }
-       uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (uh == NULL) {
-               syslog(LOG_ERR, "unable to create UDP tracking table");
-               exit(EX_SOFTWARE);
-       }
-
        flow = NULL;
        key = NULL;
        while(1) {
+               while(gettimeofday(&tv, NULL) != 0)
+                       ;
+
                pthread_mutex_lock(&inQ.fq_mtx);
                pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
                while (pkt == NULL) {
@@ -528,6 +597,8 @@
                                        free(pkt);
                                        continue;
                                }
+                               
+                               flow->expire = tv.tv_sec;
                                goto enqueue;
                        /*
                         * Handle session tear-down.
@@ -583,8 +654,11 @@
                                 * collecting IC_PKTMAXMATCH packets, just pass 
it through.
                                 */
                                } else if (flow->if_pktcount >= IC_PKTMAXMATCH 
&&
-                                   flow->if_fwrule == 0)
+                                   flow->if_fwrule == 0) {
+                                       flow->expire = tv.tv_sec;
                                        goto enqueue;
+                               }
+                               flow->expire = tv.tv_sec;
                                goto classify;
                        }

@@ -630,6 +704,7 @@
                                        free(pkt);
                                        continue;
                                }
+                               flow->expire = tv.tv_sec;
                                goto classify;
                        }

@@ -688,6 +763,7 @@
                                flow->if_datalen = datalen;
                                flow->if_pktcount = 1;
                                flow->if_fwrule = 0;
+                               flow->expire = tv.tv_sec;
                                if (hashtable_insert(uh, (void *)key, (void 
*)flow) == 0) {
                                        syslog(LOG_WARNING,
                                            "packet dropped: unable to insert into 
table");
@@ -715,19 +791,26 @@
                                flow->if_data = data;
                                flow->if_datalen += datalen;
                                flow->if_pktcount++;
+                               flow->expire = tv.tv_sec;
                        /*
                         * If we haven't been able to classify this flow after
                         * collecting IC_PKTMAXMATCH packets, just pass it 
through.
                         */
                        } else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
-                           flow->if_fwrule == 0)
+                           flow->if_fwrule == 0) {
+                               flow->expire = tv.tv_sec;
                                goto enqueue;
+                       }
                } else
                        /* Not an TCP or UDP packet. */
                        goto enqueue;

 classify:
-               assert(flow != NULL);
+               if (flow == NULL) {
+                       syslog(LOG_ERR, "flow is null argghhhhhhh");
+                       goto enqueue;
+               }
+               //assert(flow != NULL);

                /*
                 * Inform divert(4) what rule to send it to by
@@ -823,6 +906,80 @@
        return (NULL);
 }

+void *
+garbage_pthread(void *arg __unused)
+{
+       char errbuf[LINE_MAX];
+       struct entry *e, *f;
+       unsigned int i, flows_expired, error;
+       struct timeval tv;
+
+       while (1) {
+               flows_expired = 0;
+               while (gettimeofday(&tv, NULL) != 0)
+                       ;
+               tv.tv_sec -= time_expire;
+
+               pthread_mutex_lock(&inQ.fq_mtx);
+                error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
+                if (error != 0) {
+                        strerror_r(error, errbuf, sizeof(errbuf));
+                        syslog(EX_OSERR, "unable to wait on garbage
collection: %s",
+                                errbuf);
+                        exit(EX_OSERR);
+                }
+
+               for (i = 0; i < sh->tablelength; i++) {
+                       e = sh->table[i];
+                       while (e != NULL) {
+                               f = e; e = e->next;
+                               if (((struct ip_flow *)f->v)->expire < 
tv.tv_sec) {
+                                       freekey(f->k);
+                                       sh->entrycount--;
+                                       if (f->v != NULL)
+                                               free(f->v);
+                                       free(f);
+                                       flows_expired++;
+                               }
+                       }
+               }
+                for (i = 0; i < th->tablelength; i++) {
+                        e = th->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        th->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+                                       flows_expired++;
+                                }
+                        }
+                }
+                for (i = 0; i < uh->tablelength; i++) {
+                        e = uh->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        uh->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+                                       flows_expired++;
+                                }
+                        }
+                }
+
+               pthread_mutex_unlock(&inQ.fq_mtx);
+               
+               syslog(LOG_WARNING, "expired %u flows", flows_expired);
+       }
+
+       return (NULL);
+}
+
 /*
  * NOTE: The protocol list (plist) passed as an argument is a global
  *      variable. It is accessed from 3 functions: classify_pthread,
@@ -840,12 +997,20 @@
 static int
 read_config(const char *file, struct ic_protocols *plist)
 {
+       enum { bufsize = 2048 };
        struct protocol *proto;
        properties      props;
-       const char      *errmsg, *name, *value;
-       int             fd;
+       const char      *errmsg, *name;
+       char            *value;
+       int             fd, fdpf;
        uint16_t        rule;
+       char **ap, *argv[bufsize];

+       fdpf = open("/dev/pf", O_RDONLY);
+       if (fdpf == -1) {
+               syslog(LOG_ERR, "unable to open /dev/pf");
+               return (EX_OSERR);
+       }
        fd = open(file, O_RDONLY);
        if (fd == -1) {
                syslog(LOG_ERR, "unable to open configuration file");
@@ -863,10 +1028,48 @@
                /* Do not match traffic against this pattern */
                if (value == NULL)
                        continue;
-               rule = strtonum(value, 1, 65535, &errmsg);
-               if (rule == 0) {
+               for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
+                       if (**ap != '\0')
+                               if (++ap >= &argv[bufsize])
+                                       break;
+               if (!strncmp(argv[0], "queue", strlen("queue"))) {
+                       if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
+                               syslog(LOG_WARNING,
+                                       "could not get ALTQ translation for"
+                                       " queue %s", argv[1]);
+                               continue;
+                       }
+                       if (rule == 0) {
+                               syslog(LOG_WARNING,
+                                       "queue %s does not exists!", argv[1]);
+                               continue;
+                       }
+               } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
+                       rule = strtonum(argv[1], 1, 65535, &errmsg);
+               else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
+                       rule = strtonum(argv[1], 1, 65535, &errmsg);
+               else if (!strncmp(argv[0], "tag", strlen("tag"))) {
+                        if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
+                                syslog(LOG_WARNING,
+                                        "could not get tag translation for"
+                                        " queue %s", argv[1]);
+                                continue;
+                        }
+                        if (rule == 0) {
+                                syslog(LOG_WARNING,
+                                        "tag %s does not exists!", argv[1]);
+                                continue;
+                        }
+               } else if (!strncmp(argv[0], "action", strlen("action"))) {
+                       if (strncmp(argv[1], "block", strlen("block")))
+                               rule = PF_DROP;
+                       else if (strncmp(argv[1], "allow", strlen("allow")))
+                               rule = PF_PASS;
+                       else
+                               continue;
+               } else {
                        syslog(LOG_WARNING,
-                           "invalid rule number for %s protocol: %s",
+                           "invalid action specified for %s protocol: %s",
                            proto->p_name, errmsg);
                        continue;
                }
@@ -953,10 +1156,14 @@
 static void
 usage(const char *arg0)
 {
-       printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
basename(arg0));
+       printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
+               "[-p port] [-P dir] [-q length]\n", basename(arg0));
        printf("usage: %s -t -P dir\n", basename(arg0));
        printf( "    -c file   : path to configuration file\n"
+               "    -e secs   : number of seconds before a flow is expired\n"
                "    -h        : this help screen\n"
+               "    -n packets: number of packets before the garbage collector"
+                       " tries to expire flows\n"
                "    -P dir    : directory containing protocol patterns\n"
                "    -p port   : port number of divert socket\n"
                "    -q length : max length (in packets) of in/out queues\n"


Some progress in solution of presented problems ?

--
Daniel



_______________________________________________
[email protected] mailing list
http://lists.freebsd.org/mailman/listinfo/freebsd-net
To unsubscribe, send any mail to "[EMAIL PROTECTED]"

Reply via email to