RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 24-May-2017 08:03:18 Branch: rpm-5_4 Handle: 2017052406031800 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio msqio.c rpmmsq.h tmq.c Log: - msqio: add condvar to get rid of racy timeouts (loopback mode only). - msqio: make peace with O_NONBLOCK. Summary: Revision Changes Path 1.3501.2.559+2 -0 rpm/CHANGES 1.1.2.9 +161 -73 rpm/rpmio/msqio.c 1.1.2.8 +4 -1 rpm/rpmio/rpmmsq.h 1.1.2.8 +41 -4 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.558 -r1.3501.2.559 CHANGES --- rpm/CHANGES 23 May 2017 19:36:46 -0000 1.3501.2.558 +++ rpm/CHANGES 24 May 2017 06:03:18 -0000 1.3501.2.559 @@ -1,4 +1,6 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: add condvar to get rid of racy timeouts (loopback mode only). + - jbj: msqio: make peace with O_NONBLOCK. - jbj: msqio: permit POSIX/SysV message queue coexistence, default is POSIX. - jbj: msqio: set oflags/fdno on fd. - jbj: msqio: loopback mode refactoring. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.8 -r1.1.2.9 msqio.c --- rpm/rpmio/msqio.c 23 May 2017 19:36:46 -0000 1.1.2.8 +++ rpm/rpmio/msqio.c 24 May 2017 06:03:18 -0000 1.1.2.9 @@ -32,10 +32,20 @@ #define MSQF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG) #define SPEW(_list) \ - if (MSQF_ISSET(DEBUG) || _rpmmsq_debug || _rpmio_debug) fprintf _list + if ((msq && MSQF_ISSET(DEBUG)) || _rpmmsq_debug || _rpmio_debug) fprintf _list #define MSQONLY(fd) assert(fdGetIo(fd) == msqio) +#define Z(_rc) assert((_rc) == 0) +#define LOCK(_m) { Z(pthread_mutex_lock(&_m)); } +#define UNLOCK(_m) { Z(pthread_mutex_unlock(&_m)); } +#define SIGNAL(_c) { Z(pthread_cond_signal(&_c)); } +#define WAIT(_c,_m) { Z(pthread_cond_wait(&_c, &_m)); } +#define DESTROY(_c,_m) \ + { Z(pthread_cond_destroy(&_c)); Z(pthread_mutex_destroy(&_m)); } +#define INIT(_c,_m) \ + { Z(pthread_cond_init(&_c, NULL)); Z(pthread_mutex_init(&_m, NULL)); } + #ifdef __cplusplus GENfree(rpmmsq) #endif /* __cplusplus */ @@ -46,13 +56,16 @@ if (msq) { msq->flags = 0; msq->qname = _free(msq->qname); - msq->qid = 0; + msq->qid = -1; msq->oflags = 0; msq->omode = 0; msq->key = 0; msq->mtype = 0; msq->tid = 0; + DESTROY(msq->c, msq->m); + INIT(msq->c, msq->m); msq->ntimeout = 0; + msq->nagain = 0; msq->nwait = 0; msq->nsent = 0; msq->nrecv = 0; @@ -122,6 +135,28 @@ omode |= IPC_EXCL; continue; break; + case 'D': /* XXX collides with rpmio O_DIRECT */ + flags |= RPMMSQ_FLAGS_DELETE; + continue; + break; + case 'R': /* XXX */ + flags |= RPMMSQ_FLAGS_RESET; + continue; + break; + case 'I': /* XXX collides with rpmio fallocate(2) */ + flags |= RPMMSQ_FLAGS_INFO; + continue; + break; + case 'P': /* XXX collides with rpmio syncfs(2) */ + flags &= ~RPMMSQ_TYPE_MASK; + flags |= RPMMSQ_TYPE_POSIX; + continue; + break; + case 'S': /* XXX collides with rpmio sync(2) */ + flags &= ~RPMMSQ_TYPE_MASK; + flags |= RPMMSQ_TYPE_SYSV; + continue; + break; case '?': /* XXX loopback mode */ flags |= RPMMSQ_FLAGS_DEBUG; continue; @@ -174,7 +209,7 @@ { #if defined(WITH_MQ) int xx = mq_unlink(msq->qname); -SPEW((stderr, "<-- %s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, xx, __FUNCTION__)); +SPEW((stderr, "<--\t%s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, xx, __FUNCTION__)); #endif /* WITH_MQ */ } break; case RPMMSQ_TYPE_SYSV: @@ -184,7 +219,7 @@ if (qid != -1) { int cmd = IPC_RMID; int xx = msgctl(qid, cmd, NULL); -SPEW((stderr, "<-- %s(0x%x,0%o,%p) rc %d (%s)\n", "msqctl", qid, cmd, NULL, xx, __FUNCTION__)); +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d (%s)\n", "msqctl", qid, cmd, NULL, xx, __FUNCTION__)); } #endif /* WITH_MSQ */ } break; @@ -204,9 +239,11 @@ .mq_curmsgs = 0, }, *attrs = &_attrs; msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs); -SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, msq->oflags, msq->omode, attrs, msq->qid)); +SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, msq->oflags, msq->omode, attrs, msq->qid)); /* (loopback mode) Configure the detached reader. */ + msq->tid = 0; + INIT(msq->c, msq->m); if (msq->qid != -1 && MSQF_ISSET(LOOP)) { pthread_attr_t attr; int xx; @@ -227,14 +264,14 @@ { #if defined(WITH_MSQ) msq->qid = msgget(msq->key, omode); -SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, msq->qid)); +SPEW((stderr, "<--\t%s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, msq->qid)); #endif /* WITH_MSQ */ } break; default: break; } -SPEW((stderr, "<-- %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, fdno, flags, (msq ? msq->qid : -1))); +SPEW((stderr, "<== %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, fdno, flags, (msq ? msq->qid : -1))); return rpmmsqLink(msq); } @@ -243,6 +280,7 @@ { ssize_t rc = -1; /* assume failure */ + if (msq) /* XXX WTF? */ switch (msq->flags & RPMMSQ_TYPE_MASK) { case RPMMSQ_TYPE_DEFAULT: case RPMMSQ_TYPE_POSIX: @@ -257,7 +295,7 @@ nb = rc; msq->nrecv++; } -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, buf, (unsigned long)count, priop, (long)rc, prio, nb, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, buf, (unsigned long)count, priop, (long)rc, prio, nb, buf)); if (priop) *priop = prio; #endif /* WITH_MQ */ @@ -279,7 +317,7 @@ nb = rc; msq->nrecv++; } -SPEW((stderr, "<-- %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); #endif /* WITH_MSQ */ } break; @@ -287,6 +325,7 @@ break; } +SPEW((stderr, "<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %u\n", __FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0))); return rc; } @@ -294,28 +333,29 @@ { ssize_t rc = -1; /* assume failure */ + if (msq) /* XXX WTF? */ switch (msq->flags & RPMMSQ_TYPE_MASK) { case RPMMSQ_TYPE_DEFAULT: case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) + rc = mq_send(msq->qid, buf, count, prio); - int nb = (rc == 0 ? count : 0); -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%u) rc %ld\t\"%.*s\"\n", "mq_send", msq->qid, buf, (unsigned long)count, prio, (long)rc, nb, buf)); - if (rc == 0) { /* XXX remap to write(2) return */ - rc = count; - msq->nsent++; - } + msq->nsent++; /* (loopback mode) Wait for rpmmsqReader to start up. */ - sched_yield(); /* Give detached threads a chance to run. */ - if (MSQF_ISSET(LOOP) && prio == 0 && msq->tid == 0) { - struct timespec req = { 0, 1000*1000 }; - while (msq->tid == 0) { - nanosleep(&req, NULL); - msq->nwait++; - } + if (MSQF_ISSET(LOOP)) { + LOCK(msq->m); + while (msq->nsent > msq->nrecv) + WAIT(msq->c, msq->m); + UNLOCK(msq->m); } + + int nb = (rc == 0 ? count : 0); +SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %ld\t\t\t\"%.*s\"\n", "mq_send", msq->qid, buf, (unsigned long)count, prio, (long)rc, nb, buf)); + + if (rc == 0) /* XXX remap to write(2) return */ + rc = count; #endif /* WITH_MQ */ } break; case RPMMSQ_TYPE_SYSV: @@ -330,7 +370,7 @@ memcpy(msgp->mtext, buf, count); rc = msgsnd(msq->qid, msgp, msgsz, msgflg); int nb = (rc == 0 ? count : 0); -SPEW((stderr, "<-- %s(0x%x,%p,%lu,%d) rc %ld\t\"%.*s\"\n", "msqsnd", msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf)); +SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %ld\t\t\"%.*s\"\n", "msqsnd", msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); if (rc == 0) { /* XXX remap to write(2) return */ rc = count; @@ -342,13 +382,15 @@ break; } +SPEW((stderr, "<== %s(%p,%p[%lu],%u) qid %d rc %ld\n", __FUNCTION__, msq, buf, count, prio, (msq ? msq->qid : -1), rc)); return rc; } static int rpmmsqSeek(rpmmsq msq, _libio_pos_t pos, int whence) { - ssize_t rc = -2; /* assume failure */ + int rc = -2; /* assume failure */ +SPEW((stderr, "<== %s(%p,%p,%d) qid %d rc %d\n", __FUNCTION__, msq, pos, whence, (msq ? msq->qid : -1), rc)); return rc; } @@ -356,35 +398,41 @@ { int rc = -2; /* assume failure */ + if (msq) /* XXX WTF? */ switch (msq->flags & RPMMSQ_TYPE_MASK) { case RPMMSQ_TYPE_DEFAULT: case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) + int prio = RPMMSQ_PRIO_EXIT; + + rc = mq_send(msq->qid, __FUNCTION__, sizeof(__FUNCTION__)-1, prio); + msq->nsent++; + + /* (loopback mode) Terminate rpmmsqReader. */ if (MSQF_ISSET(LOOP)) { - /* (loopback mode) Terminate rpmmsqReader. */ - if (msq->tid) { - rc = rpmmsqSend(msq, __FUNCTION__, sizeof(__FUNCTION__)-1, RPMMSQ_PRIO_EXIT); - struct timespec req = { 0, 1000*1000 }; - do { - nanosleep(&req, NULL); - msq->nwait++; - } while (msq->tid != 0); - } - /* (loopback mode) Turn off the sigev detached thread. */ - rc = rpmmsqNotify(msq, NULL); + LOCK(msq->m); + while (msq->nsent > msq->nrecv) + WAIT(msq->c, msq->m); + UNLOCK(msq->m); } -SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d\n", __FUNCTION__, msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout)); +SPEW((stderr, "<--\t%s(0x%x,\"%s\",%u) rc %ld\n", "mq_send", msq->qid, __FUNCTION__, prio, (long)rc)); - if (_rpmmsq_debug) + /* (loopback mode) Turn off the sigev detached thread. */ + rc = rpmmsqNotify(msq, NULL); + +SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d again %d\n", __FUNCTION__, msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout, msq->nagain)); + + if (_rpmmsq_debug || MSQF_ISSET(INFO)) rpmmsqDump(__FUNCTION__, msq, NULL); rc = mq_close(msq->qid); -SPEW((stderr, "<-- %s(0x%x) rc %d\n", "mq_close", msq->qid, rc)); +SPEW((stderr, "<--\t%s(0x%x) rc %d\n", "mq_close", msq->qid, rc)); if (!rc && (delete || MSQF_ISSET(DELETE))) { rc = mq_unlink(msq->qname); /* XXX rpmmsqReset? */ -SPEW((stderr, "<-- %s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, rc, __FUNCTION__)); +SPEW((stderr, "<--\t%s(%s) rc %d (%s)\n", "mq_unlink", msq->qname, rc, __FUNCTION__)); } + rpmmsqFini(msq); rc = 0; /* XXX */ #endif /* WITH_MQ */ } break; @@ -392,37 +440,49 @@ case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - if (_rpmmsq_debug) + if (_rpmmsq_debug || MSQF_ISSET(INFO)) rpmmsqDump(__FUNCTION__, msq, NULL); if (delete || MSQF_ISSET(DELETE)) { int cmd = IPC_RMID; rc = msgctl(msq->qid, cmd, NULL); -SPEW((stderr, "<-- %s(0x%x,0%o,%p) rc %d (%s)\n", "msqctl", msq->qid, cmd, NULL, rc, __FUNCTION__)); +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d (%s)\n", "msqctl", msq->qid, cmd, NULL, rc, __FUNCTION__)); } + rpmmsqFini(msq); rc = 0; /* XXX */ #endif /* WITH_MSQ */ } break; default: break; } +SPEW((stderr, "<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1), rc)); return rc; } static rpmmsq rpmmsqOpen(const char * path, const char * fmode) { - return rpmmsqNew(path, fmode, -1, 0); + rpmmsq msq = NULL; +fprintf(stderr, "==> %s(%s,%s)\n", __FUNCTION__, path, fmode); + msq = rpmmsqNew(path, fmode, -1, 0); +SPEW((stderr, "<== %s(%s,%s) qid %d\n", __FUNCTION__, path, fmode, (msq ? msq->qid : -1))); + return msq; } static rpmmsq rpmmsqFdopen(void * _fdno, const char * fmode) { + rpmmsq msq = NULL; int fdno = (int)((long)_fdno); /* XXX hack */ - return rpmmsqNew(NULL, fmode, fdno, 0); +fprintf(stderr, "==> %s(%d,%s)\n", __FUNCTION__, fdno, fmode); + msq = rpmmsqNew(NULL, fmode, fdno, 0); +SPEW((stderr, "<== %s(%d,%s) qid %d\n", __FUNCTION__, fdno, fmode, (msq ? msq->qid : -1))); + return msq; } static int rpmmsqFlush(void * _msq) { - ssize_t rc = -2; /* assume failure */ + int rc = -2; /* assume failure */ + rpmmsq msq = (rpmmsq) _msq; /* XXX TODO: drain the message queue? */ +SPEW((stderr, "<== %s(%p) qid %d rc %d\n", __FUNCTION__, _msq, (msq ? msq->qid : -1), rc)); return rc; } @@ -464,13 +524,14 @@ rc = msgctl(msq->qid, cmd, buf); break; } -SPEW((stderr, "<-- %s(0x%x,%d,%p) rc %d\n", "msqctl", (unsigned)msq->qid, cmd, buf, rc)); +SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", "msqctl", (unsigned)msq->qid, cmd, buf, rc)); #endif /* WITH_MSQ */ } break; default: break; } +SPEW((stderr, "<== %s(%p,%d,%p) qid %d rc %d\n", __FUNCTION__, msq, cmd, buf, (msq ? msq->qid : -1), rc)); return rc; } @@ -485,7 +546,7 @@ { #if defined(WITH_MQ) rc = mq_notify(msq->qid, _sevp); -SPEW((stderr, "<-- %s(0x%x,%p) rc %d\n", "mq_notify", msq->qid, _sevp, rc)); +SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", "mq_notify", msq->qid, _sevp, rc)); #endif /* WITH_MQ */ } break; @@ -496,6 +557,7 @@ break; } +SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msq, _sevp, (msq ? msq->qid : -1), rc)); return rc; } @@ -504,37 +566,62 @@ #if defined(WITH_MQ) rpmmsq msq = (rpmmsq) sv.sival_ptr; -assert(msq); - msq = rpmmsqLink(msq); - msq->tid = pthread_self(); +assert(msq = rpmmsqLink(msq)); SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, msq->tid)); char b[BUFSIZ]; size_t nb = sizeof(b); - struct timespec ts = { 0, 0 }; int rc; while (1) { - int lrto_secs = 2; /* XXX loopback retry timeout seconds */ unsigned prio = 0; - ts.tv_sec = time(NULL) + lrto_secs; - int rc = mq_timedreceive(msq->qid, b, nb, &prio, &ts); - int nr = (rc >= 0 ? rc : 0); -SPEW((stderr, "<-- %s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", "mq_timedreceive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b)); - if (rc < 0 && errno == ETIMEDOUT) { - msq->ntimeout; - continue; + int rc; + + while (1) { + + /* Yield the CPU. */ + if (msq->oflags & O_NONBLOCK) { + struct timespec ts = { 0, 100 }; + Z(nanosleep(&ts, NULL)); + } + + rc = mq_receive(msq->qid, b, nb, &prio); + if (rc < 0) + switch (errno) { + case ETIMEDOUT: + msq->ntimeout++; + continue; + case EAGAIN: /* XXX O_NONBLOCK */ + msq->nagain++; + continue; + } + break; } + + LOCK(msq->m); msq->nrecv++; - /* Exit immediately on error or highest priority message. */ - if (rc < 0 || !strcmp(b, "XXX") || prio == RPMMSQ_PRIO_EXIT) + SIGNAL(msq->c); + UNLOCK(msq->m); + + int nr = (rc >= 0 ? rc : 0); +SPEW((stderr, "<--\t%s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b)); + + /* Exit immediately on error. */ + if (rc < 0) { + fprintf(stderr, "*** %s: error exit(%d): %m\n", __FUNCTION__, errno); + break; + } + + /* Exit on highest priority message. */ + if (prio == RPMMSQ_PRIO_EXIT) break; } + SPEW((stderr, "<== %s(%p) rc %d\n", __FUNCTION__, msq, rc)); msq->tid = 0; - msq = rpmmsqFree(msq); + #endif /* WITH_MQ */ } @@ -585,12 +672,12 @@ fprintf(fp, "\t rtime: %lu\n", ds->msg_rtime); fprintf(fp, "\t ctime: %lu\n", ds->msg_ctime); #if defined(__linux__) - fprintf(fp, "\t cbytes: %lu\n", ds->__msg_cbytes); + fprintf(fp, "\t cbytes: %lu b in queue\n", ds->__msg_cbytes); #endif - fprintf(fp, "\t qnum: %lu\n", ds->msg_qnum); + fprintf(fp, "\t qnum: %lu msgs in queue\n", ds->msg_qnum); fprintf(fp, "\t qbytes: %lu\n", ds->msg_qbytes); - fprintf(fp, "\t lspid: %d\n", ds->msg_lspid); - fprintf(fp, "\t lrpid: %d\n", ds->msg_lrpid); + fprintf(fp, "\t lspid: %d pid of last send\n", ds->msg_lspid); + fprintf(fp, "\t lrpid: %d pid of last recv\n", ds->msg_lrpid); } #endif /* WITH_MSQ */ } @@ -603,14 +690,14 @@ #if defined(WITH_MSQ) && defined(IPC_INFO) struct msginfo * mi = (struct msginfo *) _ptr;; if (mi) { - fprintf(fp, "\tmsgpool: %7d Kb\n", mi->msgpool); - fprintf(fp, "\t msgmap: %7d\n", mi->msgmap); - fprintf(fp, "\t msgmax: %7d\n", mi->msgmax); - fprintf(fp, "\t msgmnb: %7d b\n", mi->msgmnb); - fprintf(fp, "\t msgmni: %7d\n", mi->msgmni); - fprintf(fp, "\t msgssz: %7d\n", mi->msgssz); - fprintf(fp, "\t msgtql: %7d\n", mi->msgtql); - fprintf(fp, "\t msgseg: %7d\n", mi->msgseg); + fprintf(fp, "\tmsgpool: %7d Kib buffer pool\n", mi->msgpool); + fprintf(fp, "\t msgmap: %7d entries in msg map\n", mi->msgmap); + fprintf(fp, "\t msgmax: %7d b max msg size\n", mi->msgmax); + fprintf(fp, "\t msgmnb: %7d b max in queue\n", mi->msgmnb); + fprintf(fp, "\t msgmni: %7d max no. of queues\n", mi->msgmni); + fprintf(fp, "\t msgssz: %7d msg segment size\n", mi->msgssz); + fprintf(fp, "\t msgtql: %7d max msgs in system\n", mi->msgtql); + fprintf(fp, "\t msgseg: %7d max no. of segments\n", mi->msgseg); } #endif /* WITH_MSQ */ } @@ -674,6 +761,7 @@ break; } +SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msg, msq, (msq ? msq->qid : -1), rc)); return rc; } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.7 -r1.1.2.8 rpmmsq.h --- rpm/rpmio/rpmmsq.h 23 May 2017 19:36:46 -0000 1.1.2.7 +++ rpm/rpmio/rpmmsq.h 24 May 2017 06:03:18 -0000 1.1.2.8 @@ -47,8 +47,11 @@ int omode; /*!< message queue open mode. */ key_t key; /*!< SysV: ftok(3) or IPC_PRIVATE. */ long mtype; /*!< SysV: message type. */ - volatile pthread_t tid; /*!< LOOP: sigev thread id. */ + pthread_t tid; /*!< LOOP: sigev thread id. */ + pthread_cond_t c; + pthread_mutex_t m; int ntimeout; /*!< LOOP: no. of receive timeouts. */ + int nagain; /*!< LOOP: no. of waits (O_NONBLOCK). */ int nwait; /*!< LOOP: no. of waits for thread change. */ int nsent; /*!< no. messages sent. */ int nrecv; /*!< no. messages received. */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.7 -r1.1.2.8 tmq.c --- rpm/rpmio/tmq.c 23 May 2017 19:36:46 -0000 1.1.2.7 +++ rpm/rpmio/tmq.c 24 May 2017 06:03:18 -0000 1.1.2.8 @@ -366,7 +366,7 @@ int xx; const char *qname = "/rpm";; - rpmmsq msq = rpmmsqNew(qname, "w+l?", -1, flags); + rpmmsq msq = rpmmsqNew(qname, "w+S?", -1, flags); memset(b, 0, nb); if (msq) { @@ -383,19 +383,56 @@ xx = rpmmsqRecv(msq, b, nb, NULL); memset(b, 0, nb); xx = rpmmsqRecv(msq, b, nb, NULL); + + for (int i = 0; i < 1000; i++) { + snprintf(b, nb, "%d", i); + blen = strlen(b); + xx = rpmmsqSend(msq, b, blen, priority); + memset(b, 0, nb); + xx = rpmmsqRecv(msq, b, nb, NULL); + } } xx = rpmmsqClose(msq, 0); msq = rpmmsqFree(msq); - /* XXX mq_notify memory leak if loopback mode. */ - FD_t fd = Fopen(qname, "w+?.msqio"); + FD_t fd = Fopen(qname, "w+lP?.msqio"); if (fd) { strcpy(b, "foo bar baz"); blen = strlen(b); xx = Fwrite(b, 1, blen, fd); memset(b, 0, blen); xx = Fread(b, 1, nb, fd); + strcpy(b, "blah blah blah"); + blen = strlen(b); + xx = Fwrite(b, 1, blen, fd); + memset(b, 0, blen); + xx = Fread(b, 1, nb, fd); + + for (int i = 0; i < 1000; i++) { + snprintf(b, nb, "%d", i); + blen = strlen(b); + xx = Fwrite(b, 1, blen, fd); + } + +#ifdef DISABLE + xx = Fflush(fd); + xx = Fileno(fd); +if (_rpmio_debug) +fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx); + xx = Ferror(fd); +#ifdef NOTYET + xx = Feof(fd); + xx = Clearerr(fd); +#endif + xx = Fseek(fd, 0, SEEK_CUR); + xx = Ftell(fd); + Rewind(fd); + fpos_t pos = {}; + xx = Fgetpos(fd, &pos); + xx = Fsetpos(fd, &pos); +#endif + xx = Fclose(fd); } else perror("Fopen"); @@ -417,7 +454,7 @@ { "delete", 'D', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE), N_("remove queue after closing"), NULL }, - { "info", 'i', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(INFO), + { "info", 'I', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(INFO), N_("display queue info on close"), NULL }, { "loop", 'l', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(LOOP), N_("toggle mq_notify running"), NULL }, @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org