>Number:         179085
>Category:       bin
>Synopsis:       pthread patch for netblast
>Confidential:   no
>Severity:       non-critical
>Priority:       low
>Responsible:    freebsd-bugs
>State:          open
>Quarter:        
>Keywords:       
>Date-Required:
>Class:          sw-bug
>Submitter-Id:   current-users
>Arrival-Date:   Wed May 29 20:20:00 UTC 2013
>Closed-Date:
>Last-Modified:
>Originator:     Olivier Cochard-Labbe
>Release:        current
>Organization:
BSD Router Project
>Environment:
FreeBSD laptop.bsdrp.net 10.0-CURRENT FreeBSD 10.0-CURRENT #2 r250925: Thu May 
23 21:49:14 CEST 2013     
[email protected]:/usr/obj/usr/local/BSDRP/BSDRPcur/FreeBSD/src/sys/LAPTOP  
amd64
>Description:
Here is a patch that adds multithreading support to tools/tools/netrate/netblast
(based on the netreceive code).
WARNING: It needs a full review because it is my first attempt at using pthreads.
>How-To-Repeat:

>Fix:


Patch attached with submission follows:

Index: tools/tools/netrate/netblast/Makefile
===================================================================
--- tools/tools/netrate/netblast/Makefile       (revision 250926)
+++ tools/tools/netrate/netblast/Makefile       (working copy)
@@ -4,5 +4,8 @@
 
 PROG=  netblast
 NO_MAN=
+LDFLAGS += -lpthread
 
+WARNS?= 3
+
 .include <bsd.prog.mk>
Index: tools/tools/netrate/netblast/netblast.c
===================================================================
--- tools/tools/netrate/netblast/netblast.c     (revision 250926)
+++ tools/tools/netrate/netblast/netblast.c     (working copy)
@@ -40,15 +40,24 @@
 #include <string.h>
 #include <unistd.h>                    /* close */
 
+#include <pthread.h>
+#include <fcntl.h>
+#include <time.h>   /* clock_getres() */
+
+static int round_to(int n, int l)
+{
+    return ((n + l - 1)/l)*l;
+}
+
 static void
 usage(void)
 {
 
-       fprintf(stderr, "netblast [ip] [port] [payloadsize] [duration]\n");
+       fprintf(stderr, "netblast [ip] [port] [payloadsize] [duration] 
[nthreads]\n");
        exit(-1);
 }
 
-static int     global_stop_flag;
+static int     global_stop_flag=0;
 
 static void
 signal_handler(int signum __unused)
@@ -57,48 +66,28 @@
        global_stop_flag = 1;
 }
 
+
 /*
- * Loop that blasts packets: begin by recording time information, resetting
- * stats.  Set the interval timer for when we want to wake up.  Then go.
- * SIGALRM will set a flag indicating it's time to stop.  Note that there's
- * some overhead to the signal and timer setup, so the smaller the duration,
- * the higher the relative overhead.
+ * Multiple sender threads share one socket so the blaster can
+ * generate more load. The main thread joins them and sums the stats.
  */
-static int
-blast_loop(int s, long duration, u_char *packet, u_int packet_len)
+struct td_desc {
+    pthread_t td_id;
+    uint64_t counter; /* tx counter */
+       uint64_t send_errors; /* tx send errors */
+    uint64_t send_calls;    /* tx send calls */
+       int s;
+       u_char *packet;
+       u_int packet_len;
+};
+
+static void *
+blast(void *data)
 {
-       struct timespec starttime, tmptime;
-       struct itimerval it;
-       u_int32_t counter;
-       int send_errors, send_calls;
-
-       if (signal(SIGALRM, signal_handler) == SIG_ERR) {
-               perror("signal");
-               return (-1);
-       }
-
-       if (clock_getres(CLOCK_REALTIME, &tmptime) == -1) {
-               perror("clock_getres");
-               return (-1);
-       }
-
-       if (clock_gettime(CLOCK_REALTIME, &starttime) == -1) {
-               perror("clock_gettime");
-               return (-1);
-       }
-
-       it.it_interval.tv_sec = 0;
-       it.it_interval.tv_usec = 0;
-       it.it_value.tv_sec = duration;
-       it.it_value.tv_usec = 0;
-
-       if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
-               perror("setitimer");
-               return (-1);
-       }
-
-       send_errors = send_calls = 0;
-       counter = 0;
+    struct td_desc *t = data;
+       t->counter=0;
+       t->send_errors=0;
+       t->send_calls=0;
        while (global_stop_flag == 0) {
                /*
                 * We maintain and, if there's room, send a counter.  Note
@@ -110,32 +99,74 @@
                 * operation, causing the current sequence number also to be
                 * skipped.
                 */
-               if (packet_len >= 4) {
-                       be32enc(packet, counter);
-                       counter++;
+               if (t->packet_len >= 4) {
+                       be32enc(t->packet, t->counter);
+                       t->counter++;
                }
-               if (send(s, packet, packet_len, 0) < 0)
-                       send_errors++;
-               send_calls++;
+               if (send(t->s, t->packet, t->packet_len, 0) < 0)
+                       t->send_errors++;
+               t->send_calls++;
        }
+    return NULL;
+}
 
