RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   24-May-2017 08:03:18
  Branch: rpm-5_4                          Handle: 2017052406031800

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/rpmio               msqio.c rpmmsq.h tmq.c

  Log:
    - msqio: add condvar to get rid of racy timeouts (loopback mode only).
    - msqio: make peace with O_NONBLOCK.

  Summary:
    Revision    Changes     Path
    1.3501.2.559 +2  -0      rpm/CHANGES
    1.1.2.9     +161 -73    rpm/rpmio/msqio.c
    1.1.2.8     +4  -1      rpm/rpmio/rpmmsq.h
    1.1.2.8     +41 -4      rpm/rpmio/tmq.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.558 -r1.3501.2.559 CHANGES
  --- rpm/CHANGES       23 May 2017 19:36:46 -0000      1.3501.2.558
  +++ rpm/CHANGES       24 May 2017 06:03:18 -0000      1.3501.2.559
  @@ -1,4 +1,6 @@
   5.4.17 -> 5.4.18:
  +    - jbj: msqio: add condvar to get rid of racy timeouts (loopback mode only).
  +    - jbj: msqio: make peace with O_NONBLOCK.
       - jbj: msqio: permit POSIX/SysV message queue coexistence, default is POSIX.
       - jbj: msqio: set oflags/fdno on fd.
       - jbj: msqio: loopback mode refactoring.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.8 -r1.1.2.9 msqio.c
  --- rpm/rpmio/msqio.c 23 May 2017 19:36:46 -0000      1.1.2.8
  +++ rpm/rpmio/msqio.c 24 May 2017 06:03:18 -0000      1.1.2.9
  @@ -32,10 +32,20 @@
   #define MSQF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG)
   
   #define SPEW(_list) \
  -    if (MSQF_ISSET(DEBUG) || _rpmmsq_debug || _rpmio_debug) fprintf _list
  +    if ((msq && MSQF_ISSET(DEBUG)) || _rpmmsq_debug || _rpmio_debug) fprintf 
_list
   
   #define      MSQONLY(fd)     assert(fdGetIo(fd) == msqio)
   
  +#define Z(_rc)  assert((_rc) == 0)
  +#define LOCK(_m)     { Z(pthread_mutex_lock(&_m)); }
  +#define UNLOCK(_m)   { Z(pthread_mutex_unlock(&_m)); }
  +#define SIGNAL(_c)   { Z(pthread_cond_signal(&_c)); }
  +#define WAIT(_c,_m)  { Z(pthread_cond_wait(&_c, &_m)); }
  +#define DESTROY(_c,_m) \
  +     { Z(pthread_cond_destroy(&_c)); Z(pthread_mutex_destroy(&_m)); }
  +#define INIT(_c,_m) \
  +     { Z(pthread_cond_init(&_c, NULL)); Z(pthread_mutex_init(&_m, NULL)); }
  +
   #ifdef __cplusplus
   GENfree(rpmmsq)
   #endif       /* __cplusplus */
  @@ -46,13 +56,16 @@
       if (msq) {
        msq->flags = 0;
        msq->qname = _free(msq->qname);
  -     msq->qid = 0;
  +     msq->qid = -1;
        msq->oflags = 0;
        msq->omode = 0;
        msq->key = 0;
        msq->mtype = 0;
        msq->tid = 0;
  +     DESTROY(msq->c, msq->m);
  +     INIT(msq->c, msq->m);
        msq->ntimeout = 0;
  +     msq->nagain = 0;
        msq->nwait = 0;
        msq->nsent = 0;
        msq->nrecv = 0;
  @@ -122,6 +135,28 @@
            omode |= IPC_EXCL;
            continue;
            break;
  +     case 'D':       /* XXX collides with rpmio O_DIRECT */
  +         flags |= RPMMSQ_FLAGS_DELETE;
  +         continue;
  +         break;
  +     case 'R':       /* XXX */
  +         flags |= RPMMSQ_FLAGS_RESET;
  +         continue;
  +         break;
  +     case 'I':       /* XXX collides with rpmio fallocate(2) */
  +         flags |= RPMMSQ_FLAGS_INFO;
  +         continue;
  +         break;
  +     case 'P':       /* XXX collides with rpmio syncfs(2) */
  +         flags &= ~RPMMSQ_TYPE_MASK;
  +         flags |= RPMMSQ_TYPE_POSIX;
  +         continue;
  +         break;
  +     case 'S':       /* XXX collides with rpmio sync(2) */
  +         flags &= ~RPMMSQ_TYPE_MASK;
  +         flags |= RPMMSQ_TYPE_SYSV;
  +         continue;
  +         break;
        case '?':       /* XXX loopback mode */
            flags |= RPMMSQ_FLAGS_DEBUG;
            continue;
  @@ -174,7 +209,7 @@
       {
   #if defined(WITH_MQ)
        int xx = mq_unlink(msq->qname);
  -SPEW((stderr, "<-- %s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, xx, 
__FUNCTION__));
  +SPEW((stderr, "<--\t%s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, xx, 
__FUNCTION__));
   #endif       /* WITH_MQ */
       }        break;
       case RPMMSQ_TYPE_SYSV:
  @@ -184,7 +219,7 @@
        if (qid != -1) {
            int cmd = IPC_RMID;
            int xx = msgctl(qid, cmd, NULL);
  -SPEW((stderr, "<-- %s(0x%x,0%o,%p) rc %d (%s)\n", "msqctl", qid, cmd, NULL, 
xx, __FUNCTION__));
  +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d (%s)\n", "msqctl", qid, cmd, NULL, 
xx, __FUNCTION__));
        }
   #endif       /* WITH_MSQ */
       }        break;
  @@ -204,9 +239,11 @@
            .mq_curmsgs = 0,
        }, *attrs = &_attrs;
        msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs);
  -SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, 
