RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   29-May-2017 18:21:01
  Branch: rpm-5_4                          Handle: 2017052916210001

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               librpmio.vers msqio.c rpmaio.c rpmaio.h rpmmsq.h
                            tmq.c

  Log:
    - msqio: sanity.

  Summary:
    Revision    Changes     Path
    2.199.2.82  +0  -1      rpm/rpmio/librpmio.vers
    1.1.2.20    +71 -31     rpm/rpmio/msqio.c
    1.1.2.3     +24 -25     rpm/rpmio/rpmaio.c
    1.1.2.2     +4  -2      rpm/rpmio/rpmaio.h
    1.1.2.17    +21 -31     rpm/rpmio/rpmmsq.h
    1.1.2.12    +10 -0      rpm/rpmio/tmq.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/librpmio.vers
  ============================================================================
  $ cvs diff -u -r2.199.2.81 -r2.199.2.82 librpmio.vers
  --- rpm/rpmio/librpmio.vers   26 May 2017 19:52:21 -0000      2.199.2.81
  +++ rpm/rpmio/librpmio.vers   29 May 2017 16:21:00 -0000      2.199.2.82
  @@ -736,7 +736,6 @@
       msqio;
       rpmmsqClose;
       rpmmsqCtl;
  -    rpmmsqDeliver;
       rpmmsqDump;
       rpmmsqFdopen;
       rpmmsqFlush;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.19 -r1.1.2.20 msqio.c
  --- rpm/rpmio/msqio.c 29 May 2017 06:57:54 -0000      1.1.2.19
  +++ rpm/rpmio/msqio.c 29 May 2017 16:21:00 -0000      1.1.2.20
  @@ -331,7 +331,7 @@
   
   /* =============================================================== */
   static int _lockdebug = 0;
  -static int _conddebug = 0;
  +static int _conddebug = -1;
   #define PEEK(_bolt)  yarnPeekLock(_bolt)
   #define      POSSESS(_bolt) \
       {        yarnPossess(_bolt); \
  @@ -375,7 +375,7 @@
       RPM_GNUC_TM_ATOMIC {
        if ((item = aioq->head) != NULL) {
            aioq->head = item->next;
  -         item->next = NULL;          /* XXX rpmmalloc.c ? */
  +         item->next = NULL;
            if (aioq->head == NULL)
                aioq->tail = &aioq->head;
        }
  @@ -398,17 +398,32 @@
   
   static inline
   ssize_t aioqPutWait(rpmmsq msq, unsigned long *priop,
  -             int op, int prio, void *b, size_t nb)
  +             int op, int prio, void *b, size_t nb,
  +             struct sigevent *sigev, off_t offset)
   {
       AIOQ_t aioq = &msq->aioq;
       rpmzLog zlog = msq->zlog;
       ssize_t rc;
   
  -    rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb);
  +    rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb, sigev, offset);
       aio->ix = ++msq->nqueued;
  +    rpmioItem item = (rpmioItem) aio;;
   
  -    rpmioItem item = rpmioLinkPoolItem(&aio->_item,
  +    switch (op) {
  +    case LIO_WRITE:
  +     /* XXX Asynchronous writes? */
  +     if ((msq->oflags & O_NONBLOCK) && nb <= sizeof(aio->buf)) {
  +         memcpy(aio->buf, b, nb);
  +         b = aio->buf;
  +         break;
  +     }
  +     /*@fallthrough@*/
  +    default:
  +     item = rpmioLinkPoolItem(item,
                        __FUNCTION__, __FILE__, __LINE__);
  +     break;
  +    }
  +
   
       /* Producer monitor. */
       POSSESS(aioq->have);
  @@ -416,20 +431,26 @@
       aioqPutTail(aioq, item);
       TWIST(aioq->have, BY, 1);
   
  -    /* Wait for AIO completion. */
  -    POSSESS(item->use);
  -    WAITFOR(item->use, TO_BE, 1);
  -    RELEASE(item->use);
  -
  -    rc = aio_return(&aio->cb);
  -    if (rc >= 0 && priop)
  -     *priop = aio->cb.aio_reqprio;           /* XXX prio */
  -    if (rc < 0)
  -     errno = aio_error(&aio->cb);            /* XXX W2DO? */
  +    if (b != aio->buf) {
  +     /* Wait for AIO completion. */
  +     POSSESS(item->use);
  +     WAITFOR(item->use, TO_BE, 1);
  +     RELEASE(item->use);
  +
  +     rc = aio_return(&aio->aiocb);
  +     if (rc >= 0 && priop)
  +         *priop = aio->aiocb.aio_reqprio;    /* XXX prio */
  +     if (rc < 0)
  +         errno = aio_error(&aio->aiocb);     /* XXX W2DO? */
   
  -    aio = rpmaioFree(aio);
  +     aio = rpmaioFree(aio);
  +    } else {
  +     /* XXX Asynchronous writes. */
  +     rc = 0;
  +     errno = EINPROGRESS;                    /* XXX */
  +    }
   
  -SPEW("<--\t%s(%p, %p) rc %ld\n", __FUNCTION__, aioq, item, rc);
  +SPEW("<--\t%s(%p, %p) rc %ld %m\n", __FUNCTION__, aioq, item, rc);
       return rc;
   }
   
  @@ -548,6 +569,10 @@
        aioq->have = yarnFreeLock(aioq->have);
        msq->nqueued = 0;
   
  +     msq->st = _free(msq->st);
  +     msq->ds = _free(msq->ds);
  +     msq->mi = _free(msq->mi);
  +
        if (msq->zlog)
            msq->zlog = rpmzLogDump(msq->zlog, NULL);
       }
  @@ -665,6 +690,9 @@
       msq->nrecv = 0;
       msq->ntimeout = 0;
       msq->nagain = 0;
  +    msq->st = NULL;
  +    msq->ds = NULL;
  +    msq->mi = NULL;
       msq->zlog = rpmzLogNew(NULL);
   
   /* XXX TODO: fix rpmmsqFdopen() entry. */
  @@ -776,7 +804,7 @@
   #if defined(WITH_MQ)
        /* Queue the read. */
        rc = aioqPutWait(msq, priop,
  -                     LIO_READ, 0, buf, count);
  +                     LIO_READ, 0, buf, count, NULL, 0);
   #endif       /* WITH_MQ */
       }        break;
   
  @@ -829,7 +857,7 @@
        } else {
            /* Queue the write. */
            rc = aioqPutWait(msq, NULL,
  -                     LIO_WRITE, 0, (char *)buf, count);
  +                     LIO_WRITE, 0, (char *)buf, count, NULL, 0);
        }
        if (rc == 0)    /* XXX remap to write(2) return */
            rc = count;
  @@ -903,7 +931,7 @@
            } else {
                /* Queue the close. */
                rc = aioqPutWait(msq, NULL,
  -                     LIO_CLOSE, 0, NULL, 0);
  +                     LIO_CLOSE, 0, NULL, 0, NULL, 0);
            }
   
            /* Yield the CPU. */
  @@ -1103,21 +1131,21 @@
   
        /* Process the request. */
        rpmaio aio = (rpmaio) item;
  -     char *b = (char *)aio->cb.aio_buf;
  -     size_t nb = aio->cb.aio_nbytes;
  +     char *b = (char *)aio->aiocb.aio_buf;
  +     size_t nb = aio->aiocb.aio_nbytes;
   
        rc = -1;
  -     switch (aio->cb.aio_lio_opcode) {
  +     switch (aio->aiocb.aio_lio_opcode) {
        case LIO_READ:
        {   unsigned int prio = 0;
            rc = rpmmsqReaderRead(msq, b, nb, &prio);
            if (rc >= 0) {
                msq->nrecv++;
  -             aio->cb.aio_reqprio = prio;             /* XXX prio */
  +             aio->aiocb.aio_reqprio = prio;          /* XXX prio */
            }
        }   break;
        case LIO_WRITE:
  -     {   unsigned int prio = aio->cb.aio_reqprio;    /* XXX prio */
  +     {   unsigned int prio = aio->aiocb.aio_reqprio; /* XXX prio */
            rc = Mq_send(msq->qid, (const char *)b, nb, prio);
            if (rc == 0)
                msq->nsent++;
  @@ -1139,8 +1167,8 @@
            break;
        }
   
  -     aio->cb.__error_code = errno;
  -     aio->cb.__return_value = rc;
  +     aio->aiocb.__error_code = errno;
  +     aio->aiocb.__return_value = rc;
        aio = rpmaioFree(aio);
       } while (msq->qid != -1);
   
  @@ -1261,9 +1289,13 @@
   
        const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL);
   
  -     rc = Stat(lpath, &msq->sb);
  +     if (msq->st)
  +         free(msq->st);
  +     msq->st = xmalloc(sizeof(struct stat));
  +
  +     rc = Stat(lpath, msq->st);
        if (!rc)
  -         rpmmsqDumpST(lpath, &msq->sb, NULL);
  +         rpmmsqDumpST(lpath, msq->st, NULL);
   
        /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */
        {   FD_t fd = Fopen(lpath, "r.fdio");
  @@ -1285,11 +1317,19 @@
       case RPMMSQ_TYPE_SYSV:
       {
   #if defined(WITH_MSQ)
  -     rc = rpmmsqCtl(msq, IPC_STAT, &msq->ds);
  +     if (msq->ds)
  +         free(msq->ds);
  +     msq->ds = xmalloc(sizeof(struct msqid_ds));
  +
  +     rc = rpmmsqCtl(msq, IPC_STAT, msq->ds);
        if (!rc)
  -         rpmmsqDumpDS(NULL, &msq->ds, NULL);
  +         rpmmsqDumpDS(NULL, msq->ds, NULL);
   
   #if defined(IPC_INFO)        /* XXX linux-only? */
  +     if (msq->mi)
  +         free(msq->mi);
  +     msq->mi = xmalloc(sizeof(struct msginfo));
  +
        rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&msq->mi);
        if (!rc)
            rpmmsqDumpMI(NULL, &msq->mi, NULL);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmaio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmaio.c
  --- rpm/rpmio/rpmaio.c        28 May 2017 20:18:16 -0000      1.1.2.2
  +++ rpm/rpmio/rpmaio.c        29 May 2017 16:21:00 -0000      1.1.2.3
  @@ -52,17 +52,17 @@
        be += sprintf(be, "========================== aio(%p) use %ld ix %d\n",
                aio, use, aio->ix);
   #define PRINT_AIO(_fmt, _foo) \
  -    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb._foo); }
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->aiocb._foo); }
   #define PRINT_AIO_SIGEV(_fmt, _foo) \
  -    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb.aio_sigevent._foo); }
  +    {        be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->aiocb.aio_sigevent._foo); }
        PRINT_AIO(d, aio_fildes);
        be += sprintf(be, "%25s: %s\n", "aio_lio_opcode",
  -             LIO_[aio->cb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]);
  -     if (aio->cb.aio_reqprio)
  +             LIO_[aio->aiocb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]);
  +     if (aio->aiocb.aio_reqprio)
            PRINT_AIO(d, aio_reqprio);
        PRINT_AIO(p, aio_buf);
        PRINT_AIO(zd, aio_nbytes);
  -     switch (aio->cb.aio_sigevent.sigev_notify) {
  +     switch (aio->aiocb.aio_sigevent.sigev_notify) {
        default:
            break;
        case SIGEV_THREAD:
  @@ -87,11 +87,7 @@
       rpmaio aio = (rpmaio) _aio;
   
       if (aio) {
  -     aio->cb.aio_fildes = 0;
  -     aio->cb.aio_lio_opcode = 0;
  -     aio->cb.aio_reqprio = 0;
  -     aio->cb.aio_buf = NULL;
  -     aio->cb.aio_nbytes = 0;
  +     memset(&aio->aiocb, 0, sizeof(aio->aiocb));
        aio->ix = -1;
       }
   }
  @@ -101,30 +97,33 @@
       rpmaio aio = (rpmaio) _aio;
   
       if (aio) {
  -     aio->cb.aio_fildes = 0;
  -     aio->cb.aio_lio_opcode = 0;
  -     aio->cb.aio_reqprio = 0;
  -     aio->cb.aio_buf = NULL;
  -     aio->cb.aio_nbytes = 0;
  +     memset(&aio->aiocb, 0, sizeof(aio->aiocb));
        aio->ix = -1;
       }
   }
   
   RPMIOPOOL_MODULE(aio)
   
  -rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb)
  +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb,
  +             void * _sigev, off_t offset)
   {
       rpmaio aio = rpmaioGetPool(_rpmaioPool);
  +    struct aiocb * aiocb = &aio->aiocb;
  +    aiocb->aio_fildes = fdno;
  +    aiocb->aio_lio_opcode = op;
  +    aiocb->aio_reqprio = prio;
  +    aiocb->aio_buf = b;
  +    aiocb->aio_nbytes = nb;
  +    if (_sigev)
  +     aiocb->aio_sigevent = *(struct sigevent *) _sigev;
  +
  +    aiocb->__next_prio = NULL;
  +    aiocb->__abs_prio = 0;
  +    aiocb->__policy = 0;
  +    aiocb->__error_code = EINPROGRESS;       /* XXX */
  +    aiocb->__return_value = 0;
   
  -    aio->_item.next = NULL;          /* XXX rpmmalloc.c ? */
  -    aio->cb.aio_fildes = fdno;
  -    aio->cb.aio_lio_opcode = op;
  -    aio->cb.aio_reqprio = prio;
  -    aio->cb.aio_buf = b;
  -    aio->cb.aio_nbytes = nb;
  -
  -    aio->cb.__error_code = EINPROGRESS;      /* XXX */
  -    aio->cb.__return_value = 0;              /* XXX */
  +    aiocb->aio_offset = offset;
   
       aio->ix = -1;
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmaio.h
  ============================================================================
  $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmaio.h
  --- rpm/rpmio/rpmaio.h        28 May 2017 18:39:22 -0000      1.1.2.1
  +++ rpm/rpmio/rpmaio.h        29 May 2017 16:21:01 -0000      1.1.2.2
  @@ -18,8 +18,9 @@
   
   struct rpmaio_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
  -    struct aiocb cb;
  +    struct aiocb aiocb;
       int ix;
  +    char buf[BUFSIZ];
   };
   #endif
   
  @@ -58,7 +59,8 @@
    * Create an aio wrapper.
    * @return           aio wrapper
    */
  -rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb);
  +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb,
  +             void * _sigev, off_t offset);
   
   #ifdef __cplusplus
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.16 -r1.1.2.17 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        29 May 2017 06:57:54 -0000      1.1.2.16
  +++ rpm/rpmio/rpmmsq.h        29 May 2017 16:21:01 -0000      1.1.2.17
  @@ -34,6 +34,10 @@
   #undef  _MFB
   #undef  _KFB
   
  +#ifdef __cplusplus
  +extern "C" {
  +#endif
  +
   /**
    */
   #if defined(_RPMMSQ_INTERNAL)
  @@ -60,23 +64,26 @@
       int ntimeout;            /*!< no. of receive timeouts. */
       int nagain;                      /*!< no. of waits (O_NONBLOCK). */
   
  -    struct mq_attr oattrs;   /*!< mq_open attributes. */
  -    pthread_attr_t attr;     /*!< mq_notify thread attr */
  -    struct sigevent sigev;   /*!< mq_notify sigevent_t */
  -
  -    struct stat sb;
  -#if defined(WITH_MSQ)
  -    struct msqid_ds ds;
  -    struct msginfo mi;
  -#endif       /* WITH_MSQ */
  +    struct mq_attr oattrs;   /*!< POSIX: mq_open attributes. */
  +    pthread_attr_t attr;     /*!< POSIX: mq_notify thread attr */
  +    struct sigevent sigev;   /*!< POSIX: mq_notify sigevent_t */
  +
  +    struct stat *st;         /*!< POSIX: /dev/mqueue/qname */
  +    void *ds;                        /*!< SysV: struct msqid_ds */
  +    void *mi;                        /*!< SysV: struct msginfo */
   
  -    rpmzLog zlog;
  +    rpmzLog zlog;            /*!< high precision timestamp'd log */
   };
  -#endif       /* _RPMMSQ_INTERNAL */
   
  -#ifdef __cplusplus
  -extern "C" {
  -#endif
  +rpmmsq rpmmsqOpen(const char * path, const char * fmode);
  +rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode);
  +int rpmmsqFlush(void * _msq);
  +int rpmmsqCtl(void * _msq, int cmd, void *buf);
  +int rpmmsqNotify(rpmmsq msq, const void *_sevp);
  +void rpmmsqReader(union sigval sv);
  +int rpmmsqDump(const char *msg, void * _msq, FILE *fp);
  +
  +#endif       /* _RPMMSQ_INTERNAL */
   
   /**
    * Unreference a msq wrapper instance.
  @@ -117,23 +124,6 @@
   
   int rpmmsqClose(rpmmsq msq, int delete);
   
  -rpmmsq rpmmsqOpen(const char * path, const char * fmode);
  -rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode);
  -int rpmmsqFlush(void * _msq);
  -
  -int rpmmsqCtl(void * _msq, int cmd, void *buf);
  -
  -int rpmmsqNotify(rpmmsq msq, const void *_sevp);
  -
  -int rpmmsqDeliver(rpmmsq msq, char *b, size_t nb, unsigned long prio);
  -
  -void rpmmsqReader(union sigval sv);
  -#define      RPMMSQ_PRIO_EXIT        0x7fff
  -
  -/**
  - */
  -int rpmmsqDump(const char *msg, void * _msq, FILE *fp);
  -
   #ifdef __cplusplus
   }
   #endif
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmq.c
  ============================================================================
  $ cvs diff -u -r1.1.2.11 -r1.1.2.12 tmq.c
  --- rpm/rpmio/tmq.c   28 May 2017 18:01:53 -0000      1.1.2.11
  +++ rpm/rpmio/tmq.c   29 May 2017 16:21:01 -0000      1.1.2.12
  @@ -458,6 +458,16 @@
            assert(nr == nw && !strncmp(rb, wb, blen));
        }
   
  +     for (int i = 0; i < 8; i++) {
  +         blen = snprintf(wb, nwb, "%d", i);
  +         nw = Fwrite(wb, 1, blen, fd);
  +     }
  +
  +     for (int i = 0; i < 8; i++) {
  +         memset(rb, 0, blen);
  +         nr = Fread(rb, 1, blen, fd);
  +     }
  +
   #ifdef       DISABLE
   #ifdef       NOTYET
        xx = Feof(fd);
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to