RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 29-May-2017 18:21:01 Branch: rpm-5_4 Handle: 2017052916210001 Modified files: (Branch: rpm-5_4) rpm/rpmio librpmio.vers msqio.c rpmaio.c rpmaio.h rpmmsq.h tmq.c Log: - msqio: sanity. Summary: Revision Changes Path 2.199.2.82 +0 -1 rpm/rpmio/librpmio.vers 1.1.2.20 +71 -31 rpm/rpmio/msqio.c 1.1.2.3 +24 -25 rpm/rpmio/rpmaio.c 1.1.2.2 +4 -2 rpm/rpmio/rpmaio.h 1.1.2.17 +21 -31 rpm/rpmio/rpmmsq.h 1.1.2.12 +10 -0 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.81 -r2.199.2.82 librpmio.vers --- rpm/rpmio/librpmio.vers 26 May 2017 19:52:21 -0000 2.199.2.81 +++ rpm/rpmio/librpmio.vers 29 May 2017 16:21:00 -0000 2.199.2.82 @@ -736,7 +736,6 @@ msqio; rpmmsqClose; rpmmsqCtl; - rpmmsqDeliver; rpmmsqDump; rpmmsqFdopen; rpmmsqFlush; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.19 -r1.1.2.20 msqio.c --- rpm/rpmio/msqio.c 29 May 2017 06:57:54 -0000 1.1.2.19 +++ rpm/rpmio/msqio.c 29 May 2017 16:21:00 -0000 1.1.2.20 @@ -331,7 +331,7 @@ /* =============================================================== */ static int _lockdebug = 0; -static int _conddebug = 0; +static int _conddebug = -1; #define PEEK(_bolt) yarnPeekLock(_bolt) #define POSSESS(_bolt) \ { yarnPossess(_bolt); \ @@ -375,7 +375,7 @@ RPM_GNUC_TM_ATOMIC { if ((item = aioq->head) != NULL) { aioq->head = item->next; - item->next = NULL; /* XXX rpmmalloc.c ? */ + item->next = NULL; if (aioq->head == NULL) aioq->tail = &aioq->head; } @@ -398,17 +398,32 @@ static inline ssize_t aioqPutWait(rpmmsq msq, unsigned long *priop, - int op, int prio, void *b, size_t nb) + int op, int prio, void *b, size_t nb, + struct sigevent *sigev, off_t offset) { AIOQ_t aioq = &msq->aioq; rpmzLog zlog = msq->zlog; ssize_t rc; - rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb); + rpmaio aio = rpmaioNew(msq->qid, op, prio, b, nb, sigev, offset); aio->ix = ++msq->nqueued; + rpmioItem item = (rpmioItem) aio;; - rpmioItem item = rpmioLinkPoolItem(&aio->_item, + switch (op) { + case LIO_WRITE: + /* XXX Asynchronous writes? */ + if ((msq->oflags & O_NONBLOCK) && nb <= sizeof(aio->buf)) { + memcpy(aio->buf, b, nb); + b = aio->buf; + break; + } + /*@fallthrough@*/ + default: + item = rpmioLinkPoolItem(item, __FUNCTION__, __FILE__, __LINE__); + break; + } + /* Producer monitor. */ POSSESS(aioq->have); @@ -416,20 +431,26 @@ aioqPutTail(aioq, item); TWIST(aioq->have, BY, 1); - /* Wait for AIO completion. */ - POSSESS(item->use); - WAITFOR(item->use, TO_BE, 1); - RELEASE(item->use); - - rc = aio_return(&aio->cb); - if (rc >= 0 && priop) - *priop = aio->cb.aio_reqprio; /* XXX prio */ - if (rc < 0) - errno = aio_error(&aio->cb); /* XXX W2DO? */ + if (b != aio->buf) { + /* Wait for AIO completion. */ + POSSESS(item->use); + WAITFOR(item->use, TO_BE, 1); + RELEASE(item->use); + + rc = aio_return(&aio->aiocb); + if (rc >= 0 && priop) + *priop = aio->aiocb.aio_reqprio; /* XXX prio */ + if (rc < 0) + errno = aio_error(&aio->aiocb); /* XXX W2DO? */ - aio = rpmaioFree(aio); + aio = rpmaioFree(aio); + } else { + /* XXX Asynchronous writes. */ + rc = 0; + errno = EINPROGRESS; /* XXX */ + } -SPEW("<--\t%s(%p, %p) rc %ld\n", __FUNCTION__, aioq, item, rc); +SPEW("<--\t%s(%p, %p) rc %ld %m\n", __FUNCTION__, aioq, item, rc); return rc; } @@ -548,6 +569,10 @@ aioq->have = yarnFreeLock(aioq->have); msq->nqueued = 0; + msq->st = _free(msq->st); + msq->ds = _free(msq->ds); + msq->mi = _free(msq->mi); + if (msq->zlog) msq->zlog = rpmzLogDump(msq->zlog, NULL); } @@ -665,6 +690,9 @@ msq->nrecv = 0; msq->ntimeout = 0; msq->nagain = 0; + msq->st = NULL; + msq->ds = NULL; + msq->mi = NULL; msq->zlog = rpmzLogNew(NULL); /* XXX TODO: fix rpmmsqFdopen() entry. */ @@ -776,7 +804,7 @@ #if defined(WITH_MQ) /* Queue the read. */ rc = aioqPutWait(msq, priop, - LIO_READ, 0, buf, count); + LIO_READ, 0, buf, count, NULL, 0); #endif /* WITH_MQ */ } break; @@ -829,7 +857,7 @@ } else { /* Queue the write. */ rc = aioqPutWait(msq, NULL, - LIO_WRITE, 0, (char *)buf, count); + LIO_WRITE, 0, (char *)buf, count, NULL, 0); } if (rc == 0) /* XXX remap to write(2) return */ rc = count; @@ -903,7 +931,7 @@ } else { /* Queue the close. */ rc = aioqPutWait(msq, NULL, - LIO_CLOSE, 0, NULL, 0); + LIO_CLOSE, 0, NULL, 0, NULL, 0); } /* Yield the CPU. */ @@ -1103,21 +1131,21 @@ /* Process the request. */ rpmaio aio = (rpmaio) item; - char *b = (char *)aio->cb.aio_buf; - size_t nb = aio->cb.aio_nbytes; + char *b = (char *)aio->aiocb.aio_buf; + size_t nb = aio->aiocb.aio_nbytes; rc = -1; - switch (aio->cb.aio_lio_opcode) { + switch (aio->aiocb.aio_lio_opcode) { case LIO_READ: { unsigned int prio = 0; rc = rpmmsqReaderRead(msq, b, nb, &prio); if (rc >= 0) { msq->nrecv++; - aio->cb.aio_reqprio = prio; /* XXX prio */ + aio->aiocb.aio_reqprio = prio; /* XXX prio */ } } break; case LIO_WRITE: - { unsigned int prio = aio->cb.aio_reqprio; /* XXX prio */ + { unsigned int prio = aio->aiocb.aio_reqprio; /* XXX prio */ rc = Mq_send(msq->qid, (const char *)b, nb, prio); if (rc == 0) msq->nsent++; @@ -1139,8 +1167,8 @@ break; } - aio->cb.__error_code = errno; - aio->cb.__return_value = rc; + aio->aiocb.__error_code = errno; + aio->aiocb.__return_value = rc; aio = rpmaioFree(aio); } while (msq->qid != -1); @@ -1261,9 +1289,13 @@ const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL); - rc = Stat(lpath, &msq->sb); + if (msq->st) + free(msq->st); + msq->st = xmalloc(sizeof(struct stat)); + + rc = Stat(lpath, msq->st); if (!rc) - rpmmsqDumpST(lpath, &msq->sb, NULL); + rpmmsqDumpST(lpath, msq->st, NULL); /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */ { FD_t fd = Fopen(lpath, "r.fdio"); @@ -1285,11 +1317,19 @@ case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - rc = rpmmsqCtl(msq, IPC_STAT, &msq->ds); + if (msq->ds) + free(msq->ds); + msq->ds = xmalloc(sizeof(struct msqid_ds)); + + rc = rpmmsqCtl(msq, IPC_STAT, msq->ds); if (!rc) - rpmmsqDumpDS(NULL, &msq->ds, NULL); + rpmmsqDumpDS(NULL, msq->ds, NULL); #if defined(IPC_INFO) /* XXX linux-only? */ + if (msq->mi) + free(msq->mi); + msq->mi = xmalloc(sizeof(struct msginfo)); + rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&msq->mi); if (!rc) rpmmsqDumpMI(NULL, &msq->mi, NULL); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmaio.c ============================================================================ $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmaio.c --- rpm/rpmio/rpmaio.c 28 May 2017 20:18:16 -0000 1.1.2.2 +++ rpm/rpmio/rpmaio.c 29 May 2017 16:21:00 -0000 1.1.2.3 @@ -52,17 +52,17 @@ be += sprintf(be, "========================== aio(%p) use %ld ix %d\n", aio, use, aio->ix); #define PRINT_AIO(_fmt, _foo) \ - { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb._foo); } + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->aiocb._foo); } #define PRINT_AIO_SIGEV(_fmt, _foo) \ - { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb.aio_sigevent._foo); } + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->aiocb.aio_sigevent._foo); } PRINT_AIO(d, aio_fildes); be += sprintf(be, "%25s: %s\n", "aio_lio_opcode", - LIO_[aio->cb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]); - if (aio->cb.aio_reqprio) + LIO_[aio->aiocb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]); + if (aio->aiocb.aio_reqprio) PRINT_AIO(d, aio_reqprio); PRINT_AIO(p, aio_buf); PRINT_AIO(zd, aio_nbytes); - switch (aio->cb.aio_sigevent.sigev_notify) { + switch (aio->aiocb.aio_sigevent.sigev_notify) { default: break; case SIGEV_THREAD: @@ -87,11 +87,7 @@ rpmaio aio = (rpmaio) _aio; if (aio) { - aio->cb.aio_fildes = 0; - aio->cb.aio_lio_opcode = 0; - aio->cb.aio_reqprio = 0; - aio->cb.aio_buf = NULL; - aio->cb.aio_nbytes = 0; + memset(&aio->aiocb, 0, sizeof(aio->aiocb)); aio->ix = -1; } } @@ -101,30 +97,33 @@ rpmaio aio = (rpmaio) _aio; if (aio) { - aio->cb.aio_fildes = 0; - aio->cb.aio_lio_opcode = 0; - aio->cb.aio_reqprio = 0; - aio->cb.aio_buf = NULL; - aio->cb.aio_nbytes = 0; + memset(&aio->aiocb, 0, sizeof(aio->aiocb)); aio->ix = -1; } } RPMIOPOOL_MODULE(aio) -rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb) +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb, + void * _sigev, off_t offset) { rpmaio aio = rpmaioGetPool(_rpmaioPool); + struct aiocb * aiocb = &aio->aiocb; + aiocb->aio_fildes = fdno; + aiocb->aio_lio_opcode = op; + aiocb->aio_reqprio = prio; + aiocb->aio_buf = b; + aiocb->aio_nbytes = nb; + if (_sigev) + aiocb->aio_sigevent = *(struct sigevent *) _sigev; + + aiocb->__next_prio = NULL; + aiocb->__abs_prio = 0; + aiocb->__policy = 0; + aiocb->__error_code = EINPROGRESS; /* XXX */ + aiocb->__return_value = 0; - aio->_item.next = NULL; /* XXX rpmmalloc.c ? */ - aio->cb.aio_fildes = fdno; - aio->cb.aio_lio_opcode = op; - aio->cb.aio_reqprio = prio; - aio->cb.aio_buf = b; - aio->cb.aio_nbytes = nb; - - aio->cb.__error_code = EINPROGRESS; /* XXX */ - aio->cb.__return_value = 0; /* XXX */ + aiocb->aio_offset = offset; aio->ix = -1; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmaio.h ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmaio.h --- rpm/rpmio/rpmaio.h 28 May 2017 18:39:22 -0000 1.1.2.1 +++ rpm/rpmio/rpmaio.h 29 May 2017 16:21:01 -0000 1.1.2.2 @@ -18,8 +18,9 @@ struct rpmaio_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ - struct aiocb cb; + struct aiocb aiocb; int ix; + char buf[BUFSIZ]; }; #endif @@ -58,7 +59,8 @@ * Create an aio wrapper. * @return aio wrapper */ -rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb); +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb, + void * _sigev, off_t offset); #ifdef __cplusplus } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.16 -r1.1.2.17 rpmmsq.h --- rpm/rpmio/rpmmsq.h 29 May 2017 06:57:54 -0000 1.1.2.16 +++ rpm/rpmio/rpmmsq.h 29 May 2017 16:21:01 -0000 1.1.2.17 @@ -34,6 +34,10 @@ #undef _MFB #undef _KFB +#ifdef __cplusplus +extern "C" { +#endif + /** */ #if defined(_RPMMSQ_INTERNAL) @@ -60,23 +64,26 @@ int ntimeout; /*!< no. of receive timeouts. */ int nagain; /*!< no. of waits (O_NONBLOCK). */ - struct mq_attr oattrs; /*!< mq_open attributes. */ - pthread_attr_t attr; /*!< mq_notify thread attr */ - struct sigevent sigev; /*!< mq_notify sigevent_t */ - - struct stat sb; -#if defined(WITH_MSQ) - struct msqid_ds ds; - struct msginfo mi; -#endif /* WITH_MSQ */ + struct mq_attr oattrs; /*!< POSIX: mq_open attributes. */ + pthread_attr_t attr; /*!< POSIX: mq_notify thread attr */ + struct sigevent sigev; /*!< POSIX: mq_notify sigevent_t */ + + struct stat *st; /*!< POSIX: /dev/mqueue/qname */ + void *ds; /*!< SysV: struct msqid_ds */ + void *mi; /*!< SysV: struct msginfo */ - rpmzLog zlog; + rpmzLog zlog; /*!< high precision timestamp'd log */ }; -#endif /* _RPMMSQ_INTERNAL */ -#ifdef __cplusplus -extern "C" { -#endif +rpmmsq rpmmsqOpen(const char * path, const char * fmode); +rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode); +int rpmmsqFlush(void * _msq); +int rpmmsqCtl(void * _msq, int cmd, void *buf); +int rpmmsqNotify(rpmmsq msq, const void *_sevp); +void rpmmsqReader(union sigval sv); +int rpmmsqDump(const char *msg, void * _msq, FILE *fp); + +#endif /* _RPMMSQ_INTERNAL */ /** * Unreference a msq wrapper instance. @@ -117,23 +124,6 @@ int rpmmsqClose(rpmmsq msq, int delete); -rpmmsq rpmmsqOpen(const char * path, const char * fmode); -rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode); -int rpmmsqFlush(void * _msq); - -int rpmmsqCtl(void * _msq, int cmd, void *buf); - -int rpmmsqNotify(rpmmsq msq, const void *_sevp); - -int rpmmsqDeliver(rpmmsq msq, char *b, size_t nb, unsigned long prio); - -void rpmmsqReader(union sigval sv); -#define RPMMSQ_PRIO_EXIT 0x7fff - -/** - */ -int rpmmsqDump(const char *msg, void * _msq, FILE *fp); - #ifdef __cplusplus } #endif @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.11 -r1.1.2.12 tmq.c --- rpm/rpmio/tmq.c 28 May 2017 18:01:53 -0000 1.1.2.11 +++ rpm/rpmio/tmq.c 29 May 2017 16:21:01 -0000 1.1.2.12 @@ -458,6 +458,16 @@ assert(nr == nw && !strncmp(rb, wb, blen)); } + for (int i = 0; i < 8; i++) { + blen = snprintf(wb, nwb, "%d", i); + nw = Fwrite(wb, 1, blen, fd); + } + + for (int i = 0; i < 8; i++) { + memset(rb, 0, blen); + nr = Fread(rb, 1, blen, fd); + } + #ifdef DISABLE #ifdef NOTYET xx = Feof(fd); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org