msq->oflags, msq->omode, attrs, msq->qid));
  +SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, 
msq->oflags, msq->omode, attrs, msq->qid));
   
        /* (loopback mode) Configure the detached reader. */
  +     msq->tid = 0;
  +     INIT(msq->c, msq->m);
        if (msq->qid != -1 && MSQF_ISSET(LOOP)) {
            pthread_attr_t attr;
            int xx;
  @@ -227,14 +264,14 @@
       {
   #if defined(WITH_MSQ)
        msq->qid = msgget(msq->key, omode);
  -SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, 
msq->qid));
  +SPEW((stderr, "<--\t%s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, 
msq->qid));
   #endif       /* WITH_MSQ */
       }        break;
       default:
        break;
       }
   
  -SPEW((stderr, "<-- %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, 
fdno, flags, (msq ? msq->qid : -1)));
  +SPEW((stderr, "<== %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, 
fdno, flags, (msq ? msq->qid : -1)));
   
       return rpmmsqLink(msq);
   }
  @@ -243,6 +280,7 @@
   {
       ssize_t rc = -1; /* assume failure */
   
  +    if (msq) /* XXX WTF? */
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
       case RPMMSQ_TYPE_POSIX:
  @@ -257,7 +295,7 @@
            nb = rc;
            msq->nrecv++;
        }
  -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", 
"mq_receive", msq->qid, buf, (unsigned long)count, priop, (long)rc, prio, nb, 
buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", 
"mq_receive", msq->qid, buf, (unsigned long)count, priop, (long)rc, prio, nb, 
buf));
        if (priop)
            *priop = prio;
   #endif       /* WITH_MQ */
  @@ -279,7 +317,7 @@
            nb = rc;
            msq->nrecv++;
        }
  -SPEW((stderr, "<-- %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", 
msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", 
msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf));
        msgp = _free(msgp);
   #endif       /* WITH_MSQ */
       }        break;
  @@ -287,6 +325,7 @@
        break;
       }
   
  +SPEW((stderr, "<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %u\n", 
__FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? 
*priop : 0)));
       return rc;
   }
   
  @@ -294,28 +333,29 @@
   {
       ssize_t rc = -1; /* assume failure */
   
  +    if (msq) /* XXX WTF? */
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  +
        rc = mq_send(msq->qid, buf, count, prio);
  -     int nb = (rc == 0 ? count : 0);
  -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%u) rc %ld\t\"%.*s\"\n", "mq_send", 
