RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   26-May-2017 15:22:18
  Branch: rpm-5_4                          Handle: 2017052613221701

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/rpmio               msqio.c rpmio.c rpmmsq.h tmq.c

  Log:
    - msqio: add a monitor to handle multiple readers/writers.

  Summary:
    Revision    Changes     Path
    1.3501.2.561+1  -0      rpm/CHANGES
    1.1.2.11    +273 -139   rpm/rpmio/msqio.c
    1.230.2.51  +6  -4      rpm/rpmio/rpmio.c
    1.1.2.10    +7  -2      rpm/rpmio/rpmmsq.h
    1.1.2.10    +21 -7      rpm/rpmio/tmq.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.560 -r1.3501.2.561 CHANGES
  --- rpm/CHANGES       25 May 2017 20:02:12 -0000      1.3501.2.560
  +++ rpm/CHANGES       26 May 2017 13:22:17 -0000      1.3501.2.561
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: msqio: add a monitor to handle multiple readers/writers.
       - jbj: msqio: add fopencookie wrapper.
       - jbj: msqio: add condvar to get rid of racy timeouts (loopback mode 
only).
       - jbj: msqio: make peace with O_NONBLOCK.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 msqio.c
  --- rpm/rpmio/msqio.c 25 May 2017 20:02:13 -0000      1.1.2.10
  +++ rpm/rpmio/msqio.c 26 May 2017 13:22:17 -0000      1.1.2.11
  @@ -21,6 +21,7 @@
   #include <rpmlog.h>
   #include <rpmmacro.h>
   #include <rpmcb.h>           /* XXX rpmIsDebug() */
  +#include <yarn.h>
   
   #define      _RPMMSQ_INTERNAL
   #include "rpmmsq.h"
  @@ -37,15 +38,46 @@
   
   #define      MSQONLY(fd)     assert(fdGetIo(fd) == msqio)
   
  +static int _lockdebug = 0;
  +static int _conddebug = 0;
   #define Z(_rc)  assert((_rc) == 0)
  -#define LOCK(_m)     { Z(pthread_mutex_lock(&_m)); }
  -#define UNLOCK(_m)   { Z(pthread_mutex_unlock(&_m)); }
  -#define SIGNAL(_c)   { Z(pthread_cond_signal(&_c)); }
  -#define WAIT(_c,_m)  { Z(pthread_cond_wait(&_c, &_m)); }
  -#define DESTROY(_c,_m) \
  -     { Z(pthread_cond_destroy(&_c)); Z(pthread_mutex_destroy(&_m)); }
  -#define INIT(_c,_m) \
  -     { Z(pthread_cond_init(&_c, NULL)); Z(pthread_mutex_init(&_m, NULL)); }
  +#define LOCK(_m)  \
  +    {        if (_lockdebug) fprintf(stderr, "***  LOCKING(%p) %s\n", &_m, 
#_m);\
  +     Z(pthread_mutex_lock(&_m));\
  +    }
  +#define UNLOCK(_m) \
  +    {        Z(pthread_mutex_unlock(&_m));\
  +     if (_lockdebug) fprintf(stderr, "*** UNLOCKED(%p) %s\n", &_m, #_m);\
  +    }
  +#define SIGNAL(_c) \
  +    {        if (_conddebug) fprintf(stderr, "***   SIGNAL(%p) %s\n", &_c, 
#_c);\
  +     Z(pthread_cond_broadcast(&_c));\
  +    }
  +#define WAIT(_c,_m) \
  +    {        if (_conddebug) fprintf(stderr, "***     WAIT(%p) %s\n", &_c, 
#_c);\
  +     Z(pthread_cond_wait(&_c, &_m));\
  +    }
  +
  +#ifdef       NOTYET
  +static int _yarndebug = -1;
  +#define PEEK(_bolt)  yarnPeekLock(_bolt)
  +#define      POSSESS(_bolt) \
  +    {        yarnPossess(_bolt); \
  +     if (_yarndebug) fprintf(stderr, "***  POSSESS(%p)\t\t%ld\n", _bolt, 
PEEK(_bolt));\
  +    }
  +#define      RELEASE(_bolt) \
  +    {        if (_yarndebug) fprintf(stderr, "***  RELEASE(%p)\t\t%ld\n", 
_bolt, PEEK(_bolt));\
  +     yarnRelease(_bolt); \
  +    }
  +#define      TWIST(_bolt, _op, _val) \
  +    {        if (_yarndebug) fprintf(stderr, "***    TWIST(%p, %d, 
%d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\
  +     yarnTwist(_bolt, _op, _val); \
  +    }
  +#define      WAITFOR(_bolt, _op, _val) \
  +    {        if (_yarndebug) fprintf(stderr, "***  WAITFOR(%p, %d, 
%d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\
  +     yarnWaitFor(_bolt, _op, _val); \
  +    }
  +#endif
   
   #ifdef __cplusplus
   GENfree(rpmmsq)
  @@ -53,27 +85,45 @@
   
   /* =============================================================== */
   #if defined(WITH_MQ)
  -static inline
  -mqd_t __mq_open(const char *name, int oflag, mode_t mode,
  +static
  +mqd_t Mq_open(const char *name, int oflag, mode_t mode,
                struct mq_attr *attr)
   {
       rpmmsq msq = NULL;
       mqd_t mqdes = mq_open(name, oflag, mode, attr);
  -SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", name, oflag, 
mode, attr, mqdes));
  +SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, 
mode, attr, mqdes));
       return mqdes;
   }
   
  -static inline
  -int __mq_notify(mqd_t mqdes, const struct sigevent *sevp)
  +static
  +int Mq_getattr(mqd_t mqdes, struct mq_attr *attr)
  +{
  +    rpmmsq msq = NULL;
  +    int rc = mq_getattr(mqdes, attr);
  +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc));
  +    return rc;
  +}
  +
  +static
  +int Mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr)
  +{
  +    rpmmsq msq = NULL;
  +    int rc = mq_setattr(mqdes, newattr, oldattr);
  +SPEW((stderr, "<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, 
oldattr, rc));
  +    return rc;
  +}
  +
  +static
  +int Mq_notify(mqd_t mqdes, const struct sigevent *sevp)
   {
       rpmmsq msq = NULL;
       int rc = mq_notify(mqdes, sevp);
  -SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", "mq_notify", mqdes, sevp, rc));
  +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc));
       return rc;
   }
   
  -static inline
  -int __mq_send(mqd_t mqdes, const char *msg_ptr,
  +static
  +int Mq_send(mqd_t mqdes, const char *msg_ptr,
                size_t msg_len, unsigned int msg_prio)
   {
       rpmmsq msq = NULL;
  @@ -93,12 +143,39 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", "mq_send", 
mqdes, buf, (unsigned long)count, prio, rc, nc, buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf));
       return rc;
   }
   
  -static inline
  -ssize_t __mq_receive(mqd_t mqdes, char *msg_ptr,
  +static
  +int Mq_timedsend(mqd_t mqdes, const char *msg_ptr,
  +             size_t msg_len, unsigned int msg_prio,
  +             const struct timespec *abs_timeout)
  +{
  +    rpmmsq msq = NULL;
  +    int ncmax = 32;
  +    int nc = 0;
  +    const char * buf = msg_ptr;
  +    size_t count = msg_len;
  +    unsigned int prio = msg_prio;
  +    int rc;
  +
  +    rc = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, abs_timeout);
  +    if (rc == 0) {
  +     /* Ensure printable. */
  +     for (nc = 0; nc < ncmax; nc++) {
  +         if (nc < (int)count && isprint(buf[nc]))
  +             continue;
  +         break;
  +     }
  +    }
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, 
buf));
  +    return rc;
  +}
  +
  +
  +static
  +ssize_t Mq_receive(mqd_t mqdes, char *msg_ptr,
                size_t msg_len, unsigned int *msg_prio)
   {
       rpmmsq msq = NULL;
  @@ -119,25 +196,52 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", 
"mq_receive", mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, 
buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, 
buf));
       return rc;
   }
   
  -static inline
  -int __mq_close(mqd_t mqdes)
  +static
  +ssize_t Mq_timedreceive(mqd_t mqdes, char *msg_ptr,
  +             size_t msg_len, unsigned int *msg_prio,
  +             const struct timespec *abs_timeout)
  +{
  +    rpmmsq msq = NULL;
  +    int ncmax = 32;
  +    int nc = 0;
  +    char * buf = msg_ptr;
  +    size_t count = msg_len;
  +    unsigned int *priop = msg_prio;
  +    unsigned int prio = *msg_prio;
  +    ssize_t rc;
  +
  +    rc = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, abs_timeout);
  +    if (rc >= 0) {
  +     /* Ensure printable. */
  +     for (nc = 0; nc < ncmax; nc++) {
  +         if (nc < rc && isprint(buf[nc]))
  +             continue;
  +         break;
  +     }
  +    }
  +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, 
