RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 26-May-2017 15:22:18 Branch: rpm-5_4 Handle: 2017052613221701 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio msqio.c rpmio.c rpmmsq.h tmq.c Log: - msqio: add a monitor to handle multiple readers/writers. Summary: Revision Changes Path 1.3501.2.561+1 -0 rpm/CHANGES 1.1.2.11 +273 -139 rpm/rpmio/msqio.c 1.230.2.51 +6 -4 rpm/rpmio/rpmio.c 1.1.2.10 +7 -2 rpm/rpmio/rpmmsq.h 1.1.2.10 +21 -7 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.560 -r1.3501.2.561 CHANGES --- rpm/CHANGES 25 May 2017 20:02:12 -0000 1.3501.2.560 +++ rpm/CHANGES 26 May 2017 13:22:17 -0000 1.3501.2.561 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: add a monitor to handle multiple readers/writers. - jbj: msqio: add fopencookie wrapper. - jbj: msqio: add condvar to get rid of racy timeouts (loopback mode only). - jbj: msqio: make peace with O_NONBLOCK. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.10 -r1.1.2.11 msqio.c --- rpm/rpmio/msqio.c 25 May 2017 20:02:13 -0000 1.1.2.10 +++ rpm/rpmio/msqio.c 26 May 2017 13:22:17 -0000 1.1.2.11 @@ -21,6 +21,7 @@ #include <rpmlog.h> #include <rpmmacro.h> #include <rpmcb.h> /* XXX rpmIsDebug() */ +#include <yarn.h> #define _RPMMSQ_INTERNAL #include "rpmmsq.h" @@ -37,15 +38,46 @@ #define MSQONLY(fd) assert(fdGetIo(fd) == msqio) +static int _lockdebug = 0; +static int _conddebug = 0; #define Z(_rc) assert((_rc) == 0) -#define LOCK(_m) { Z(pthread_mutex_lock(&_m)); } -#define UNLOCK(_m) { Z(pthread_mutex_unlock(&_m)); } -#define SIGNAL(_c) { Z(pthread_cond_signal(&_c)); } -#define WAIT(_c,_m) { Z(pthread_cond_wait(&_c, &_m)); } -#define DESTROY(_c,_m) \ - { Z(pthread_cond_destroy(&_c)); Z(pthread_mutex_destroy(&_m)); } -#define INIT(_c,_m) \ - { Z(pthread_cond_init(&_c, NULL)); Z(pthread_mutex_init(&_m, NULL)); } +#define LOCK(_m) \ + { if (_lockdebug) fprintf(stderr, "*** LOCKING(%p) %s\n", &_m, #_m);\ + Z(pthread_mutex_lock(&_m));\ + } +#define UNLOCK(_m) \ + { Z(pthread_mutex_unlock(&_m));\ + if (_lockdebug) fprintf(stderr, "*** UNLOCKED(%p) %s\n", &_m, #_m);\ + } +#define SIGNAL(_c) \ + { if (_conddebug) fprintf(stderr, "*** SIGNAL(%p) %s\n", &_c, #_c);\ + Z(pthread_cond_broadcast(&_c));\ + } +#define WAIT(_c,_m) \ + { if (_conddebug) fprintf(stderr, "*** WAIT(%p) %s\n", &_c, #_c);\ + Z(pthread_cond_wait(&_c, &_m));\ + } + +#ifdef NOTYET +static int _yarndebug = -1; +#define PEEK(_bolt) yarnPeekLock(_bolt) +#define POSSESS(_bolt) \ + { yarnPossess(_bolt); \ + if (_yarndebug) fprintf(stderr, "*** POSSESS(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\ + } +#define RELEASE(_bolt) \ + { if (_yarndebug) fprintf(stderr, "*** RELEASE(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\ + yarnRelease(_bolt); \ + } +#define TWIST(_bolt, _op, _val) \ + { if (_yarndebug) fprintf(stderr, "*** TWIST(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\ + yarnTwist(_bolt, _op, _val); \ + } +#define WAITFOR(_bolt, _op, _val) \ + { if (_yarndebug) fprintf(stderr, "*** WAITFOR(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\ + yarnWaitFor(_bolt, _op, _val); \ + } +#endif #ifdef __cplusplus GENfree(rpmmsq) @@ -53,27 +85,45 @@ /* =============================================================== */ #if defined(WITH_MQ) -static inline -mqd_t __mq_open(const char *name, int oflag, mode_t mode, +static +mqd_t Mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) { rpmmsq msq = NULL; mqd_t mqdes = mq_open(name, oflag, mode, attr); -SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", name, oflag, mode, attr, mqdes)); +SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, mode, attr, mqdes)); return mqdes; } -static inline -int __mq_notify(mqd_t mqdes, const struct sigevent *sevp) +static +int Mq_getattr(mqd_t mqdes, struct mq_attr *attr) +{ + rpmmsq msq = NULL; + int rc = mq_getattr(mqdes, attr); +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc)); + return rc; +} + +static +int Mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr) +{ + rpmmsq msq = NULL; + int rc = mq_setattr(mqdes, newattr, oldattr); +SPEW((stderr, "<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, oldattr, rc)); + return rc; +} + +static +int Mq_notify(mqd_t mqdes, const struct sigevent *sevp) { rpmmsq msq = NULL; int rc = mq_notify(mqdes, sevp); -SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", "mq_notify", mqdes, sevp, rc)); +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc)); return rc; } -static inline -int __mq_send(mqd_t mqdes, const char *msg_ptr, +static +int Mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio) { rpmmsq msq = NULL; @@ -93,12 +143,39 @@ break; } } -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", "mq_send", mqdes, buf, (unsigned long)count, prio, rc, nc, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf)); return rc; } -static inline -ssize_t __mq_receive(mqd_t mqdes, char *msg_ptr, +static +int Mq_timedsend(mqd_t mqdes, const char *msg_ptr, + size_t msg_len, unsigned int msg_prio, + const struct timespec *abs_timeout) +{ + rpmmsq msq = NULL; + int ncmax = 32; + int nc = 0; + const char * buf = msg_ptr; + size_t count = msg_len; + unsigned int prio = msg_prio; + int rc; + + rc = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, abs_timeout); + if (rc == 0) { + /* Ensure printable. */ + for (nc = 0; nc < ncmax; nc++) { + if (nc < (int)count && isprint(buf[nc])) + continue; + break; + } + } +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, buf)); + return rc; +} + + +static +ssize_t Mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio) { rpmmsq msq = NULL; @@ -119,25 +196,52 @@ break; } } -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, buf)); return rc; } -static inline -int __mq_close(mqd_t mqdes) +static +ssize_t Mq_timedreceive(mqd_t mqdes, char *msg_ptr, + size_t msg_len, unsigned int *msg_prio, + const struct timespec *abs_timeout) +{ + rpmmsq msq = NULL; + int ncmax = 32; + int nc = 0; + char * buf = msg_ptr; + size_t count = msg_len; + unsigned int *priop = msg_prio; + unsigned int prio = *msg_prio; + ssize_t rc; + + rc = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, abs_timeout); + if (rc >= 0) { + /* Ensure printable. */ + for (nc = 0; nc < ncmax; nc++) { + if (nc < rc && isprint(buf[nc])) + continue; + break; + } + } +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, prio, nc, buf)); + return rc; +} + +static +int Mq_close(mqd_t mqdes) { rpmmsq msq = NULL; int rc = mq_close(mqdes); -SPEW((stderr, "<--\t%s(0x%x) rc %d\n", "mq_close", mqdes, rc)); +SPEW((stderr, "<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc)); return rc; } -static inline -int __mq_unlink(const char *name) +static +int Mq_unlink(const char *name) { rpmmsq msq = NULL; int rc = mq_unlink(name); -SPEW((stderr, "<--\t%s(%s) rc %d\n", "mq_unlink", name, rc)); +SPEW((stderr, "<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc)); return rc; } @@ -145,17 +249,17 @@ /* =============================================================== */ #if defined(WITH_MSQ) -static inline -int __msgget(key_t key, int msgflg) +static +int Msgget(key_t key, int msgflg) { rpmmsq msq = NULL; int rc = msgget(key, msgflg); -SPEW((stderr, "<--\t%s(0x%x,0%o) rc %d\n", "msgget", key, msgflg, rc)); +SPEW((stderr, "<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc)); return rc; } -static inline -ssize_t __msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg) +static +ssize_t Msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg) { rpmmsq msq = NULL; int ncmax = 32; @@ -173,12 +277,12 @@ break; } } -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf)); return rc; } -static inline -int __msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg) +static +int Msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg) { rpmmsq msq = NULL; int ncmax = 32; @@ -196,16 +300,16 @@ break; } } -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", "msqsnd", msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf)); return rc; } -static inline -int __msgctl(int msqid, int cmd, struct msqid_ds *buf) +static +int Msgctl(int msqid, int cmd, struct msqid_ds *buf) { rpmmsq msq = NULL; int rc = msgctl(msqid, cmd, buf); -SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", "msqctl", msqid, cmd, buf, rc)); +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc)); return rc; } #endif /* WITH_MSQ */ @@ -245,6 +349,15 @@ { rpmmsq msq = (rpmmsq) _msq; if (msq) { + + msq->tid = 0; +SPEW((stderr, "%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->i, msq->imax, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain)); + Z(pthread_mutex_destroy(&msq->m)); + Z(pthread_cond_destroy(&msq->e)); + Z(pthread_cond_destroy(&msq->f)); + msq->i = 0; + msq->imax = 0; + msq->flags = 0; msq->qname = _free(msq->qname); msq->fmode = _free(msq->fmode); @@ -254,9 +367,7 @@ msq->omode = 0; msq->key = 0; msq->mtype = 0; - msq->tid = 0; - DESTROY(msq->c, msq->m); - INIT(msq->c, msq->m); + msq->ntimeout = 0; msq->nagain = 0; msq->nsent = 0; @@ -408,15 +519,15 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - (void) __mq_unlink(msq->qname); + (void) Mq_unlink(msq->qname); #endif /* WITH_MQ */ } break; case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - int qid = __msgget(msq->key, (omode & 0777)); + int qid = Msgget(msq->key, (omode & 0777)); if (qid != -1) { - (void) __msgctl(qid, IPC_RMID, NULL); + (void) Msgctl(qid, IPC_RMID, NULL); } #endif /* WITH_MSQ */ } break; @@ -440,11 +551,16 @@ .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */ .mq_curmsgs = 0, }, *attrs = &_attrs; - msq->qid = __mq_open(msq->qname, msq->oflags, msq->omode, attrs); + msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, attrs); - /* (loopback mode) Configure the detached reader. */ + /* Initialize the monitor. */ msq->tid = 0; - INIT(msq->c, msq->m); + Z(pthread_mutex_init(&msq->m, NULL)); + Z(pthread_cond_init(&msq->e, NULL)); + Z(pthread_cond_init(&msq->f, NULL)); + msq->i = 0; + msq->imax = 10; + if (msq->qid != -1 && MSQF_ISSET(LOOP)) { pthread_attr_t attr; int xx; @@ -464,7 +580,7 @@ case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - msq->qid = __msgget(msq->key, omode); + msq->qid = Msgget(msq->key, omode); #endif /* WITH_MSQ */ } break; default: @@ -491,7 +607,15 @@ unsigned int prio = 0; - rc = __mq_receive(msq->qid, buf, count, &prio); + /* Consumer in monitor. */ + LOCK(msq->m); + while (msq->i == 0) + WAIT(msq->e, msq->m); + rc = Mq_receive(msq->qid, buf, count, &prio); + if (msq->i-- == msq->imax) + SIGNAL(msq->f); + UNLOCK(msq->m); + if (rc >= 0) { msq->nrecv++; if (priop) @@ -508,9 +632,8 @@ long msgtyp = msq->mtype; int msgflg = 0; /* IPC_NOWAIT, MSG_COPY, MSG_EXCEPT, MSG_NOERROR */ - rc = __msgrcv(msq->qid, msgp, msgsz, msgtyp, msgflg); + rc = Msgrcv(msq->qid, msgp, msgsz, msgtyp, msgflg); if (rc >= 0) { - msq->nrecv++; if (priop) *priop = msgp->mtype; if (rc > 0) @@ -537,19 +660,26 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - rc = __mq_send(msq->qid, buf, count, prio); - if (rc == 0) { - msq->nsent++; - /* (loopback mode) Wait for rpmmsqReader to start up. */ - if (MSQF_ISSET(LOOP)) { - LOCK(msq->m); - while (msq->nsent > msq->nrecv) - WAIT(msq->c, msq->m); - UNLOCK(msq->m); - } + /* Producer in monitor. */ + LOCK(msq->m); + while (msq->i == msq->imax) + WAIT(msq->f, msq->m); + rc = Mq_send(msq->qid, buf, count, prio); + if (msq->i++ == 0) + SIGNAL(msq->e); + UNLOCK(msq->m); + + /* (loopback mode) Wait for the reader thread. */ + if (msq->nsent == 0 && MSQF_ISSET(LOOP)) { + /* Yield the CPU. */ + struct timespec ts = { 0, 100*1000 }; + Z(nanosleep(&ts, NULL)); } + if (rc == 0) + msq->nsent++; + if (rc == 0) /* XXX remap to write(2) return */ rc = count; #endif /* WITH_MQ */ @@ -564,7 +694,7 @@ msgp->mtype = msq->mtype; if (count) memcpy(msgp->mtext, buf, count); - rc = __msgsnd(msq->qid, msgp, msgsz, msgflg); + rc = Msgsnd(msq->qid, msgp, msgsz, msgflg); if (rc == 0) { msq->nsent++; } @@ -582,11 +712,18 @@ return rc; } +static char *SEEK_[] = { "SET", "CUR", "END", "???" }; + static int rpmmsqSeek(rpmmsq msq, _libio_pos_t pos, int whence) { +#ifdef USE_COOKIE_SEEK_POINTER + _IO_off64_t p = *pos; +#else + off_t p = pos; +#endif int rc = -2; /* assume failure */ -SPEW((stderr, "<== %s(%p,%p,%d) qid %d rc %d\n", __FUNCTION__, msq, pos, whence, (msq ? msq->qid : -1), rc)); +SPEW((stderr, "<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc)); return rc; } @@ -601,44 +738,40 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) + if (msq->qid != -1) { + if (MSQF_ISSET(INFO) || _rpmmsq_debug) + rpmmsqDump(NULL, msq, NULL); - /* (loopback mode) Terminate rpmmsqReader. */ - if (msq->qid != -1 && MSQF_ISSET(LOOP)) { - int prio = RPMMSQ_PRIO_EXIT; - rc = __mq_send(msq->qid, __FUNCTION__, sizeof(__FUNCTION__)-1, prio); - msq->nsent++; + /* Producer in monitor. */ LOCK(msq->m); - while (msq->nsent > msq->nrecv) - WAIT(msq->c, msq->m); - UNLOCK(msq->m); + while (msq->i == msq->imax) + WAIT(msq->f, msq->m); /* Turn off the sigev detached thread. */ - rc = rpmmsqNotify(msq, NULL); - } - -if (msq->qid != -1) -SPEW((stderr, "%s: sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain)); + if (MSQF_ISSET(LOOP)) + rc = rpmmsqNotify(msq, NULL); + rc = Mq_close(msq->qid); + msq->qid = -1; + if (msq->i++ == 0) + SIGNAL(msq->e); + UNLOCK(msq->m); - if (msq->qid != -1) { - if (MSQF_ISSET(INFO) || _rpmmsq_debug) - rpmmsqDump(NULL, msq, NULL); - rc = __mq_close(msq->qid); } + if (!rc && (delete || MSQF_ISSET(DELETE))) - rc = __mq_unlink(msq->qname); /* XXX rpmmsqReset? */ - rpmmsqFini(msq); - rc = 0; /* XXX */ + rc = Mq_unlink(msq->qname); /* XXX rpmmsqReset? */ #endif /* WITH_MQ */ } break; case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - if (_rpmmsq_debug || MSQF_ISSET(INFO)) - rpmmsqDump(__FUNCTION__, msq, NULL); + if (msq->qid != -1) { + if (MSQF_ISSET(INFO) || _rpmmsq_debug) + rpmmsqDump(__FUNCTION__, msq, NULL); + } + msq->qid = -1; if (delete || MSQF_ISSET(DELETE)) - rc = __msgctl(msq->qid, IPC_RMID, NULL); - rpmmsqFini(msq); - rc = 0; /* XXX */ + rc = Msgctl(msq->qid, IPC_RMID, NULL); #endif /* WITH_MSQ */ } break; default: @@ -714,7 +847,7 @@ case IPC_STAT: case IPC_SET: case IPC_RMID: - rc = __msgctl(msq->qid, cmd, buf); + rc = Msgctl(msq->qid, cmd, buf); break; } #endif /* WITH_MSQ */ @@ -737,7 +870,7 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - rc = __mq_notify(msq->qid, _sevp); + rc = Mq_notify(msq->qid, _sevp); #endif /* WITH_MQ */ } break; @@ -769,63 +902,52 @@ rpmmsq msq = (rpmmsq) sv.sival_ptr; assert(msq = rpmmsqLink(msq)); + msq->tid = pthread_self(); + SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, msq->tid)); char b[BUFSIZ]; size_t nb = sizeof(b); int rc; - while (1) { - int ncmax = 32; /* max chars to print. */ - int nc = 0; /* chars to print */ + while (msq->qid != -1) { unsigned int prio = 0; int rc; - while (1) { - - /* Yield the CPU. */ - if (msq->oflags & O_NONBLOCK) { - struct timespec ts = { 0, 100 }; - Z(nanosleep(&ts, NULL)); - } - - /* Loop on errors. */ - rc = __mq_receive(msq->qid, b, nb, &prio); - if (rc < 0) - switch (errno) { - case ETIMEDOUT: /* XXX mq_timedreceive() */ - msq->ntimeout++; - continue; - case EAGAIN: /* XXX O_NONBLOCK */ - msq->nagain++; - continue; + /* Consumer in monitor. */ + LOCK(msq->m); + while (msq->i == 0) + WAIT(msq->e, msq->m); + errno = 0; + do { + struct timespec ts = { 0, 1000 }; + rc = Mq_receive(msq->qid, b, nb, &prio); + if (rc < 0) { + switch (errno) { + case ETIMEDOUT: /* XXX Mq_timedreceive() */ + msq->ntimeout++; + Z(nanosleep(&ts, NULL)); + continue; + case EAGAIN: /* XXX O_NONBLOCK */ + msq->nagain++; + Z(nanosleep(&ts, NULL)); + continue; + } } break; - } + } while (rc < 0); + if (msq->i-- == msq->imax) + SIGNAL(msq->f); + UNLOCK(msq->m); if (rc >= 0) { msq->nrecv++; - /* Ensure printable. */ - for (nc = 0; nc < ncmax; nc++) { - if (nc < rc && isprint(b[nc])) - continue; - break; - } + /* Deliver the mssage through a callback. */ + int xx = rpmmsqDeliver(msq, b, rc, prio); + (void)xx; } - - LOCK(msq->m); - SIGNAL(msq->c); - UNLOCK(msq->m); - - /* Exit immediately on highest priority message. */ - if (prio == RPMMSQ_PRIO_EXIT) - break; - - /* Deliver the mssage through a callback. */ - int xx = rpmmsqDeliver(msq, b, nb, prio); - (void)xx; } -SPEW((stderr, "<== %s(%p) rc %d\n", __FUNCTION__, msq, rc)); +SPEW((stderr, "<== %s(%p) qid %d tid %ld rc %d\n", __FUNCTION__, msq, msq->qid, msq->tid, rc)); msq->tid = 0; msq = rpmmsqFree(msq); @@ -978,8 +1100,9 @@ } /* =============================================================== */ +static RPM_GNUC_PURE -static inline void * msqFileno(FD_t fd) +void * msqFileno(FD_t fd) { void * rc = NULL; int i; @@ -996,7 +1119,8 @@ return rc; } -static FD_t msqOpen(const char * path, const char * fmode) +static +FD_t msqOpen(const char * path, const char * fmode) { FD_t fd; rpmmsq msq = rpmmsqOpen(path, fmode); @@ -1016,7 +1140,8 @@ return fd; } -static FD_t msqFdopen(void * cookie, const char * fmode) +static +FD_t msqFdopen(void * cookie, const char * fmode) { FD_t fd = c2f(cookie); int fdno = fdFileno(fd); @@ -1056,15 +1181,17 @@ return fdLink(fd, __FUNCTION__); } +static RPM_GNUC_PURE -static int msqFlush(void * cookie) +int msqFlush(void * cookie) { FD_t fd = c2f(cookie); rpmmsq msq = (rpmmsq) msqFileno(fd); return rpmmsqFlush(msq); } -static ssize_t msqRead(void * cookie, char * buf, size_t count) +static +ssize_t msqRead(void * cookie, char * buf, size_t count) { FD_t fd = c2f(cookie); rpmmsq msq = (rpmmsq) msqFileno(fd); @@ -1099,18 +1226,25 @@ return rc; } +static RPM_GNUC_PURE -static int msqSeek(void * cookie, _libio_pos_t pos, int whence) +int msqSeek(void * cookie, _libio_pos_t pos, int whence) { FD_t fd = c2f(cookie); rpmmsq msq = (rpmmsq) msqFileno(fd); +#ifdef USE_COOKIE_SEEK_POINTER + _IO_off64_t p = *pos; +#else + off_t p = pos; +#endif +SPEW((stderr, "==> %s(%p, %ld, SEEK_%s)\n", __FUNCTION__, fd, p, SEEK_[whence&0x3])); assert(msq != NULL); - MSQONLY(fd); return rpmmsqSeek(msq, pos, whence); } -static int msqClose(void * cookie) +static +int msqClose(void * cookie) { FD_t fd = c2f(cookie); rpmmsq msq = (rpmmsq) msqFileno(fd); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmio.c ============================================================================ $ cvs diff -u -r1.230.2.50 -r1.230.2.51 rpmio.c --- rpm/rpmio/rpmio.c 25 May 2017 20:02:13 -0000 1.230.2.50 +++ rpm/rpmio/rpmio.c 26 May 2017 13:22:17 -0000 1.230.2.51 @@ -269,6 +269,8 @@ return fd; } +static char *SEEK_[] = { "SET", "CUR", "END", "???" }; + static inline int fdSeekNot(void * cookie, _libio_pos_t pos, int whence) { @@ -498,7 +500,7 @@ rc = lseek(fdFileno(fd), p, whence); fdstat_exit(fd, opx, rc); -DBGIO(fd, (stderr, "<--\tfdSeek(%p,%ld,%d) rc %lx %s\n", cookie, (long)p, whence, (unsigned long)rc, fdbg(fd))); +DBGIO(fd, (stderr, "<--\tfdSeek(%p, %ld, SEEK_%s) rc %lx %s\n", cookie, (long)p, SEEK_[whence&0x3], (unsigned long)rc, fdbg(fd))); return (int) rc; } @@ -2438,7 +2440,7 @@ int rc; FDSANE(fd); -DBGIO(fd, (stderr, "==> Fread(%p,%u,%u,%p) %s\n", buf, (unsigned)size, (unsigned)nmemb, (fd ? fd : NULL), fdbg(fd))); +DBGIO(fd, (stderr, "==> Fread(%p,%u,%u,%p)\n", buf, (unsigned)size, (unsigned)nmemb, (fd ? fd : NULL))); if (fdGetIo(fd) == fpio) { rc = (int) fread(buf, size, nmemb, fdGetFILE(fd)); @@ -2457,7 +2459,7 @@ int rc; FDSANE(fd); -DBGIO(fd, (stderr, "==> Fwrite(%p,%u,%u,%p) %s\n", buf, (unsigned)size, (unsigned)nmemb, (fd ? fd : NULL), fdbg(fd))); +DBGIO(fd, (stderr, "==> Fwrite(%p,%u,%u,%p)\n", buf, (unsigned)size, (unsigned)nmemb, (fd ? fd : NULL))); if (fdGetIo(fd) == fpio) { rc = (int) fwrite(buf, size, nmemb, fdGetFILE(fd)); @@ -2483,7 +2485,7 @@ long int rc; FDSANE(fd); -DBGIO(fd, (stderr, "==> Fseek(%p,%ld,%d) %s\n", fd, (long)offset, whence, fdbg(fd))); +DBGIO(fd, (stderr, "==> Fseek(%p, %ld, SEEK_%s) %s\n", fd, (long)offset, SEEK_[whence&0x3], fdbg(fd))); if (fdGetIo(fd) == fpio) return fseek(fdGetFILE(fd), (long)offset, whence); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmsq.h --- rpm/rpmio/rpmmsq.h 25 May 2017 20:02:13 -0000 1.1.2.9 +++ rpm/rpmio/rpmmsq.h 26 May 2017 13:22:17 -0000 1.1.2.10 @@ -50,8 +50,13 @@ key_t key; /*!< SysV: ftok(3) or IPC_PRIVATE. */ long mtype; /*!< SysV: message type. */ pthread_t tid; /*!< LOOP: sigev thread id. */ - pthread_cond_t c; - pthread_mutex_t m; + + pthread_mutex_t m; /*!< monitor mutex. */ + pthread_cond_t e; /*!< empty condition. */ + pthread_cond_t f; /*!< full condition. */ + int i; /*!< no. of inflight messages. */ + int imax; /*!< max. inflight messages. */ + int ntimeout; /*!< LOOP: no. of receive timeouts. */ int nagain; /*!< LOOP: no. of waits (O_NONBLOCK). */ int nsent; /*!< no. messages sent. */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.9 -r1.1.2.10 tmq.c --- rpm/rpmio/tmq.c 25 May 2017 20:02:13 -0000 1.1.2.9 +++ rpm/rpmio/tmq.c 26 May 2017 13:22:18 -0000 1.1.2.10 @@ -35,7 +35,16 @@ # include <mqueue.h> #endif +#define _RPMIOB_INTERNAL #include <rpmio_internal.h> + +#ifdef NOTYET +#define _RPMZQ_INTERNAL +#define _RPMZ_INTERNAL +#define _RPMZ_INTERNAL_PIGZ +#include <rpmz.h> +#endif + #include <rpmlog.h> #include <rpmmacro.h> #include <poptIO.h> @@ -338,6 +347,7 @@ (void)blen; #define F_ISSET(_f, _FLAG) ((_f) & RPMMSQ_FLAGS_##_FLAG) +fprintf(stderr, "*** %s: flags 0x%x\n", __FUNCTION__, flags); char fmode[64]; char * te = fmode; te = stpcpy(te, "w+"); @@ -400,17 +410,22 @@ FD_t fd = Fopen(qname, fmode); if (fd) { +#ifndef DISABLE + xx = Ftell(fd); strcpy(b, "foo bar baz"); blen = strlen(b); xx = Fwrite(b, 1, blen, fd); + xx = Ferror(fd); + xx = Fflush(fd); + xx = Ftell(fd); + memset(b, 0, blen); xx = Fread(b, 1, blen, fd); + xx = Ftell(fd); xx = Fileno(fd); if (_rpmio_debug) fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx); - xx = Fflush(fd); - xx = Ferror(fd); strcpy(b, "blah blah blah"); blen = strlen(b); @@ -426,25 +441,23 @@ xx = Fread(b, 1, blen, fd); } -#ifdef DISABLE #ifdef NOTYET xx = Feof(fd); xx = Clearerr(fd); #endif - xx = Fseek(fd, 0, SEEK_CUR); - xx = Ftell(fd); Rewind(fd); fpos_t pos = {}; xx = Fgetpos(fd, &pos); xx = Fsetpos(fd, &pos); +#else FD_t nfd = Fdopen(fd, fmode); if (nfd) { strcpy(b, "foo bar baz"); blen = strlen(b); xx = Fwrite(b, 1, blen, nfd); memset(b, 0, blen); - xx = Fread(b, 1, nb, nfd); + xx = Fread(b, 1, blen, nfd); xx = Fclose(nfd); } #endif /* DISABLE */ @@ -481,7 +494,8 @@ N_("Display queue info on close"), NULL }, { "delete", 'D', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE), N_("Remove queue after closing"), NULL }, - { "loopback", 'l', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP), + /* XXX -L no workie */ + { "loopback", 'L', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP), N_("Read messages after sending"), NULL }, { "count", 'c', POPT_ARG_INT, &count, 0, @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org