msq->qid, buf, (unsigned long)count, prio, (long)rc, nb, buf));
  -     if (rc == 0) {  /* XXX remap to write(2) return */
  -         rc = count;
  -         msq->nsent++;
  -     }
  +     msq->nsent++;
   
        /* (loopback mode) Wait for rpmmsqReader to start up. */
  -     sched_yield();  /* Give detached threads a chance to run. */
  -     if (MSQF_ISSET(LOOP) && prio == 0 && msq->tid == 0) {
  -         struct timespec req = { 0, 1000*1000 };
  -         while (msq->tid == 0) {
  -             nanosleep(&req, NULL);
  -             msq->nwait++;
  -         }
  +     if (MSQF_ISSET(LOOP)) {
  +         LOCK(msq->m);
  +         while (msq->nsent > msq->nrecv)
  +             WAIT(msq->c, msq->m);
  +         UNLOCK(msq->m);
        }
  +
  +     int nb = (rc == 0 ? count : 0);
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %ld\t\t\t\"%.*s\"\n", "mq_send", 
msq->qid, buf, (unsigned long)count, prio, (long)rc, nb, buf));
  +
  +     if (rc == 0)    /* XXX remap to write(2) return */
  +         rc = count;
   #endif       /* WITH_MQ */
       }        break;
       case RPMMSQ_TYPE_SYSV:
  @@ -330,7 +370,7 @@
            memcpy(msgp->mtext, buf, count);
        rc = msgsnd(msq->qid, msgp, msgsz, msgflg);
        int nb = (rc == 0 ? count : 0);
  -SPEW((stderr, "<-- %s(0x%x,%p,%lu,%d) rc %ld\t\"%.*s\"\n", "msqsnd", 
msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %ld\t\t\"%.*s\"\n", "msqsnd", 
msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf));
        msgp = _free(msgp);
        if (rc == 0) {  /* XXX remap to write(2) return */
            rc = count;
  @@ -342,13 +382,15 @@
        break;
       }
   
  +SPEW((stderr, "<== %s(%p,%p[%lu],%u) qid %d rc %ld\n", __FUNCTION__, msq, 
buf, count, prio, (msq ? msq->qid : -1), rc));
       return rc;
   }
   
   static int rpmmsqSeek(rpmmsq msq, _libio_pos_t pos,
                        int whence)
   {
  -    ssize_t rc = -2; /* assume failure */
  +    int rc = -2;     /* assume failure */
  +SPEW((stderr, "<== %s(%p,%p,%d) qid %d rc %d\n", __FUNCTION__, msq, pos, 
whence, (msq ? msq->qid : -1), rc));
       return rc;
   }
   
  @@ -356,35 +398,41 @@
   {
       int rc = -2;     /* assume failure */
   
  +    if (msq) /* XXX WTF? */
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  +     int prio = RPMMSQ_PRIO_EXIT;
  +
  +     rc = mq_send(msq->qid, __FUNCTION__, sizeof(__FUNCTION__)-1, prio);
  +     msq->nsent++;
  +
  +     /* (loopback mode) Terminate rpmmsqReader. */
        if (MSQF_ISSET(LOOP)) {
  -         /* (loopback mode) Terminate rpmmsqReader. */
  -         if (msq->tid) {
  -             rc = rpmmsqSend(msq, __FUNCTION__, sizeof(__FUNCTION__)-1, 
RPMMSQ_PRIO_EXIT);
  -             struct timespec req = { 0, 1000*1000 };
  -             do {
  -                 nanosleep(&req, NULL);
  -                 msq->nwait++;
  -             } while (msq->tid != 0);
  -         }
  -         /* (loopback mode) Turn off the sigev detached thread. */
  -         rc = rpmmsqNotify(msq, NULL);
  +         LOCK(msq->m);
  +         while (msq->nsent > msq->nrecv)
  +             WAIT(msq->c, msq->m);
  +         UNLOCK(msq->m);
        }
   
  -SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d\n", __FUNCTION__, 
msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout));
  +SPEW((stderr, "<--\t%s(0x%x,\"%s\",%u) rc %ld\n", "mq_send", msq->qid, 
__FUNCTION__, prio, (long)rc));
   
  -     if (_rpmmsq_debug)
  +     /* (loopback mode) Turn off the sigev detached thread. */
  +     rc = rpmmsqNotify(msq, NULL);
  +
  +SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d again %d\n", 