prio, nc, buf));
  +    return rc;
  +}
  +
  +static
  +int Mq_close(mqd_t mqdes)
   {
       rpmmsq msq = NULL;
       int rc = mq_close(mqdes);
  -SPEW((stderr, "<--\t%s(0x%x) rc %d\n", "mq_close", mqdes, rc));
  +SPEW((stderr, "<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc));
       return rc;
   }
   
  -static inline
  -int __mq_unlink(const char *name)
  +static
  +int Mq_unlink(const char *name)
   {
       rpmmsq msq = NULL;
       int rc = mq_unlink(name);
  -SPEW((stderr, "<--\t%s(%s) rc %d\n", "mq_unlink", name, rc));
  +SPEW((stderr, "<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc));
       return rc;
   }
   
  @@ -145,17 +249,17 @@
   
   /* =============================================================== */
   #if defined(WITH_MSQ)
  -static inline
  -int __msgget(key_t key, int msgflg)
  +static
  +int Msgget(key_t key, int msgflg)
   {
       rpmmsq msq = NULL;
       int rc = msgget(key, msgflg);
  -SPEW((stderr, "<--\t%s(0x%x,0%o) rc %d\n", "msgget", key, msgflg, rc));
  +SPEW((stderr, "<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc));
       return rc;
   }
   
  -static inline
  -ssize_t __msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int 
msgflg)
  +static
  +ssize_t Msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
   {
       rpmmsq msq = NULL;
       int ncmax = 32;
  @@ -173,12 +277,12 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", 
msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", 
__FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, 
buf));
       return rc;
   }
   
  -static inline
  -int __msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
  +static
  +int Msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
   {
       rpmmsq msq = NULL;
       int ncmax = 32;
  @@ -196,16 +300,16 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", "msqsnd", 
msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf));
  +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, 
msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf));
       return rc;
   }
   
  -static inline
  -int __msgctl(int msqid, int cmd, struct msqid_ds *buf)
  +static
  +int Msgctl(int msqid, int cmd, struct msqid_ds *buf)
   {
       rpmmsq msq = NULL;
       int rc = msgctl(msqid, cmd, buf);
  -SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", "msqctl", msqid, cmd, buf, rc));
  +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, 
rc));
       return rc;
   }
   #endif       /* WITH_MSQ */
  @@ -245,6 +349,15 @@
   {
       rpmmsq msq = (rpmmsq) _msq;
       if (msq) {
  +
  +     msq->tid = 0;
  +SPEW((stderr, "%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", 
__FUNCTION__, msq->i, msq->imax, msq->nsent, msq->nrecv, msq->ntimeout, 
msq->nagain));
  +     Z(pthread_mutex_destroy(&msq->m));
  +     Z(pthread_cond_destroy(&msq->e));
  +     Z(pthread_cond_destroy(&msq->f));
  +     msq->i = 0;
  +     msq->imax = 0;
  +
        msq->flags = 0;
        msq->qname = _free(msq->qname);
        msq->fmode = _free(msq->fmode);
  @@ -254,9 +367,7 @@
        msq->omode = 0;
        msq->key = 0;
        msq->mtype = 0;
  -     msq->tid = 0;
  -     DESTROY(msq->c, msq->m);
  -     INIT(msq->c, msq->m);
  +
        msq->ntimeout = 0;
        msq->nagain = 0;
        msq->nsent = 0;
  @@ -408,15 +519,15 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     (void) __mq_unlink(msq->qname);
  +     (void) Mq_unlink(msq->qname);
   #endif       /* WITH_MQ */
       }        break;
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     int qid = __msgget(msq->key, (omode & 0777));
  +     int qid = Msgget(msq->key, (omode & 0777));
        if (qid != -1) {
  -         (void) __msgctl(qid, IPC_RMID, NULL);
  +         (void) Msgctl(qid, IPC_RMID, NULL);
        }
   #endif       /* WITH_MSQ */
       }        break;
  @@ -440,11 +551,16 @@
            .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */
            .mq_curmsgs = 0,
        }, *attrs = &_attrs;
  -     msq->qid = __mq_open(msq->qname, msq->oflags, msq->omode, attrs);
  +     msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, attrs);
   
  -     /* (loopback mode) Configure the detached reader. */
  +     /* Initialize the monitor. */
        msq->tid = 0;
  -     INIT(msq->c, msq->m);
  +     Z(pthread_mutex_init(&msq->m, NULL));
  +     Z(pthread_cond_init(&msq->e, NULL));
  +     Z(pthread_cond_init(&msq->f, NULL));
  +     msq->i = 0;
  +     msq->imax = 10;
  +
        if (msq->qid != -1 && MSQF_ISSET(LOOP)) {
            pthread_attr_t attr;
            int xx;
  @@ -464,7 +580,7 @@
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     msq->qid = __msgget(msq->key, omode);
  +     msq->qid = Msgget(msq->key, omode);
   #endif       /* WITH_MSQ */
       }        break;
       default:
  @@ -491,7 +607,15 @@
   
        unsigned int prio = 0;
   
  -     rc = __mq_receive(msq->qid, buf, count, &prio);
  +     /* Consumer in monitor. */
  +     LOCK(msq->m);
  +     while (msq->i == 0)
  +         WAIT(msq->e, msq->m);
  +     rc = Mq_receive(msq->qid, buf, count, &prio);
  +     if (msq->i-- == msq->imax)
  +         SIGNAL(msq->f);
  +     UNLOCK(msq->m);
  +
        if (rc >= 0) {
            msq->nrecv++;
            if (priop)
  @@ -508,9 +632,8 @@
        long msgtyp = msq->mtype;
        int msgflg = 0; /* IPC_NOWAIT, MSG_COPY, MSG_EXCEPT, MSG_NOERROR */
   
  -     rc = __msgrcv(msq->qid, msgp, msgsz, msgtyp, msgflg);
  +     rc = Msgrcv(msq->qid, msgp, msgsz, msgtyp, msgflg);
        if (rc >= 0) {
  -         msq->nrecv++;
            if (priop)
                *priop = msgp->mtype;
            if (rc > 0)
  @@ -537,19 +660,26 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     rc = __mq_send(msq->qid, buf, count, prio);
  -     if (rc == 0) {
  -         msq->nsent++;
   
  -         /* (loopback mode) Wait for rpmmsqReader to start up. */
  -         if (MSQF_ISSET(LOOP)) {
  -             LOCK(msq->m);
  -             while (msq->nsent > msq->nrecv)
  -                 WAIT(msq->c, msq->m);
  -             UNLOCK(msq->m);
  -         }
  +     /* Producer in monitor. */
  +     LOCK(msq->m);
  +     while (msq->i == msq->imax)
  +         WAIT(msq->f, msq->m);
  +     rc = Mq_send(msq->qid, buf, count, prio);
  +     if (msq->i++ == 0)
  +         SIGNAL(msq->e);
  +     UNLOCK(msq->m);
  +
  +     /* (loopback mode) Wait for the reader thread. */
  +     if (msq->nsent == 0 && MSQF_ISSET(LOOP)) {
  +         /* Yield the CPU. */
  +         struct timespec ts = { 0, 100*1000 };
  +         Z(nanosleep(&ts, NULL));
        }
   
  +     if (rc == 0)
  +         msq->nsent++;
  +
        if (rc == 0)    /* XXX remap to write(2) return */
            rc = count;
   #endif       /* WITH_MQ */
  @@ -564,7 +694,7 @@
        msgp->mtype = msq->mtype;
        if (count)
            memcpy(msgp->mtext, buf, count);
  -     rc = __msgsnd(msq->qid, msgp, msgsz, msgflg);
  +     rc = Msgsnd(msq->qid, msgp, msgsz, msgflg);
        if (rc == 0) {
            msq->nsent++;
        }
  @@ -582,11 +712,18 @@
       return rc;
   }
   
  +static char *SEEK_[] = { "SET", "CUR", "END", "???" };
  +
   static int rpmmsqSeek(rpmmsq msq, _libio_pos_t pos,
                        int whence)
   {
  +#ifdef USE_COOKIE_SEEK_POINTER
  +    _IO_off64_t p = *pos;
  +#else
  +    off_t p = pos;
  +#endif
       int rc = -2;     /* assume failure */
  -SPEW((stderr, "<== %s(%p,%p,%d) qid %d rc %d\n", __FUNCTION__, msq, pos, 
whence, (msq ? msq->qid : -1), rc));
  +SPEW((stderr, "<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, 
p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc));
       return rc;
   }
   
  @@ -601,44 +738,40 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  +     if (msq->qid != -1) {
  +         if (MSQF_ISSET(INFO) || _rpmmsq_debug)
  +             rpmmsqDump(NULL, msq, NULL);
   
  -     /* (loopback mode) Terminate rpmmsqReader. */
  -     if (msq->qid != -1 && MSQF_ISSET(LOOP)) {
  -         int prio = RPMMSQ_PRIO_EXIT;
  -         rc = __mq_send(msq->qid, __FUNCTION__, sizeof(__FUNCTION__)-1, 
prio);
  -         msq->nsent++;
  +         /* Producer in monitor. */
            LOCK(msq->m);
  -         while (msq->nsent > msq->nrecv)
  -             WAIT(msq->c, msq->m);
  -         UNLOCK(msq->m);
  +         while (msq->i == msq->imax)
  +             WAIT(msq->f, msq->m);
            /* Turn off the sigev detached thread. */
  -         rc = rpmmsqNotify(msq, NULL);
  -     }
  -
  -if (msq->qid != -1)
  -SPEW((stderr, "%s: sent %d recv %d timeout %d again %d\n", __FUNCTION__, 
msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain));
  +         if (MSQF_ISSET(LOOP))
  +             rc = rpmmsqNotify(msq, NULL);
  +         rc = Mq_close(msq->qid);
  +         msq->qid = -1;
  +         if (msq->i++ == 0)
  +             SIGNAL(msq->e);
  +         UNLOCK(msq->m);
   
  -     if (msq->qid != -1) {
  -         if (MSQF_ISSET(INFO) || _rpmmsq_debug)
  -             rpmmsqDump(NULL, msq, NULL);
  -         rc = __mq_close(msq->qid);
        }
  +
        if (!rc && (delete || MSQF_ISSET(DELETE)))
  -         rc = __mq_unlink(msq->qname);               /* XXX rpmmsqReset? */
  -     rpmmsqFini(msq);
  -     rc = 0; /* XXX */
  +         rc = Mq_unlink(msq->qname);         /* XXX rpmmsqReset? */
   #endif       /* WITH_MQ */
       }        break;
   
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     if (_rpmmsq_debug || MSQF_ISSET(INFO))
  -         rpmmsqDump(__FUNCTION__, msq, NULL);
  +     if (msq->qid != -1) {
  +         if (MSQF_ISSET(INFO) || _rpmmsq_debug)
  +             rpmmsqDump(__FUNCTION__, msq, NULL);
  +     }
  +     msq->qid = -1;
        if (delete || MSQF_ISSET(DELETE))
  -         rc = __msgctl(msq->qid, IPC_RMID, NULL);
  -     rpmmsqFini(msq);
  -     rc = 0; /* XXX */
  +         rc = Msgctl(msq->qid, IPC_RMID, NULL);
   #endif       /* WITH_MSQ */
       }        break;
       default:
  @@ -714,7 +847,7 @@
        case IPC_STAT:
        case IPC_SET:
        case IPC_RMID:
  -         rc = __msgctl(msq->qid, cmd, buf);
  +         rc = Msgctl(msq->qid, cmd, buf);
            break;
        }
   #endif       /* WITH_MSQ */
  @@ -737,7 +870,7 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     rc = __mq_notify(msq->qid, _sevp);
  +     rc = Mq_notify(msq->qid, _sevp);
   #endif       /* WITH_MQ */
       }        break;
   
  @@ -769,63 +902,52 @@
       rpmmsq msq = (rpmmsq) sv.sival_ptr;
   assert(msq = rpmmsqLink(msq));
   
  +    msq->tid = pthread_self();
  +
   SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, 
