RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   29-May-2017 08:57:54
  Branch: rpm-5_4                          Handle: 2017052906575400

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               msqio.c rpmmsq.h

  Log:
    - msqio: haul out the trash.

  Summary:
    Revision    Changes     Path
    1.1.2.19    +106 -126   rpm/rpmio/msqio.c
    1.1.2.16    +0  -2      rpm/rpmio/rpmmsq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.18 -r1.1.2.19 msqio.c
  --- rpm/rpmio/msqio.c 28 May 2017 22:14:55 -0000      1.1.2.18
  +++ rpm/rpmio/msqio.c 29 May 2017 06:57:54 -0000      1.1.2.19
  @@ -32,6 +32,8 @@
   
   typedef      struct AIOQ_s * AIOQ_t;
   struct AIOQ_s {
  +    yarnLock have;
  +    void * pool;
       rpmioItem head;             /*!< AIO cb queue head. */
       rpmioItem * tail;           /*!< AIO cb queue tail. */
   };
  @@ -274,7 +276,7 @@
       if (rc == 0) {
        /* Ensure printable. */
        for (nc = 0; nc < ncmax; nc++) {
  -         if (nc < rc && isprint(buf[nc]))
  +         if (nc < (int)msgsz && isprint(buf[nc]))
                continue;
            break;
        }
  @@ -315,7 +317,6 @@
        PRINT(d, msgsize);
   #endif       /* WITH_MSQ */
        if (stats) {
  -         PRINT(d, inflight);
            PRINT(d, nsent);
            PRINT(d, nrecv);
            PRINT(d, ntimeout);
  @@ -324,14 +325,13 @@
        PRINT(p, aioq.head);
        PRINT(p, aioq.tail);
        PRINT(d, nqueued);
  -     PRINT(d, ndelay);
   #undef       PRINT
       }
   }
   
   /* =============================================================== */
   static int _lockdebug = 0;
  -static int _conddebug = -1;
  +static int _conddebug = 0;
   #define PEEK(_bolt)  yarnPeekLock(_bolt)
   #define      POSSESS(_bolt) \
       {        yarnPossess(_bolt); \
  @@ -345,7 +345,7 @@
       }
   #define      TWIST(_bolt, _op, _val) \
       {        if (zlog && _conddebug) \
  -         rpmzlogAdd(zlog, "***\t    TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \
  +         rpmzLogAdd(zlog, "***\t    TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \
        yarnTwist(_bolt, _op, _val); \
       }
   #define      WAITFOR(_bolt, _op, _val) \
  @@ -359,6 +359,8 @@
   {
       rpmmsq msq = NULL;
       RPM_GNUC_TM_ATOMIC {
  +     aioq->have = yarnNewLock(0);
  +     aioq->pool = NULL;
        aioq->head = NULL;
        aioq->tail = &aioq->head;
       }
  @@ -379,7 +381,7 @@
        }
       }
   if (item)
  -SPEW("<--\t%s(%p) item %p\n", __FUNCTION__, aioq, item);
  +SPEW("<--\t%s(%p) %p\n", __FUNCTION__, aioq, item);
       return item;
   }
   
  @@ -391,29 +393,43 @@
        *aioq->tail = item;
        aioq->tail = (rpmioItem *) &item->next;
       }
  -SPEW("<--\t%s(%p,%p)\n", __FUNCTION__, aioq, item);
  +SPEW("<--\t%s(%p, %p)\n", __FUNCTION__, aioq, item);
   }
   
   static inline
  -ssize_t aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog,
  -             unsigned long *priop)
  +ssize_t aioqPutWait(rpmmsq msq, unsigned long *priop,
  +             int op, int prio, void *b, size_t nb)
   {
  -    rpmmsq msq = NULL;
  +    AIOQ_t aioq = &msq->aioq;
  +    rpmzLog zlog = msq->zlog;
       ssize_t rc;
   
  -    POSSESS(item->use);
  +    rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb);
  +    aio->ix = ++msq->nqueued;
  +
  +    rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  +                     __FUNCTION__, __FILE__, __LINE__);
  +
  +    /* Producer monitor. */
  +    POSSESS(aioq->have);
  +    WAITFOR(aioq->have, NOT_TO_BE, msq->msgmax);
       aioqPutTail(aioq, item);
  +    TWIST(aioq->have, BY, 1);
  +
  +    /* Wait for AIO completion. */
  +    POSSESS(item->use);
       WAITFOR(item->use, TO_BE, 1);
       RELEASE(item->use);
   
  -    rpmaio aio = (rpmaio) item;
       rc = aio_return(&aio->cb);
       if (rc >= 0 && priop)
        *priop = aio->cb.aio_reqprio;           /* XXX prio */
       if (rc < 0)
        errno = aio_error(&aio->cb);            /* XXX W2DO? */
   
  -SPEW("<--\t%s(%p,%p) rc %ld\n", __FUNCTION__, aioq, item, rc);
  +    aio = rpmaioFree(aio);
  +
  +SPEW("<--\t%s(%p, %p) rc %ld\n", __FUNCTION__, aioq, item, rc);
       return rc;
   }
   
  @@ -451,7 +467,6 @@
        PRINT(ld, mtype);
        PRINT(d, msgmax);
        PRINT(d, msgsize);
  -     PRINT(d, inflight);
        PRINT(d, nsent);
        PRINT(d, nrecv);
        PRINT(d, ntimeout);
  @@ -474,7 +489,6 @@
        PRINT(p, aioq.head);
        PRINT(p, aioq.tail);
        PRINT(d, nqueued);
  -     PRINT(d, ndelay);
   
   #undef       PRINT_SIGEV
   #undef       PRINT_ATTRS
  @@ -495,7 +509,7 @@
       rpmmsq msq = (rpmmsq) _msq;
   
       if (msq) {
  -SPEW("%s: inflight %d queued %d sent %d recv %d timeout %d again %d delay %d\n", __FUNCTION__, msq->inflight, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain, msq->ndelay);
  +SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain);
        msq->msgmax = MSQ_MSGMAX;
        msq->msgsize = MSQ_MSGSIZE;
   
  @@ -514,18 +528,25 @@
        msq->key = 0;
        msq->mtype = 0;
   
  +     AIOQ_t aioq = &msq->aioq;
  +     rpmzLog zlog = msq->zlog;
  +     rpmioItem item;
  +
        /* Drain the queue. */
  +     POSSESS(aioq->have);                    /* XXX enter */
        while (1) {
  -         rpmioItem item;
  -         if ((item = aioqGetHead(&msq->aioq)) == NULL)
  +         if (PEEK(aioq->have) == 0)
                break;
  -         while (item)
  +         item = aioqGetHead(&msq->aioq);
  +         TWIST(aioq->have, BY, -1);          /* XXX exit */
  +         while (item)                        /* XXX deadlock w msq ref */
                item = rpmioFreePoolItem(item,
                        __FUNCTION__, __FILE__, __LINE__);
  +         POSSESS(aioq->have);                /* XXX re-enter */
        }
  +     RELEASE(aioq->have);                    /* XXX exit */
  +     aioq->have = yarnFreeLock(aioq->have);
        msq->nqueued = 0;
  -     msq->inflight = 0;
  -     msq->ndelay = 0;
   
        if (msq->zlog)
            msq->zlog = rpmzLogDump(msq->zlog, NULL);
  @@ -640,8 +661,6 @@
       /* XXX TODO: use POSIX shared mutexes? */
       aioqInit(&msq->aioq);
       msq->nqueued = 0;
  -    msq->inflight = 0;
  -    msq->ndelay = 0;
       msq->nsent = 0;
       msq->nrecv = 0;
       msq->ntimeout = 0;
  @@ -755,25 +774,9 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     unsigned int prio = 0;
  -
  -     rpmaio aio = rpmaioNew(msq->qid, LIO_READ, 0, buf, count);
  -     aio->ix = ++msq->nqueued;
  -
  -     rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  -                     __FUNCTION__, __FILE__, __LINE__);
  -
        /* Queue the read. */
  -     rc = aioqPutWait(&msq->aioq, item, msq->zlog, priop);
  -#ifdef       DYING
  -     if (rc >= 0)
  -         prio = aio->cb.aio_reqprio;         /* XXX prio */
  -#endif
  -
  -     aio = rpmaioFree(aio);
  -
  -     if (rc >= 0 && priop)
  -         *priop = prio;
  +     rc = aioqPutWait(msq, priop,
  +                     LIO_READ, 0, buf, count);
   #endif       /* WITH_MQ */
       }        break;
   
  @@ -813,32 +816,21 @@
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -
  +     /* Send the first message to trigger the reader thread startup. */
        if (msq->nsent == 0) {
            rc = Mq_send(msq->qid, buf, count, prio);
            if (rc == 0)
                msq->nsent++;
  -         msq->inflight++;
   
            /* Yield the CPU. */
            sched_yield();
            struct timespec ts = { 0, 100*1000 };
            Z(nanosleep(&ts, NULL));
  -
        } else {
  -         rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, count);
  -         aio->ix = ++msq->nqueued;
  -         aio->cb.aio_reqprio = prio; /* XXX prio */
  -
  -         rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  -                     __FUNCTION__, __FILE__, __LINE__);
  -
            /* Queue the write. */
  -         rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL);
  -
  -         aio = rpmaioFree(aio);
  +         rc = aioqPutWait(msq, NULL,
  +                     LIO_WRITE, 0, (char *)buf, count);
        }
  -
        if (rc == 0)    /* XXX remap to write(2) return */
            rc = count;
   #endif       /* WITH_MQ */
  @@ -854,9 +846,8 @@
        if (count)
            memcpy(msgp->mtext, buf, count);
        rc = Msgsnd(msq->qid, msgp, msgsz, msgflg);
  -     if (rc == 0) {
  +     if (rc == 0)
            msq->nsent++;
  -     }
   
        msgp = _free(msgp);
        if (rc == 0)    /* XXX remap to write(2) return */
  @@ -902,32 +893,23 @@
            if (MSQF_ISSET(INFO) || _rpmmsq_debug)
                rpmmsqDump(NULL, msq, NULL);
   
  +         /* Reader thread never started: disable notify and close. */
            if (msq->nsent == 0)  {
                rc = rpmmsqNotify(msq, NULL);
                rc = Mq_close(msq->qid);
                if (rc == 0)
                    msq->qid = -1;
                break;
  +         } else {
  +             /* Queue the close. */
  +             rc = aioqPutWait(msq, NULL,
  +                     LIO_CLOSE, 0, NULL, 0);
            }
   
  -         unsigned prio = 0;
  -         rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0);
  -         aio->ix = ++msq->nqueued;
  -         aio->cb.aio_reqprio = prio; /* XXX prio */
  -
  -         rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  -                     __FUNCTION__, __FILE__, __LINE__);
  -
  -         /* Queue the write. */
  -         rc = aioqPutWait(&msq->aioq, item, msq->zlog, NULL);
  -
            /* Yield the CPU. */
            sched_yield();
  -         struct timespec ts = { 0, 100*1000 };
  +         struct timespec ts = { 0, 10*1000*1000 };
            Z(nanosleep(&ts, NULL));
  -
  -         aio = rpmaioFree(aio);
  -
        }
   
        if (!rc && (delete || MSQF_ISSET(DELETE)))
  @@ -943,6 +925,7 @@
                rpmmsqDump(__FUNCTION__, msq, NULL);
        }
        msq->qid = -1;
  +     rc = 0;                 /* XXX */
        if (delete || MSQF_ISSET(DELETE))
            rc = Msgctl(msq->qid, IPC_RMID, NULL);
   #endif       /* WITH_MSQ */
  @@ -1105,64 +1088,61 @@
   SPEW("==> %s(%p) qid %d\n", __FUNCTION__, msq, msq->qid);
       ssize_t rc;
   
  -    while (msq->qid != -1) {
  -
  -     while (1) {
  -         rpmioItem item;
  -
  -         /* Get next request from queue. */
  -         if ((item = aioqGetHead(&msq->aioq)) == NULL) {
  -             msq->ndelay++;
  -             /* Yield the CPU. */
  -             sched_yield();
  -             struct timespec ts = { 0, 100*1000 };
  -             Z(nanosleep(&ts, NULL));
  -             break;
  +    if (msq->qid != -1)
  +    do {
  +     AIOQ_t aioq = &msq->aioq;
  +     rpmzLog zlog = msq->zlog;
  +     rpmioItem item;
  +
  +     /* Consumer monitor. */
  +     POSSESS(aioq->have);
  +     WAITFOR(aioq->have, NOT_TO_BE, 0);
  +     item = aioqGetHead(&msq->aioq);
  +     TWIST(aioq->have, BY, -1);
  +assert(item);
  +
  +     /* Process the request. */
  +     rpmaio aio = (rpmaio) item;
  +     char *b = (char *)aio->cb.aio_buf;
  +     size_t nb = aio->cb.aio_nbytes;
  +
  +     rc = -1;
  +     switch (aio->cb.aio_lio_opcode) {
  +     case LIO_READ:
  +     {   unsigned int prio = 0;
  +         rc = rpmmsqReaderRead(msq, b, nb, &prio);
  +         if (rc >= 0) {
  +             msq->nrecv++;
  +             aio->cb.aio_reqprio = prio;             /* XXX prio */
            }
  -
  -         rpmaio aio = (rpmaio) item;
  -         char *b = (char *)aio->cb.aio_buf;
  -         size_t nb = aio->cb.aio_nbytes;
  -
  -         rc = -1;
  -         switch (aio->cb.aio_lio_opcode) {
  -         case LIO_READ:
  -         {   unsigned int prio = 0;
  -             rc = rpmmsqReaderRead(msq, b, nb, &prio);
  -             if (rc >= 0) {
  -                 msq->nrecv++;
  -                 aio->cb.aio_reqprio = prio;         /* XXX prio */
  -             }
  -             if (msq->inflight > 0)
  -                 msq->inflight--;
  -         }   break;
  -         case LIO_WRITE:
  -         {   unsigned int prio = aio->cb.aio_reqprio;        /* XXX prio */
  -             rc = Mq_send(msq->qid, (const char *)b, nb, prio);
  +     }   break;
  +     case LIO_WRITE:
  +     {   unsigned int prio = aio->cb.aio_reqprio;    /* XXX prio */
  +         rc = Mq_send(msq->qid, (const char *)b, nb, prio);
  +         if (rc == 0)
  +             msq->nsent++;
  +     }   break;
  +     case LIO_CLOSE:
  +     {
  +         if (b == NULL && nb == 0) {
  +             rc = Mq_close(msq->qid);
                if (rc == 0)
  -                 msq->nsent++;
  -         }   break;
  -         case LIO_CLOSE:
  -         {
  -             if (b == NULL && nb == 0) {
  -                 rc = Mq_close(msq->qid);
  -                 if (rc == 0)
  -                     msq->qid = -1;
  -             }
  -         }   break;
  -         case LIO_DSYNC:
  -         case LIO_SYNC:
  -         case LIO_READ64:
  -         case LIO_WRITE64:
  -         default:
  -             break;
  +                 msq->qid = -1;
            }
  -
  -         aio->cb.__error_code = errno;               /* XXX */
  -         aio->cb.__return_value = rc;                /* XXX */
  -         aio = rpmaioFree(aio);
  +     }       break;
  +     case LIO_NOP:
  +     case LIO_DSYNC:
  +     case LIO_SYNC:
  +     case LIO_READ64:
  +     case LIO_WRITE64:
  +     default:
  +         break;
        }
  -    }
  +
  +     aio->cb.__error_code = errno;
  +     aio->cb.__return_value = rc;
  +     aio = rpmaioFree(aio);
  +    } while (msq->qid != -1);
   
   SPEW("<== %s(%p) qid %d rc %ld\n", __FUNCTION__, msq, msq->qid, rc);
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.15 -r1.1.2.16 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        28 May 2017 22:14:55 -0000      1.1.2.15
  +++ rpm/rpmio/rpmmsq.h        29 May 2017 06:57:54 -0000      1.1.2.16
  @@ -54,8 +54,6 @@
       int msgmax;                      /*!< max. inflight messages. */
       int msgsize;             /*!< max. message size. */
   
  -    int inflight;
  -    int ndelay;
       int nqueued;             /*!< no. messages queued. */
       int nsent;                       /*!< no. messages sent. */
       int nrecv;                       /*!< no. messages received. */
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to