__FUNCTION__, msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout, msq->nagain));
  +
  +     if (_rpmmsq_debug || MSQF_ISSET(INFO))
            rpmmsqDump(__FUNCTION__, msq, NULL);
        rc = mq_close(msq->qid);
  -SPEW((stderr, "<-- %s(0x%x) rc %d\n", "mq_close", msq->qid, rc));
  +SPEW((stderr, "<--\t%s(0x%x) rc %d\n", "mq_close", msq->qid, rc));
        if (!rc && (delete || MSQF_ISSET(DELETE))) {
            rc = mq_unlink(msq->qname);         /* XXX rpmmsqReset? */
  -SPEW((stderr, "<-- %s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, rc, 
__FUNCTION__));
  +SPEW((stderr, "<--\t%s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, rc, 
__FUNCTION__));
        }
  +     rpmmsqFini(msq);
        rc = 0; /* XXX */
   #endif       /* WITH_MQ */
       }        break;
  @@ -392,37 +440,49 @@
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     if (_rpmmsq_debug)
  +     if (_rpmmsq_debug || MSQF_ISSET(INFO))
            rpmmsqDump(__FUNCTION__, msq, NULL);
        if (delete || MSQF_ISSET(DELETE)) {
            int cmd = IPC_RMID;
            rc = msgctl(msq->qid, cmd, NULL);
  -SPEW((stderr, "<-- %s(0x%x,0%o,%p) rc %d (%s)\n", "msqctl", msq->qid, cmd, 
NULL, rc, __FUNCTION__));
  +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d (%s)\n", "msqctl", msq->qid, cmd, 
NULL, rc, __FUNCTION__));
        }
  +     rpmmsqFini(msq);
        rc = 0; /* XXX */
   #endif       /* WITH_MSQ */
       }        break;
       default:
        break;
       }
  +SPEW((stderr, "<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, 
(msq ? msq->qid : -1), rc));
       return rc;
   }
   
   static rpmmsq rpmmsqOpen(const char * path, const char * fmode)
   {
  -    return  rpmmsqNew(path, fmode, -1, 0);
  +    rpmmsq msq = NULL;
  +fprintf(stderr, "==> %s(%s,%s)\n", __FUNCTION__, path, fmode);
  +    msq = rpmmsqNew(path, fmode, -1, 0);
  +SPEW((stderr, "<== %s(%s,%s) qid %d\n", __FUNCTION__, path, fmode, (msq ? 
msq->qid : -1)));
  +    return msq;
   }
   
   static rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode)
   {
  +    rpmmsq msq = NULL;
       int fdno = (int)((long)_fdno);   /* XXX hack */
  -    return  rpmmsqNew(NULL, fmode, fdno, 0);
  +fprintf(stderr, "==> %s(%d,%s)\n", __FUNCTION__, fdno, fmode);
  +    msq = rpmmsqNew(NULL, fmode, fdno, 0);
  +SPEW((stderr, "<== %s(%d,%s) qid %d\n", __FUNCTION__, fdno, fmode, (msq ? 
msq->qid : -1)));
  +    return msq;
   }
   
   static int rpmmsqFlush(void * _msq)
   {
  -    ssize_t rc = -2; /* assume failure */
  +    int rc = -2;     /* assume failure */
  +    rpmmsq msq = (rpmmsq) _msq;
       /* XXX TODO: drain the message queue? */
  +SPEW((stderr, "<== %s(%p) qid %d rc %d\n", __FUNCTION__, _msq, (msq ? 
msq->qid : -1), rc));
       return rc;
   }
   
  @@ -464,13 +524,14 @@
            rc = msgctl(msq->qid, cmd, buf);
            break;
        }
  -SPEW((stderr, "<-- %s(0x%x,%d,%p) rc %d\n", "msqctl", (unsigned)msq->qid, 
cmd, buf, rc));
  +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", "msqctl", (unsigned)msq->qid, 