msq->tid));
       char b[BUFSIZ];
       size_t nb = sizeof(b);
       int rc;
   
  -    while (1) {
  -     int ncmax = 32;         /* max chars to print. */
  -     int nc = 0;             /* chars to print */
  +    while (msq->qid != -1) {
        unsigned int prio = 0;
        int rc;
   
  -     while (1) {
  -
  -         /* Yield the CPU. */
  -         if (msq->oflags & O_NONBLOCK) {
  -             struct timespec ts = { 0, 100 };
  -             Z(nanosleep(&ts, NULL));
  -         }
  -
  -         /* Loop on errors. */
  -         rc = __mq_receive(msq->qid, b, nb, &prio);
  -         if (rc < 0)
  -         switch (errno) {
  -         case ETIMEDOUT:     /* XXX mq_timedreceive() */
  -             msq->ntimeout++;
  -             continue;
  -         case EAGAIN:        /* XXX O_NONBLOCK */
  -             msq->nagain++;
  -             continue;
  +     /* Consumer in monitor. */
  +     LOCK(msq->m);
  +     while (msq->i == 0)
  +         WAIT(msq->e, msq->m);
  +     errno = 0;
  +     do {
  +         struct timespec ts = { 0, 1000 };
  +         rc = Mq_receive(msq->qid, b, nb, &prio);
  +         if (rc < 0) {
  +             switch (errno) {
  +             case ETIMEDOUT: /* XXX Mq_timedreceive() */
  +                 msq->ntimeout++;
  +                 Z(nanosleep(&ts, NULL));
  +                 continue;
  +             case EAGAIN:    /* XXX O_NONBLOCK */
  +                 msq->nagain++;
  +                 Z(nanosleep(&ts, NULL));
  +                 continue;
  +             }
            }
            break;
  -     }
  +     } while (rc < 0);
  +     if (msq->i-- == msq->imax)
  +         SIGNAL(msq->f);
  +     UNLOCK(msq->m);
   
        if (rc >= 0) {
            msq->nrecv++;
  -         /* Ensure printable. */
  -         for (nc = 0; nc < ncmax; nc++) {
  -             if (nc < rc && isprint(b[nc]))
  -                 continue;
  -             break;
  -         }
  +         /* Deliver the mssage through a callback. */
  +         int xx = rpmmsqDeliver(msq, b, rc, prio);
  +         (void)xx;
        }
  -
  -     LOCK(msq->m);
  -     SIGNAL(msq->c);
  -     UNLOCK(msq->m);
  -
  -     /* Exit immediately on highest priority message. */
  -     if (prio == RPMMSQ_PRIO_EXIT)
  -         break;
  -
  -     /* Deliver the mssage through a callback. */
  -     int xx = rpmmsqDeliver(msq, b, nb, prio);
  -     (void)xx;
       }
   
  -SPEW((stderr, "<== %s(%p) rc %d\n", __FUNCTION__, msq, rc));
  +SPEW((stderr, "<== %s(%p) qid %d tid %ld rc %d\n", __FUNCTION__, msq, 
msq->qid, msq->tid, rc));
   
       msq->tid = 0;
       msq = rpmmsqFree(msq);
  @@ -978,8 +1100,9 @@
   }
   
   /* =============================================================== */
  +static
   RPM_GNUC_PURE
  -static inline void * msqFileno(FD_t fd)
  +void * msqFileno(FD_t fd)
   {
       void * rc = NULL;
       int i;
  @@ -996,7 +1119,8 @@
       return rc;
   }
   
  -static FD_t msqOpen(const char * path, const char * fmode)
  +static
  +FD_t msqOpen(const char * path, const char * fmode)
   {
       FD_t fd;
       rpmmsq msq = rpmmsqOpen(path, fmode);
  @@ -1016,7 +1140,8 @@
       return fd;
   }
   
  -static FD_t msqFdopen(void * cookie, const char * fmode)
  +static
  +FD_t msqFdopen(void * cookie, const char * fmode)
   {
       FD_t fd = c2f(cookie);
       int fdno = fdFileno(fd);
  @@ -1056,15 +1181,17 @@
       return fdLink(fd, __FUNCTION__);
   }
   
  +static
   RPM_GNUC_PURE
  -static int msqFlush(void * cookie)
  +int msqFlush(void * cookie)
   {
       FD_t fd = c2f(cookie);
       rpmmsq msq = (rpmmsq) msqFileno(fd);
       return rpmmsqFlush(msq);
   }
   
  -static ssize_t msqRead(void * cookie, char * buf, size_t count)
  +static
  +ssize_t msqRead(void * cookie, char * buf, size_t count)
   {
       FD_t fd = c2f(cookie);
       rpmmsq msq = (rpmmsq) msqFileno(fd);
  @@ -1099,18 +1226,25 @@
       return rc;
   }
   
  +static
   RPM_GNUC_PURE
  -static int msqSeek(void * cookie, _libio_pos_t pos, int whence)
  +int msqSeek(void * cookie, _libio_pos_t pos, int whence)
   {
       FD_t fd = c2f(cookie);
       rpmmsq msq = (rpmmsq) msqFileno(fd);
  +#ifdef USE_COOKIE_SEEK_POINTER
  +    _IO_off64_t p = *pos;
  +#else
  +    off_t p = pos;
  +#endif
   
  +SPEW((stderr, "==> %s(%p, %ld, SEEK_%s)\n", __FUNCTION__, fd, p, 
SEEK_[whence&0x3]));
   assert(msq != NULL);
  -    MSQONLY(fd);
       return rpmmsqSeek(msq, pos, whence);
   }
   
  -static int msqClose(void * cookie)
  +static
  +int msqClose(void * cookie)
   {
       FD_t fd = c2f(cookie);
       rpmmsq msq = (rpmmsq) msqFileno(fd);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmio.c
  ============================================================================
  $ cvs diff -u -r1.230.2.50 -r1.230.2.51 rpmio.c
  --- rpm/rpmio/rpmio.c 25 May 2017 20:02:13 -0000      1.230.2.50
  +++ rpm/rpmio/rpmio.c 26 May 2017 13:22:17 -0000      1.230.2.51
  @@ -269,6 +269,8 @@
       return fd;
   }
   
  +static char *SEEK_[] = { "SET", "CUR", "END", "???" };
  +
   static inline
   int fdSeekNot(void * cookie, _libio_pos_t pos, int whence)
   {
  @@ -498,7 +500,7 @@
       rc = lseek(fdFileno(fd), p, whence);
       fdstat_exit(fd, opx, rc);
   
  -DBGIO(fd, (stderr, "<--\tfdSeek(%p,%ld,%d) rc %lx %s\n", cookie, (long)p, 
whence, (unsigned long)rc, fdbg(fd)));
  +DBGIO(fd, (stderr, "<--\tfdSeek(%p, %ld, SEEK_%s) rc %lx %s\n", cookie, 
(long)p, SEEK_[whence&0x3], (unsigned long)rc, fdbg(fd)));
   
       return (int) rc;
   }
  @@ -2438,7 +2440,7 @@
       int rc;
   
       FDSANE(fd);
  -DBGIO(fd, (stderr, "==> Fread(%p,%u,%u,%p) %s\n", buf, (unsigned)size, 
(unsigned)nmemb, (fd ? fd : NULL), fdbg(fd)));
  +DBGIO(fd, (stderr, "==> Fread(%p,%u,%u,%p)\n", buf, (unsigned)size, 
(unsigned)nmemb, (fd ? fd : NULL)));
   
       if (fdGetIo(fd) == fpio) {
        rc = (int) fread(buf, size, nmemb, fdGetFILE(fd));
  @@ -2457,7 +2459,7 @@
       int rc;
   
       FDSANE(fd);
  -DBGIO(fd, (stderr, "==> Fwrite(%p,%u,%u,%p) %s\n", buf, (unsigned)size, 
(unsigned)nmemb, (fd ? fd : NULL), fdbg(fd)));
  +DBGIO(fd, (stderr, "==> Fwrite(%p,%u,%u,%p)\n", buf, (unsigned)size, 
(unsigned)nmemb, (fd ? fd : NULL)));
   
       if (fdGetIo(fd) == fpio) {
        rc = (int) fwrite(buf, size, nmemb, fdGetFILE(fd));
  @@ -2483,7 +2485,7 @@
       long int rc;
   
       FDSANE(fd);
  -DBGIO(fd, (stderr, "==> Fseek(%p,%ld,%d) %s\n", fd, (long)offset, whence, 
fdbg(fd)));
  +DBGIO(fd, (stderr, "==> Fseek(%p, %ld, SEEK_%s) %s\n", fd, (long)offset, 
SEEK_[whence&0x3], fdbg(fd)));
   
       if (fdGetIo(fd) == fpio)
        return fseek(fdGetFILE(fd), (long)offset, whence);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        25 May 2017 20:02:13 -0000      1.1.2.9
  +++ rpm/rpmio/rpmmsq.h        26 May 2017 13:22:17 -0000      1.1.2.10
  @@ -50,8 +50,13 @@
       key_t key;                       /*!< SysV: ftok(3) or IPC_PRIVATE. */
       long mtype;                      /*!< SysV: message type. */
       pthread_t tid;           /*!< LOOP: sigev thread id. */
  -    pthread_cond_t c;
  -    pthread_mutex_t m;
  +
  +    pthread_mutex_t m;               /*!< monitor mutex. */
  +    pthread_cond_t  e;               /*!< empty condition. */
  +    pthread_cond_t  f;               /*!< full condition. */
  +    int i;                   /*!< no. of inflight messages. */
  +    int imax;                        /*!< max. inflight messages. */
  +
       int ntimeout;            /*!< LOOP: no. of receive timeouts. */
       int nagain;                      /*!< LOOP: no. of waits (O_NONBLOCK). */
       int nsent;                       /*!< no. messages sent. */
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmq.c
  ============================================================================
  $ cvs diff -u -r1.1.2.9 -r1.1.2.10 tmq.c
  --- rpm/rpmio/tmq.c   25 May 2017 20:02:13 -0000      1.1.2.9
  +++ rpm/rpmio/tmq.c   26 May 2017 13:22:18 -0000      1.1.2.10
  @@ -35,7 +35,16 @@
   # include <mqueue.h>
   #endif
   
  +#define _RPMIOB_INTERNAL
   #include <rpmio_internal.h>
  +
  +#ifdef       NOTYET
  +#define _RPMZQ_INTERNAL
  +#define _RPMZ_INTERNAL
  +#define _RPMZ_INTERNAL_PIGZ
  +#include <rpmz.h>
  +#endif
  +
   #include <rpmlog.h>
   #include <rpmmacro.h>
   #include <poptIO.h>
  @@ -338,6 +347,7 @@
       (void)blen;
   
   #define F_ISSET(_f, _FLAG) ((_f) & RPMMSQ_FLAGS_##_FLAG)
  +fprintf(stderr, "*** %s: flags 0x%x\n", __FUNCTION__, flags);
       char fmode[64];
       char * te = fmode;
       te = stpcpy(te, "w+");
  @@ -400,17 +410,22 @@
       FD_t fd = Fopen(qname, fmode);
       if (fd) {
   
  +#ifndef      DISABLE
  +     xx = Ftell(fd);
        strcpy(b, "foo bar baz");
        blen = strlen(b);
        xx = Fwrite(b, 1, blen, fd);
  +     xx = Ferror(fd);
  +     xx = Fflush(fd);
  +     xx = Ftell(fd);
  +
        memset(b, 0, blen);
        xx = Fread(b, 1, blen, fd);
  +     xx = Ftell(fd);
   
        xx = Fileno(fd);
   if (_rpmio_debug)
   fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx);
  -     xx = Fflush(fd);
  -     xx = Ferror(fd);
   
        strcpy(b, "blah blah blah");
        blen = strlen(b);
  @@ -426,25 +441,23 @@
            xx = Fread(b, 1, blen, fd);
        }
   
  -#ifdef       DISABLE
   #ifdef       NOTYET
        xx = Feof(fd);
        xx = Clearerr(fd);
   #endif
  -     xx = Fseek(fd, 0, SEEK_CUR);
  -     xx = Ftell(fd);
        Rewind(fd);
        fpos_t pos = {};
        xx = Fgetpos(fd, &pos);
        xx = Fsetpos(fd, &pos);
   
  +#else
        FD_t nfd = Fdopen(fd, fmode);
        if (nfd) {
            strcpy(b, "foo bar baz");
            blen = strlen(b);
            xx = Fwrite(b, 1, blen, nfd);
            memset(b, 0, blen);
  -         xx = Fread(b, 1, nb, nfd);
  +         xx = Fread(b, 1, blen, nfd);
            xx = Fclose(nfd);
        }
   #endif       /* DISABLE */
  @@ -481,7 +494,8 @@
        N_("Display queue info on close"), NULL },
    { "delete", 'D', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE),
        N_("Remove queue after closing"), NULL },
  - { "loopback", 'l', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP),
  +     /* XXX -L no workie */
  + { "loopback", 'L', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP),
        N_("Read messages after sending"), NULL },
   
    { "count", 'c', POPT_ARG_INT,                           &count, 0,
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to