+static struct td_desc **
+make_threads(int s, u_char *packet, u_int packet_len, int nthreads)
+{
+    int i;
+    int lb = round_to(nthreads * sizeof (struct td_desc *), 64);
+    int td_len = round_to(sizeof(struct td_desc), 64); // cache align
+    char *m = calloc(1, lb + td_len * nthreads);
+    struct td_desc **tp;
+
+    /* pointers plus the structs */
+    if (m == NULL) {
+        perror("no room for pointers!");
+        exit(1);
+    }
+    tp = (struct td_desc **)m;
+    m += lb;    /* skip the pointers */
+    for (i = 0; i < nthreads; i++, m += td_len) {
+        tp[i] = (struct td_desc *)m;
+        tp[i]->s = s;
+        tp[i]->packet = packet;
+        tp[i]->packet_len = packet_len;
+        if (pthread_create(&tp[i]->td_id, NULL, blast, tp[i])) {
+            perror("unable to create thread");
+            exit(1);
+        }
+    }
+    return tp;
+}
+
+
+static void
+main_thread(struct td_desc **tp, long duration, struct timespec starttime, 
struct timespec tmptime, int nthreads)
+{
+       uint64_t send_errors=0, send_calls=0;
+       int i;
        if (clock_gettime(CLOCK_REALTIME, &tmptime) == -1) {
                perror("clock_gettime");
-               return (-1);
        }
 
+       for (i = 0; i < nthreads; i++) {
+               /* Wait for thread end */
+               pthread_join( tp[i]->td_id, NULL);
+               send_calls+=tp[i]->send_calls;
+               send_errors+=tp[i]->send_errors;
+    }
+
        printf("\n");
        printf("start:             %zd.%09lu\n", starttime.tv_sec,
            starttime.tv_nsec);
        printf("finish:            %zd.%09lu\n", tmptime.tv_sec,
            tmptime.tv_nsec);
-       printf("send calls:        %d\n", send_calls);
-       printf("send errors:       %d\n", send_errors);
-       printf("approx send rate:  %ld\n", (send_calls - send_errors) /
+       printf("send calls:        %lu\n", send_calls);
+       printf("send errors:       %lu\n", send_errors);
+       printf("approx send rate:  %lu\n", (send_calls - send_errors) /
            duration);
-       printf("approx error rate: %d\n", (send_errors / send_calls));
+       printf("approx error rate: %lu\n", (send_errors / send_calls));
 
-       return (0);
 }
 
 int
@@ -143,11 +174,15 @@
 {
        long payloadsize, duration;
        struct addrinfo hints, *res, *res0;
-       char *dummy, *packet;
-       int port, s, error;
+       char *dummy;
+       u_char *packet;
+       int port, s, error, nthreads = 1;
+       struct td_desc **tp;    
        const char *cause = NULL;
+       struct timespec starttime, tmptime;
+       struct itimerval it;
 
-       if (argc != 5)
+       if (argc < 5)
                usage();
 
        memset(&hints, 0, sizeof(hints));
@@ -177,6 +212,11 @@
                /*NOTREACHED*/
        }
 
+       if (argc > 5)
+        nthreads = strtoul(argv[5], &dummy, 10);
+    if (nthreads < 1 || nthreads > 64)
+        usage();
+
        packet = malloc(payloadsize);
        if (packet == NULL) {
                perror("malloc");
@@ -216,6 +256,43 @@
 
        freeaddrinfo(res0);
 
-       return (blast_loop(s, duration, packet, payloadsize));
+       printf("netblast %d threads sending on UDP port %d\n",
+    nthreads, (u_short)port);
 
+       /*
+        * Begin by recording time information stats.
+        * Set the interval timer for when we want to wake up.
+        * SIGALRM will set a flag indicating it's time to stop.  Note that there's
+        * some overhead to the signal and timer setup, so the smaller the duration,
+        * the higher the relative overhead.
+        */
+
+       if (signal(SIGALRM, signal_handler) == SIG_ERR) {
+               perror("signal");
+               return (-1);
+       }
+
+       if (clock_getres(CLOCK_REALTIME, &tmptime) == -1) {
+               perror("clock_getres");
+               return (-1);
+       }
+
+       if (clock_gettime(CLOCK_REALTIME, &starttime) == -1) {
+               perror("clock_gettime");
+               return (-1);
+       }
+
+       it.it_interval.tv_sec = 0;
+       it.it_interval.tv_usec = 0;
+       it.it_value.tv_sec = duration;
+       it.it_value.tv_usec = 0;
+
+       if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
+               perror("setitimer");
+               return (-1);
+       }
+
+    tp = make_threads(s, packet, payloadsize, nthreads);
+    main_thread(tp, duration, starttime,  tmptime, nthreads);
+
 }


>Release-Note:
>Audit-Trail:
>Unformatted:
_______________________________________________
[email protected] mailing list
http://lists.freebsd.org/mailman/listinfo/freebsd-bugs
To unsubscribe, send any mail to "[email protected]"

Reply via email to