cmd, buf, rc));
   #endif       /* WITH_MSQ */
       }        break;
       default:
        break;
       }
   
  +SPEW((stderr, "<== %s(%p,%d,%p) qid %d rc %d\n", __FUNCTION__, msq, cmd, 
buf, (msq ? msq->qid : -1), rc));
       return rc;
   }
   
  @@ -485,7 +546,7 @@
       {
   #if defined(WITH_MQ)
        rc = mq_notify(msq->qid, _sevp);
  -SPEW((stderr, "<-- %s(0x%x,%p) rc %d\n", "mq_notify", msq->qid, _sevp, rc));
  +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", "mq_notify", msq->qid, _sevp, rc));
   #endif       /* WITH_MQ */
       }        break;
   
  @@ -496,6 +557,7 @@
        break;
       }
   
  +SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msq, _sevp, (msq 
? msq->qid : -1), rc));
       return rc;
   }
   
  @@ -504,37 +566,62 @@
   #if defined(WITH_MQ)
       rpmmsq msq = (rpmmsq) sv.sival_ptr;
   
  -assert(msq);
  -    msq = rpmmsqLink(msq);
  -    msq->tid = pthread_self();
  +assert(msq = rpmmsqLink(msq));
   
   SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, 
msq->tid));
       char b[BUFSIZ];
       size_t nb = sizeof(b);
  -    struct timespec ts = { 0, 0 };
       int rc;
   
       while (1) {
  -     int lrto_secs = 2;      /* XXX loopback retry timeout seconds */
        unsigned prio = 0;
  -     ts.tv_sec = time(NULL) + lrto_secs;
  -     int rc = mq_timedreceive(msq->qid, b, nb, &prio, &ts);
  -     int nr = (rc >= 0 ? rc : 0);
  -SPEW((stderr, "<-- %s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", 
"mq_timedreceive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b));
  -     if (rc < 0 && errno == ETIMEDOUT) {
  -         msq->ntimeout;
  -         continue;
  +     int rc;
  +
  +     while (1) {
  +
  +         /* Yield the CPU. */
  +         if (msq->oflags & O_NONBLOCK) {
  +             struct timespec ts = { 0, 100 };
  +             Z(nanosleep(&ts, NULL));
  +         }
  +
  +         rc = mq_receive(msq->qid, b, nb, &prio);
  +         if (rc < 0)
  +         switch (errno) {
  +         case ETIMEDOUT:
  +             msq->ntimeout++;
  +             continue;
  +         case EAGAIN:        /* XXX O_NONBLOCK */
  +             msq->nagain++;
  +             continue;
  +         }
  +         break;
        }
  +
  +     LOCK(msq->m);
        msq->nrecv++;
  -     /* Exit immediately on error or highest priority message. */
  -     if (rc < 0 || !strcmp(b, "XXX") || prio == RPMMSQ_PRIO_EXIT)
  +     SIGNAL(msq->c);
  +     UNLOCK(msq->m);
  +
  +     int nr = (rc >= 0 ? rc : 0);
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", 
"mq_receive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b));
  +
  +     /* Exit immediately on error. */
  +     if (rc < 0) {
  +         fprintf(stderr, "*** %s: error exit(%d): %m\n", __FUNCTION__, 
errno);
  +         break;
  +     }
  +
  +     /* Exit on highest priority message. */
  +     if (prio == RPMMSQ_PRIO_EXIT)
            break;
       }
  +
   SPEW((stderr, "<== %s(%p) rc %d\n", __FUNCTION__, msq, rc));
   
       msq->tid = 0;
  -
       msq = rpmmsqFree(msq);
  +
   #endif       /* WITH_MQ */
   }
   
  @@ -585,12 +672,12 @@
        fprintf(fp, "\t  rtime: %lu\n", ds->msg_rtime);
        fprintf(fp, "\t  ctime: %lu\n", ds->msg_ctime);
   #if defined(__linux__)
  -     fprintf(fp, "\t cbytes: %lu\n", ds->__msg_cbytes);
  +     fprintf(fp, "\t cbytes: %lu b in queue\n", ds->__msg_cbytes);
   #endif
  -     fprintf(fp, "\t   qnum: %lu\n", ds->msg_qnum);
  +     fprintf(fp, "\t   qnum: %lu msgs in queue\n", ds->msg_qnum);
        fprintf(fp, "\t qbytes: %lu\n", ds->msg_qbytes);
  -     fprintf(fp, "\t  lspid: %d\n", ds->msg_lspid);
  -     fprintf(fp, "\t  lrpid: %d\n", ds->msg_lrpid);
  +     fprintf(fp, "\t  lspid: %d pid of last send\n", ds->msg_lspid);
  +     fprintf(fp, "\t  lrpid: %d pid of last recv\n", ds->msg_lrpid);
       }
   #endif       /* WITH_MSQ */
   }
  @@ -603,14 +690,14 @@
   #if defined(WITH_MSQ) && defined(IPC_INFO)
       struct msginfo * mi = (struct msginfo *) _ptr;;
       if (mi) {
  -     fprintf(fp, "\tmsgpool: %7d Kb\n", mi->msgpool);
  -     fprintf(fp, "\t msgmap: %7d\n", mi->msgmap);
  -     fprintf(fp, "\t msgmax: %7d\n", mi->msgmax);
  -     fprintf(fp, "\t msgmnb: %7d b\n", mi->msgmnb);
  -     fprintf(fp, "\t msgmni: %7d\n", mi->msgmni);
  -     fprintf(fp, "\t msgssz: %7d\n", mi->msgssz);
  -     fprintf(fp, "\t msgtql: %7d\n", mi->msgtql);
  -     fprintf(fp, "\t msgseg: %7d\n", mi->msgseg);
  +     fprintf(fp, "\tmsgpool: %7d Kib buffer pool\n", mi->msgpool);
  +     fprintf(fp, "\t msgmap: %7d entries in msg map\n", mi->msgmap);
  +     fprintf(fp, "\t msgmax: %7d b max msg size\n", mi->msgmax);
  +     fprintf(fp, "\t msgmnb: %7d b max in queue\n", mi->msgmnb);
  +     fprintf(fp, "\t msgmni: %7d max no. of queues\n", mi->msgmni);
  +     fprintf(fp, "\t msgssz: %7d msg segment size\n", mi->msgssz);
  +     fprintf(fp, "\t msgtql: %7d max msgs in system\n", mi->msgtql);
  +     fprintf(fp, "\t msgseg: %7d max no. of segments\n", mi->msgseg);
       }
   #endif       /* WITH_MSQ */
   }
  @@ -674,6 +761,7 @@
        break;
       }
   
  +SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msg, msq, (msq ? 
msq->qid : -1), rc));
       return rc;
   }
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.7 -r1.1.2.8 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        23 May 2017 19:36:46 -0000      1.1.2.7
  +++ rpm/rpmio/rpmmsq.h        24 May 2017 06:03:18 -0000      1.1.2.8
  @@ -47,8 +47,11 @@
       int omode;                       /*!< message queue open mode. */
       key_t key;                       /*!< SysV: ftok(3) or IPC_PRIVATE. */
       long mtype;                      /*!< SysV: message type. */
  -    volatile pthread_t tid;  /*!< LOOP: sigev thread id. */
  +    pthread_t tid;           /*!< LOOP: sigev thread id. */
  +    pthread_cond_t c;
  +    pthread_mutex_t m;
       int ntimeout;            /*!< LOOP: no. of receive timeouts. */
  +    int nagain;                      /*!< LOOP: no. of waits (O_NONBLOCK). */
       int nwait;                       /*!< LOOP: no. of waits for thread 
change. */
       int nsent;                       /*!< no. messages sent. */
       int nrecv;                       /*!< no. messages received. */
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmq.c
  ============================================================================
  $ cvs diff -u -r1.1.2.7 -r1.1.2.8 tmq.c
  --- rpm/rpmio/tmq.c   23 May 2017 19:36:46 -0000      1.1.2.7
  +++ rpm/rpmio/tmq.c   24 May 2017 06:03:18 -0000      1.1.2.8
  @@ -366,7 +366,7 @@
       int xx;
   
       const char *qname = "/rpm";;
  -    rpmmsq msq = rpmmsqNew(qname, "w+l?", -1, flags);
  +    rpmmsq msq = rpmmsqNew(qname, "w+S?", -1, flags);
   
       memset(b, 0, nb);
       if (msq) {
  @@ -383,19 +383,56 @@
        xx = rpmmsqRecv(msq, b, nb, NULL);
        memset(b, 0, nb);
        xx = rpmmsqRecv(msq, b, nb, NULL);
  +
  +     for (int i = 0; i < 1000; i++) {
  +         snprintf(b, nb, "%d", i);
  +         blen = strlen(b);
  +         xx = rpmmsqSend(msq, b, blen, priority);
  +         memset(b, 0, nb);
  +         xx = rpmmsqRecv(msq, b, nb, NULL);
  +     }
       }
   
       xx = rpmmsqClose(msq, 0);
       msq = rpmmsqFree(msq);
   
  -    /* XXX mq_notify memory leak if loopback mode. */
  -    FD_t fd = Fopen(qname, "w+?.msqio");
  +    FD_t fd = Fopen(qname, "w+lP?.msqio");
       if (fd) {
        strcpy(b, "foo bar baz");
        blen = strlen(b);
        xx = Fwrite(b, 1, blen, fd);
        memset(b, 0, blen);
        xx = Fread(b, 1, nb, fd);
  +     strcpy(b, "blah blah blah");
  +     blen = strlen(b);
  +     xx = Fwrite(b, 1, blen, fd);
  +     memset(b, 0, blen);
  +     xx = Fread(b, 1, nb, fd);
  +
  +     for (int i = 0; i < 1000; i++) {
  +         snprintf(b, nb, "%d", i);
  +         blen = strlen(b);
  +         xx = Fwrite(b, 1, blen, fd);
  +     }
  +
  +#ifdef       DISABLE
  +     xx = Fflush(fd);
  +     xx = Fileno(fd);
  +if (_rpmio_debug)
  +fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx);
  +     xx = Ferror(fd);
  +#ifdef       NOTYET
  +     xx = Feof(fd);
  +     xx = Clearerr(fd);
  +#endif
  +     xx = Fseek(fd, 0, SEEK_CUR);
  +     xx = Ftell(fd);
  +     Rewind(fd);
  +     fpos_t pos = {};
  +     xx = Fgetpos(fd, &pos);
  +     xx = Fsetpos(fd, &pos);
  +#endif
  +
        xx = Fclose(fd);
       } else
        perror("Fopen");
  @@ -417,7 +454,7 @@
   
    { "delete", 'D', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE),
        N_("remove queue after closing"), NULL },
  - { "info", 'i', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,   &flags, _MSQBIT(INFO),
  + { "info", 'I', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,   &flags, _MSQBIT(INFO),
        N_("display queue info on close"), NULL },
    { "loop", 'l', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,   &flags, _MSQBIT(LOOP),
        N_("toggle mq_notify running"), NULL },
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to