RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 05-Jul-2016 17:58:33 Branch: rpm-5_4 Handle: 2016070515583300 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c Log: - mqtt: sketch in "sub" vs "pub" handling. */ Summary: Revision Changes Path 1.1.2.15 +54 -29 rpm/rpmio/rpmmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 5 Jul 2016 14:49:11 -0000 1.1.2.14 +++ rpm/rpmio/rpmmqtt.c 5 Jul 2016 15:58:33 -0000 1.1.2.15 @@ -1595,20 +1595,19 @@ return rc; } -/* XXX add subtopics arg? */ -static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt) +static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt, const char ** topics) { rpmRC rc = RPMRC_FAIL; /* assume failure */ int xx; - if (mqtt->subtopics) { - int nsubs = argvCount((ARGV_t)mqtt->subtopics); + if (topics) { + int nsubs = argvCount((ARGV_t)topics); #ifdef NOTYET /* XXX MQTT segfault if done here. */ -argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL); - xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics); +argvPrint(__FUNCTION__, (ARGV_t)topics, NULL); + xx = rpmmqttSubscribeMany(mqtt, nsubs, topics); #else for (int i = 0; i < nsubs; i++) { - xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0); + xx = rpmmqttSub(mqtt, topics[i], 0); } #endif } @@ -1616,10 +1615,9 @@ return rc; } -/* XXX add topics arg? */ -static rpmRC rpmmqttInitPublish(rpmmqtt mqtt) +static rpmRC rpmmqttInitPublish(rpmmqtt mqtt, const char ** topics) { - int ntopics = argvCount((ARGV_t)mqtt->topics); + int ntopics = argvCount((ARGV_t)topics); int nmsgs = argvCount((ARGV_t)mqtt->msgs); int nifn = argvCount(mqtt->ifn); const char * ifn = NULL; @@ -1644,7 +1642,7 @@ xx = rpmmqttPub(mqtt, topic, s, ns); else for (int j = 0; j < ntopics; j++) { - topic = mqtt->topics[j]; + topic = topics[j]; xx = rpmmqttPub(mqtt, topic, s, ns); } } @@ -1662,7 +1660,7 @@ xx = rpmmqttPub(mqtt, topic, s, ns); else for (int j = 0; j < ntopics; j++) { - topic = mqtt->topics[j]; + topic = topics[j]; xx = rpmmqttPub(mqtt, topic, s, ns); } } @@ -1685,7 +1683,7 @@ xx = rpmmqttPub(mqtt, topic, s, ns); else for (int k = 0; j < ntopics; j++) { - topic = mqtt->topics[k]; + topic = topics[k]; xx = rpmmqttPub(mqtt, topic, s, ns); } } @@ -1891,25 +1889,52 @@ onDeliveryComplete)); } - /* Subscribe to initial topics (if any). */ -#ifdef DYING -argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL); - if (mqtt->subtopics) { - int nsubs = argvCount((ARGV_t)mqtt->subtopics); -#ifdef NOTYET /* XXX MQTT segfault if done here. */ - xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics); -#else - for (int i = 0; i < nsubs; i++) { - xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0); + /* XXX If any topic has a wild card, then switch to "sub" mode. */ + { + const char * topic; + int bingo = 0; + + topic = mqtt->topic; + if (strstr(topic, "+") || strstr(topic, "#")) + bingo = 1; + else { + int ntopics = argvCount((ARGV_t)mqtt->topics); + for (int i = 0; i < ntopics; i++) { + topic = mqtt->topics[i]; + if (!(strstr(topic, "+") || strstr(topic, "#"))) + continue; + bingo = 1; + break; + } + } + if (bingo) { + mqtt->_progmode = _free(mqtt->_progmode); + mqtt->_progmode = xstrdup("sub"); } -#endif } -#else - xx = rpmmqttInitSubscribe(mqtt); -#endif - /* Publish any initial messages (if any). */ - xx = rpmmqttInitPublish(mqtt); + /* Execute in either "sub" or "pub" mode. */ + if (!strcmp(mqtt->_progmode, "sub")) { + /* Subscribe to topics (if any). */ + if (mqtt->topic) + xx = rpmmqttSub(mqtt, mqtt->topic, strlen(mqtt->topic)); + xx = rpmmqttInitSubscribe(mqtt, mqtt->subtopics); + xx = rpmmqttInitSubscribe(mqtt, mqtt->topics); + + /* Wait for messages ... */ + while (mqtt->max_msg_count <= 0 + || mqtt->msg_count < mqtt->max_msg_count) + { + sleep(1); + } + } else { + /* Subscribe to initial topics (if any). */ + xx = rpmmqttInitSubscribe(mqtt, mqtt->subtopics); + + /* Publish any initial messages (if any). */ + xx = rpmmqttInitPublish(mqtt, mqtt->topics); + } + } #endif /* WITH_MQTT */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org