RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 05-Jul-2016 16:49:11 Branch: rpm-5_4 Handle: 2016070514491100 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: WIP. Summary: Revision Changes Path 1.1.2.14 +394 -191 rpm/rpmio/rpmmqtt.c 1.1.2.13 +23 -21 rpm/rpmio/rpmmqtt.h 1.1.2.10 +15 -7 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.13 -r1.1.2.14 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 4 Jul 2016 07:45:23 -0000 1.1.2.13 +++ rpm/rpmio/rpmmqtt.c 5 Jul 2016 14:49:11 -0000 1.1.2.14 @@ -7,9 +7,9 @@ #include <stdlib.h> #include <string.h> -#define _RPMIOB_INTERNAL -#include <rpmiotypes.h> /* for rpmiobSlurp */ -#include <rpmio.h> /* for *Pool methods */ +#define _RPMIOB_INTERNAL /* for rpmiobSlurp */ +#include <rpmiotypes.h> +#include <rpmio.h> /* for *Pool methods */ #include <rpmlog.h> #include <rpmcb.h> #include <rpmdir.h> @@ -27,7 +27,7 @@ #include "debug.h" -int _rpmmqtt_debug; +int _rpmmqtt_debug = 1; #define SPEW(_list) if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE) @@ -67,6 +67,141 @@ } #endif +/*==============================================================*/ +typedef struct key_s { + int v; + const char *n; +} KEY; + +static const char * tblName(uint32_t v, KEY * tbl, size_t ntbl) +{ + const char * n = NULL; + static char buf[32]; + size_t i; + + for (i = 0; i < ntbl; i++) { +uint32_t tbl_v = tbl[i].v & 0x3fffffff; + if (v != tbl_v) + continue; + n = tbl[i].n; + break; + } + if (n == NULL) { + (void) snprintf(buf, sizeof(buf), "0x%x", (unsigned)v); + n = buf; + } + return n; +} + +static const char * fmtBits(uint32_t flags, KEY tbl[], size_t ntbl, char *t) +{ + char pre = '<'; + char * te = t; + int i; + +flags &= 0x3fffffff; + sprintf(t, "0x%x", (unsigned)flags); + te = t; + te += strlen(te); + for (i = 0; i < 32; i++) { + uint32_t mask = (1 << i); + const char * name; + + if (!(flags & mask)) + continue; + + name = tblName(mask, tbl, ntbl); + *te++ = pre; + pre = ','; + te = stpcpy(te, name); + } + if (pre == ',') *te++ = '>'; + *te = '\0'; + return t; +} + +#define _ENTRY(_v) { MQTT_FLAGS_##_v, #_v, } +static KEY MqttFlags[] = { + _ENTRY(CLEAN), + _ENTRY(EOL), + _ENTRY(NOSTALE), + _ENTRY(LINES), + _ENTRY(EMPTY), + _ENTRY(DNSSRV), + _ENTRY(INSECURE), + _ENTRY(RETAIN), + _ENTRY(WILL_RETAIN), + _ENTRY(BUFFER), +}; +#undef _ENTRY +static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]); + +static const char * fmtMqttFlags(uint32_t flags) +{ + static char buf[BUFSIZ]; + char * te = buf; + (void) fmtBits(flags, MqttFlags, nMqttFlags, te); + return buf; +} +#define _MQTTFLAGS(_flags) fmtMqttFlags(_flags) + +#define _ENTRY(_v) { MQTTASYNC_##_v, #_v, } +static KEY rpmmqtt_errs[] = { +#ifdef WITH_MQTT + _ENTRY(SUCCESS), + _ENTRY(FAILURE), + _ENTRY(PERSISTENCE_ERROR), + _ENTRY(DISCONNECTED), + _ENTRY(MAX_MESSAGES_INFLIGHT), + _ENTRY(BAD_UTF8_STRING), + _ENTRY(NULL_PARAMETER), + _ENTRY(TOPICNAME_TRUNCATED), + _ENTRY(BAD_STRUCTURE), + _ENTRY(BAD_QOS), + _ENTRY(NO_MORE_MSGIDS), + _ENTRY(OPERATION_INCOMPLETE), + _ENTRY(MAX_BUFFERED_MESSAGES), +#else + { 0, NULL }, +#endif +}; +#undef _ENTRY +static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); + +static const char * rpmmqttStrerror(int v) +{ + KEY * tbl = rpmmqtt_errs; + size_t ntbl = rpmmqtt_nerrs; + const char * n = NULL; + static char buf[64]; + + for (size_t i = 0; i < ntbl; i++) { + if (v != tbl[i].v) + continue; + n = tbl[i].n; + break; + } + if (n == NULL) { + (void) snprintf(buf, sizeof(buf), "0x%x", v); + n = buf; + } + return n; +} + +static int Xcheck(rpmmqtt mqtt, const char * msg, int rc, + int printit, const char * func, const char * fn, unsigned ln) +{ + + if (rc != 0) { /* MQTTCLIENT_SUCCESS */ + int _lvl = RPMLOG_WARNING; + rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n", + func, fn, ln, msg, rpmmqttStrerror(rc), rc); + } + return rc; +} +#define check(_o, _m, _rc) \ + Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) + static void dumpMQTT(const char * msg, rpmmqtt mqtt) { FILE * fp = stderr; @@ -76,9 +211,9 @@ msg, mqtt); if (mqtt) { -#define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) if (mqtt->flags != 0x40000003) - PRINT(x, flags); + fprintf(fp, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); +#define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) if (mqtt->msg_input != 0) PRINT(d, msg_input); if (mqtt->msg_output != 1) @@ -86,6 +221,11 @@ if (mqtt->av) argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); + if (mqtt->_progname) + PRINT(s, _progname); + if (mqtt->_progmode) + PRINT(s, _progmode); + if (mqtt->_address) PRINT(s, _address); if (mqtt->msgs) @@ -148,8 +288,10 @@ if (mqtt->timeout != 10000) PRINT(u, timeout); - if (mqtt->_topics) - argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp); + if (mqtt->topics) + argvPrint("mqtt->topics", (ARGV_t)mqtt->topics, fp); + if (mqtt->subtopics) + argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, fp); if (mqtt->_filter_out) argvPrint("mqtt->_filter_out", (ARGV_t)mqtt->_filter_out, fp); if (mqtt->topic == NULL || strcmp(mqtt->topic, "rpm/mqtt")) @@ -208,69 +350,6 @@ } /*==============================================================*/ -typedef struct key_s { - int v; - const char *n; -} KEY; - -#define _ENTRY(_v) { MQTTASYNC_##_v, #_v, } -static KEY rpmmqtt_errs[] = { -#ifdef WITH_MQTT - _ENTRY(SUCCESS), - _ENTRY(FAILURE), - _ENTRY(PERSISTENCE_ERROR), - _ENTRY(DISCONNECTED), - _ENTRY(MAX_MESSAGES_INFLIGHT), - _ENTRY(BAD_UTF8_STRING), - _ENTRY(NULL_PARAMETER), - _ENTRY(TOPICNAME_TRUNCATED), - _ENTRY(BAD_STRUCTURE), - _ENTRY(BAD_QOS), - _ENTRY(NO_MORE_MSGIDS), - _ENTRY(OPERATION_INCOMPLETE), - _ENTRY(MAX_BUFFERED_MESSAGES), -#else - { 0, NULL }, -#endif -}; -static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); -#undef _ENTRY - -static const char * rpmmqttStrerror(int v) -{ - KEY * tbl = rpmmqtt_errs; - size_t ntbl = rpmmqtt_nerrs; - const char * n = NULL; - static char buf[64]; - - for (size_t i = 0; i < ntbl; i++) { - if (v != tbl[i].v) - continue; - n = tbl[i].n; - break; - } - if (n == NULL) { - (void) snprintf(buf, sizeof(buf), "0x%x", v); - n = buf; - } - return n; -} - -static int Xcheck(rpmmqtt mqtt, const char * msg, int rc, - int printit, const char * func, const char * fn, unsigned ln) -{ - - if (rc != 0) { /* MQTTCLIENT_SUCCESS */ - int _lvl = RPMLOG_WARNING; - rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n", - func, fn, ln, msg, rpmmqttStrerror(rc), rc); - } - return rc; -} -#define check(_o, _m, _rc) \ - Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) - -/*==============================================================*/ #ifdef WITH_MQTT static int onMessageArrived(void * _mqtt, char * topic, int topicLen, MQTTAsync_message * message) @@ -283,7 +362,7 @@ const char * s = message->payload; size_t ns = message->payloadlen; if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s); + rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, (int)ns, s); if (mqtt->iob) { mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0); @@ -400,7 +479,7 @@ const char *s = response->message; int token = response->token; int code = response->code; - rpmlog(RPMLOG_WARNING, "MQTT subscribe failed: code(%d) msg %s\n", + rpmlog(RPMLOG_WARNING, "MQTT subscribe(%d) failed: code(%d) msg %s\n", token, code, s); } else rpmlog(RPMLOG_WARNING, "MQTT subscribe failed\n"); @@ -453,7 +532,7 @@ size_t ns = response->alt.pub.message.payloadlen; int token = response->token; rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n", - token, mqtt->topic, ns, s); + token, mqtt->topic, (int)ns, s); } mqtt->finished = 1; } @@ -783,9 +862,6 @@ switch (otype) { case 'C': u = mqtt->u; -#ifdef DYING -assert(u); -#endif memcpy(C, &_C, sizeof(*C)); C->keepAliveInterval = mqtt->keepalive; C->cleansession = (MF_ISSET(CLEAN) ? 1 : 0); @@ -822,8 +898,6 @@ C->automaticReconnect = 1; C->minRetryInterval = 1; /* secs */ C->maxRetryInterval = 60; /* secs */ -mqtt->ut = 0; -mqtt->u = NULL; ptr = (void *) C; break; case 'D': @@ -919,11 +993,17 @@ return rc; } -int rpmmqttSendMessage(rpmmqtt mqtt, const char * s, size_t ns) +int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, + const char * s, size_t ns) { int rc = -1; - if (ns == 0) ns = strlen(s); + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); #ifdef WITH_MQTT MQTTAsync_message *M = AOBJ(mqtt, 'M'); @@ -936,7 +1016,7 @@ mqtt->finished = 0; rc = check(mqtt, "sendMessage", - MQTTAsync_sendMessage(mqtt->I, mqtt->topic, M, R)); + MQTTAsync_sendMessage(mqtt->I, topic, M, R)); while (!mqtt->finished) usleep(100); #endif /* WITH_MQTT */ @@ -954,18 +1034,20 @@ goto exit; #ifdef WITH_MQTT int _lvl = RPMLOG_DEBUG; - int *subqos = xcalloc(mqtt->ac, sizeof(*subqos)); + int *subqos = xcalloc(ac, sizeof(*subqos)); for (int i = 0; i < ac; i++) { char * t = av[i]; +#ifdef NOTYET char * te = strchr(t, '?'); - subqos[i] = mqtt->qos; /* XXX */ if (te) { *te++ = '\0'; if ((te = strchr(te, '='))) { if (!strncmp(t, "qos", (te - t))) subqos[i] = strtoul(te+1, NULL, 0); } - } + } else +#endif + subqos[i] = mqtt->qos; /* XXX */ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); } @@ -985,11 +1067,17 @@ } /*==============================================================*/ -int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns) +int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns) { int ret = -1; /* assume failure */ - if (ns == 0) ns = strlen(s); + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); + #ifdef WITH_MQTT if (rpmmqttConnect(mqtt)) @@ -999,7 +1087,7 @@ char * t = rpmExpand(_mqtt_prefix, " ", s, NULL); size_t nt = strlen(t); - if (!rpmmqttSendMessage(mqtt, t, nt)) + if (!rpmmqttSendMessage(mqtt, topic, t, nt)) ret = nt; t = _free(t); @@ -1123,11 +1211,9 @@ N_("MQTT <QOS> level."), N_("<QOS>") }, { "retain", 'r', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_RETAIN, N_("Retain the message on the host."), NULL }, - { NULL, 's', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN, - N_("Send stdin lines as a single message."), NULL }, { NULL, 'S', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_DNSSRV, N_("Use SRV record to find remote host."), NULL }, - { "topic", 't', POPT_ARG_ARGV, &mqtt->_topics, 0, + { "topic", 't', POPT_ARG_ARGV, &mqtt->topics, 0, N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") }, { "user", 'u', POPT_ARG_STRING, &mqtt->user, 0, N_("Remote <USER>."), N_("<USER>") }, @@ -1183,6 +1269,20 @@ con = poptGetContext(av[0], ac, (const char **)av, rpmmqttOptionsTable, _popt_context_flags); +mqtt->_progname = _free(mqtt->_progname); +mqtt->_progmode = _free(mqtt->_progmode); + { const char *arg0 = poptGetInvocationName(con); + const char * _progname; + const char * _progmode; + if ((_progname = strrchr(arg0, '/')) != NULL) _progname++; + else _progname = arg0; + if (!strncmp(_progname, "lt-", sizeof("lt-")-1)) + _progname += sizeof("lt-")-1; + _progmode = (strstr(_progname, "sub") ? "sub" : "pub"); + mqtt->_progname = xstrdup(_progname); + mqtt->_progmode = xstrdup(_progmode); + } + while ((xx = poptGetNextOpt(con)) > 0) switch (xx) { default: @@ -1214,7 +1314,6 @@ /* XXX MQTT_FLAGS_EOL */ /* XXX MQTT_FLAGS_NOSTALE */ /* XXX MQTT_FLAGS_LINES */ - /* XXX MQTT_FLAGS_STDIN */ /* XXX MQTT_FLAGS_EMPTY */ /* XXX MQTT_FLAGS_DNSSRV */ /* XXX MQTT_FLAGS_INSECURE */ @@ -1242,8 +1341,6 @@ mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); #endif - /* XXX load mqtt->_topics from _mqtt_subtopics */ - return rc; } @@ -1255,10 +1352,17 @@ const char *te; rpmRC rc = RPMRC_OK; - (void) argvSplit(&av, query, ","); + /* https://en.wikipedia.org/wiki/Query_string */ + /* XXX ampersand? semi-colon? comma? whitespace? newlines? */ + (void) argvSplit(&av, query, "&;,"); +#ifdef DYING +argvPrint(__FUNCTION__, av, NULL); +#endif ac = argvCount(av); for (int i = 0; i < ac; i++) { t = av[i]; + if (!(t && *t)) + continue; if ((te = strchr(t, '=')) == NULL) te += strlen(t); if (!strncmp(t, "clean", (te - t))) { @@ -1268,7 +1372,6 @@ /* XXX MQTT_FLAGS_EOL */ /* XXX MQTT_FLAGS_NOSTALE */ /* XXX MQTT_FLAGS_LINES */ - /* XXX MQTT_FLAGS_STDIN */ /* XXX MQTT_FLAGS_EMPTY */ /* XXX MQTT_FLAGS_DNSSRV */ /* XXX MQTT_FLAGS_INSECURE */ @@ -1280,14 +1383,14 @@ if (!strncmp(t, "qos", (te - t))) { mqtt->qos = (te[0] == '=' && xisdigit(te[1])) ? strtoul(te+1, NULL, 0) - : 2; /* XXX default qos=2? */ + : 2; /* XXX default qos=2? */ mqtt->qos %= 3; continue; } if (!strncmp(t, "timeout", (te - t))) { mqtt->timeout = (te[0] == '=' && xisdigit(te[1])) ? strtoul(te+1, NULL, 0) - : 10000; /* XXX default timeout=10000? */ + : 10000;/* XXX default timeout=10000? */ continue; } if (!strncmp(t, "max_inflight", (te - t))) { @@ -1306,9 +1409,42 @@ if (!strncmp(t, "trace", (te - t))) { mqtt->trace = (te[0] == '=' && xisdigit(te[1])) ? strtoul(te+1, NULL, 0) - : 4; /* XXX default trace=4? */ + : 4; /* XXX default trace=4? */ continue; } + /* XXX error msg? */ + rc = RPMRC_FAIL; + } + av = argvFree(av); + + return rc; +} + +static rpmRC rpmmqttInitURIFragment(rpmmqtt mqtt, const char *fragment) +{ + ARGV_t av = NULL; + int ac; + const char *t; + const char *te; + rpmRC rc = RPMRC_OK; + + /* https://en.wikipedia.org/wiki/Fragment_identifier */ + /* XXX GOOG AJAX search (deprecated) */ + if (*fragment == '!') + fragment++; + /* XXX ampersand? semi-colon? comma? whitespace? newlines? */ + (void) argvSplit(&av, fragment, "&;,"); +#ifdef DYING +argvPrint(__FUNCTION__, av, NULL); +#endif + ac = argvCount(av); + for (int i = 0; i < ac; i++) { + t = av[i]; + if (!(t && *t)) + continue; + if ((te = strchr(t, '=')) == NULL) + te += strlen(t); + (void) argvAdd(&mqtt->subtopics, t); } av = argvFree(av); @@ -1323,6 +1459,9 @@ /* -- Set unspecified MQTT options from the URI parameters. */ mqtt->ut = urlSplit(url, &u); mqtt->u = u; +#ifdef DYING +dumpU(__FUNCTION__, u); +#endif if (u->scheme == NULL || !strcmp(u->scheme, "mqtt") @@ -1364,9 +1503,11 @@ } } - if (u->query) { - rc = rpmmqttInitURIQuery(mqtt, u->query); - } + if (u->query && rpmmqttInitURIQuery(mqtt, u->query)) + rc = RPMRC_FAIL; + + if (u->fragment && rpmmqttInitURIFragment(mqtt, u->fragment)) + rc = RPMRC_FAIL; return rc; } @@ -1374,6 +1515,7 @@ static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av, mqttFlags flags) { + const char * uri = NULL; rpmRC rc; SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, flags)); @@ -1396,9 +1538,15 @@ mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL); /* -- Initialize values frpm default URI. */ -static const char _mqtt_default_uri[] = - "mqtt://localhost:1883/?qos=0,timeout=10000,max_inflight=20,keepalive=60"; - rc = rpmmqttInitURI(mqtt, _mqtt_default_uri); + static const char _mqtt_uri[] = + "mqtt://localhost:1883/rpm/mqtt" + "?qos=0;" + "timeout=10000;" + "max_inflight=20;"; + "keepalive=60;"; + uri = rpmExpand("%{?_mqtt_uri}%{!?_mqtt_uri:", _mqtt_uri, "}", NULL); + + rc = rpmmqttInitURI(mqtt, uri); if (rc) goto exit; @@ -1413,11 +1561,8 @@ goto exit; /* -- Set message input/output modes from flags/options. */ - if (MF_ISSET(STDIN)) - (void) argvAdd(&mqtt->ifn, "-"); - if (mqtt->msgs) - mqtt->msg_input = MQTT_INPUT_OPTION; + mqtt->msg_input = MQTT_INPUT_OPTIONS; else if (mqtt->ifn) mqtt->msg_input = MF_ISSET(LINES) ? MQTT_INPUT_FILES_LINES : MQTT_INPUT_FILES; @@ -1433,8 +1578,8 @@ } /* -- XXX Ensure topic/clientid are set. */ - if (mqtt->clientid == NULL) { assert(mqtt->_clientid); + if (mqtt->clientid == NULL) { mqtt->clientid = rpmExpand(mqtt->_clientid, NULL); } if (mqtt->topic == NULL) { @@ -1442,26 +1587,118 @@ mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL); } -#ifdef DYING - /* -- XXX Add default argv (if not specified) */ - if (mqtt->ac == 0) { -static const char *_av[] = { - "mqtt://luser:jasnl@localhost:1883/rpm/mqtt", - "rpm/#", - NULL, -}; - (void) argvAppend((ARGV_t *)&mqtt->av, _av); - mqtt->ac = argvCount((ARGV_t)mqtt->av); - } -#endif - rc = RPMRC_OK; exit: + uri = _free(uri); SPEW((stderr, "<-- %s(%p,%p[%d],0x%x) rc %d\n", __FUNCTION__, mqtt, av, ac, flags, rc)); return rc; } +/* XXX add subtopics arg? */ +static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (mqtt->subtopics) { + int nsubs = argvCount((ARGV_t)mqtt->subtopics); +#ifdef NOTYET /* XXX MQTT segfault if done here. */ +argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL); + xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics); +#else + for (int i = 0; i < nsubs; i++) { + xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0); + } +#endif + } + rc = RPMRC_OK; + return rc; +} + +/* XXX add topics arg? */ +static rpmRC rpmmqttInitPublish(rpmmqtt mqtt) +{ + int ntopics = argvCount((ARGV_t)mqtt->topics); + int nmsgs = argvCount((ARGV_t)mqtt->msgs); + int nifn = argvCount(mqtt->ifn); + const char * ifn = NULL; + rpmiob iob = NULL; + const char * topic = NULL; + const char * s = NULL; + int ns = 0; + ARGV_t lav = NULL; + int lac = 0; + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + switch (mqtt->msg_input) { + case MQTT_INPUT_UNKNOWN: + default: + break; + case MQTT_INPUT_OPTIONS: + for (int i = 0; i < nmsgs; i++) { + s = mqtt->msgs[i]; + ns = strlen(s); + if (ntopics <= 0) + xx = rpmmqttPub(mqtt, topic, s, ns); + else + for (int j = 0; j < ntopics; j++) { + topic = mqtt->topics[j]; + xx = rpmmqttPub(mqtt, topic, s, ns); + } + } + break; + case MQTT_INPUT_FILES: + for (int i = 0; i < nifn; i++) { + ifn = mqtt->ifn[i]; + if (rpmiobSlurp(ifn, &iob) == 0) { + s = rpmiobStr(iob); + ns = rpmiobLen(iob); + /* XXX trim */ + if (ns == 0) /* XXX skip empty files? */ + continue; + if (ntopics <= 0) + xx = rpmmqttPub(mqtt, topic, s, ns); + else + for (int j = 0; j < ntopics; j++) { + topic = mqtt->topics[j]; + xx = rpmmqttPub(mqtt, topic, s, ns); + } + } + iob = rpmiobFree(iob); + } + break; + case MQTT_INPUT_FILES_LINES: + for (int i = 0; i < nifn; i++) { + ifn = mqtt->ifn[i]; + if (rpmiobSlurp(ifn, &iob) == 0) { + xx = argvSplit(&lav, rpmiobStr(iob), "\n\r"); + lac = argvCount(lav); + for (int j = 0; j < lac; j++) { + s = lav[j]; + ns = strlen(s); + /* XXX trim */ + if (ns == 0) /* XXX skip empty lines? */ + continue; + if (ntopics <= 0) + xx = rpmmqttPub(mqtt, topic, s, ns); + else + for (int k = 0; j < ntopics; j++) { + topic = mqtt->topics[k]; + xx = rpmmqttPub(mqtt, topic, s, ns); + } + } + lav = argvFree(lav); + } + iob = rpmiobFree(iob); + } + break; + } + rc = RPMRC_OK; + return rc; +} + /*==============================================================*/ static void rpmmqttFini(void * _mqtt) { @@ -1474,13 +1711,17 @@ #endif /* WITH_MQTT */ /* ========== */ + mqtt->_progname = _free(mqtt->_progname); + mqtt->_progmode = _free(mqtt->_progmode); + mqtt->_address = _free(mqtt->_address); mqtt->host = _free(mqtt->host); mqtt->_clientid = _free(mqtt->_clientid); mqtt->idprefix = _free(mqtt->idprefix); mqtt->msgs = argvFree((ARGV_t)mqtt->msgs); mqtt->password = _free(mqtt->password); - mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); + mqtt->topics = argvFree((ARGV_t)mqtt->topics); + mqtt->subtopics = argvFree((ARGV_t)mqtt->subtopics); mqtt->_filter_out = argvFree((ARGV_t)mqtt->_filter_out); mqtt->user = _free(mqtt->user); mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */ @@ -1526,7 +1767,7 @@ rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { - static const char * _av[] = { (char *) "rpmmqtt", NULL }; + static const char * _av[] = { (char *) "mqtt", NULL }; rpmmqtt mqtt = (flags & 0x80000000) ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool); @@ -1535,6 +1776,9 @@ /* XXX quick-n-dirty recursion avoidance. */ if (av == NULL) av = (char **) _av; +#ifdef DYING +argvPrint(__FUNCTION__, (ARGV_t)av, NULL); +#endif int ac = argvCount((ARGV_t)av); flags &= ~0x80000000; rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags); @@ -1545,6 +1789,7 @@ int _lvl = RPMLOG_DEBUG; int xx; +mqtt->trace = 4; if (mqtt->trace && rpmIsDebug()) { xx = check(mqtt, "setTraceCallback", (MQTTAsync_setTraceCallback(onTrace), 0)); @@ -1579,7 +1824,9 @@ rpmlog(_lvl, "%19s: type(%u) %s\n", "persist", mqtt->persist_type, (mqtt->persist_type ? persist_path : "")); - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); mqtt->persist_path = _free(mqtt->persist_path); @@ -1631,83 +1878,39 @@ mqtt->u = NULL; dumpMQTT(__FUNCTION__, mqtt); - if (mqtt->I == NULL) { - xx = check(mqtt, "createWithOptions", + if (mqtt->I == NULL) { + xx = check(mqtt, "createWithOptions", MQTTAsync_createWithOptions(&mqtt->I, mqtt->uri, mqtt->clientid, mqtt->persist_type, mqtt->persist_ctx, AOBJ(mqtt, 'O'))); - xx = check(mqtt, "setCallbacks", + xx = check(mqtt, "setCallbacks", MQTTAsync_setCallbacks(mqtt->I, mqtt, onConnectionLost, onMessageArrived, onDeliveryComplete)); + } - /* Subscribe to channels (if any). */ - /* XXX rework using mqtt->_topics instead */ - if (mqtt->ac > 1) { -#ifdef NOTYET /* XXX segfault here. */ - xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1); + /* Subscribe to initial topics (if any). */ +#ifdef DYING +argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL); + if (mqtt->subtopics) { + int nsubs = argvCount((ARGV_t)mqtt->subtopics); +#ifdef NOTYET /* XXX MQTT segfault if done here. */ + xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics); #else - for (int i = 1; i < mqtt->ac; i++) - xx = rpmmqttSub(mqtt, mqtt->av[i], 0); + for (int i = 0; i < nsubs; i++) { + xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0); + } #endif } +#else + xx = rpmmqttInitSubscribe(mqtt); +#endif + + /* Publish any initial messages (if any). */ + xx = rpmmqttInitPublish(mqtt); - /* Publish any initial messages. */ - switch (mqtt->msg_input) { - case MQTT_INPUT_UNKNOWN: - default: - break; - case MQTT_INPUT_OPTION: - if (mqtt->msgs) { - int nmsgs = argvCount((ARGV_t)mqtt->msgs); - for (int i = 0; i < nmsgs; i++) { - xx = rpmmqttPub(mqtt, mqtt->msgs[i], 0); - } - } - break; - case MQTT_INPUT_STDIN: - case MQTT_INPUT_FILES: - if (mqtt->ifn) { - int nifn = argvCount(mqtt->ifn); - for (int i = 0; i < nifn; i++) { - const char * ifn = mqtt->ifn[i]; - rpmiob iob = NULL; - if (rpmiobSlurp(ifn, &iob) == 0) { - xx = rpmmqttPub(mqtt, - rpmiobStr(iob), rpmiobLen(iob)); - } - iob = rpmiobFree(iob); - } - } - break; - case MQTT_INPUT_STDIN_LINES: - case MQTT_INPUT_FILES_LINES: - if (mqtt->ifn) { - int nifn = argvCount(mqtt->ifn); - for (int i = 0; i < nifn; i++) { - const char * ifn = mqtt->ifn[i]; - rpmiob iob = NULL; - if (rpmiobSlurp(ifn, &iob) == 0) { - ARGV_t lav = NULL; - int lac; - xx = argvSplit(&lav, rpmiobStr(iob), "\n\r"); - lac = argvCount(lav); - for (int j = 0; j < lac; j++) { - const char * s = lav[j]; - size_t ns = strlen(s); - if (ns > 0) /* XXX skip empty lines? */ - xx = rpmmqttPub(mqtt, s, ns); - } - lav = argvFree(lav); - } - iob = rpmiobFree(iob); - } - } - break; - } - } } #endif /* WITH_MQTT */ @@ -1727,7 +1930,7 @@ #if defined(WITH_MQTT) if (rpmmqttConnect(mqtt)) goto exit; - if (msg != NULL && !rpmmqttSendMessage(mqtt, msg, strlen(msg))) { + if (msg != NULL && !rpmmqttSendMessage(mqtt, NULL, msg, strlen(msg))) { rc = RPMRC_OK; if (resultp) { *resultp = (mqtt->iob ? rpmiobStr(mqtt->iob) : ""); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.12 -r1.1.2.13 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 4 Jul 2016 07:45:23 -0000 1.1.2.12 +++ rpm/rpmio/rpmmqtt.h 5 Jul 2016 14:49:11 -0000 1.1.2.13 @@ -19,10 +19,9 @@ MQTT_FLAGS_NOSTALE = _MFB( 2), /*!< (sub) -R */ MQTT_FLAGS_LINES = _MFB( 3), /*!< -l */ - MQTT_FLAGS_STDIN = _MFB( 4), /*!< -s */ - MQTT_FLAGS_EMPTY = _MFB( 5), /*!< -n */ - MQTT_FLAGS_DNSSRV = _MFB( 6), /*!< -S */ - MQTT_FLAGS_INSECURE = _MFB( 7), /*!< --insecure */ + MQTT_FLAGS_EMPTY = _MFB( 4), /*!< -n */ + MQTT_FLAGS_DNSSRV = _MFB( 5), /*!< -S */ + MQTT_FLAGS_INSECURE = _MFB( 6), /*!< --insecure */ MQTT_FLAGS_RETAIN = _MFB( 8), /*!< -r */ MQTT_FLAGS_WILL_RETAIN = _MFB( 9), /*!< --will-retain */ @@ -35,11 +34,9 @@ typedef enum mqttInput_e { MQTT_INPUT_UNKNOWN = 0, - MQTT_INPUT_OPTION = 1, /*!< -m str */ + MQTT_INPUT_OPTIONS = 1, /*!< -m str */ MQTT_INPUT_FILES_LINES = 2, /*!< -l -f fn */ MQTT_INPUT_FILES = 3, /*!< -f fn */ - MQTT_INPUT_STDIN_LINES = 4, /*!< -l -f '-' */ - MQTT_INPUT_STDIN = 5, /*!< -s */ } mqttInput; typedef enum mqttOutput_e { @@ -59,28 +56,32 @@ mqttInput msg_input; mqttOutput msg_output; /* ========== */ + const char *_progname; + const char *_progmode; + const char *_address; /*!< -A */ - const char *host; /*!< -h */ - const char *idprefix; /*!< -I */ + const char * host; /*!< -h */ + const char * idprefix; /*!< -I */ int keepalive; /*!< -k */ const char **msgs; /*!< -m */ int max_inflight; /*!< -M */ int port; /*!< -p */ - const char *password; /*!< -P */ - const char **_topics; /*!< -t */ + const char * password; /*!< -P */ + const char **topics; /*!< -t */ + const char **subtopics; /*!< -t */ const char **_filter_out; /*!< -T */ - const char *user; /*!< -u */ - const char *protocol_version; /*!< -V */ + const char * user; /*!< -u */ + const char * protocol_version; /*!< -V */ const char * will_message; /*!< --will-payload */ int will_qos; /*!< --will-qos */ const char * will_topic; /*!< --will-topic */ - const char *cafile; /*!< --cafile */ + const char * cafile; /*!< --cafile */ const char *_capath; /*!< --capath */ - const char *cert; /*!< --cert */ - const char *privkey; /*!< --key */ - const char *ciphers; /*!< --ciphers */ + const char * cert; /*!< --cert */ + const char * privkey; /*!< --key */ + const char * ciphers; /*!< --ciphers */ const char *_tls_version; /*!< --tls-version */ const char *_psk_key; /*!< --psk */ @@ -118,12 +119,12 @@ int MQTTVersion; int sessionPresent; - const char *cachedn; + const char * cachedn; FD_t ifd; const char **ifn; /*!< -f */ FD_t ofd; - const char *ofn; /*!< -o */ + const char * ofn; /*!< -o */ rpmiob iob; MQTTAsync_connectOptions C; @@ -180,9 +181,10 @@ int rpmmqttDisconnect(rpmmqtt mqtt); -int rpmmqttSendMessage(rpmmqtt mqtt, const char *s, size_t ns); +int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, + const char *s, size_t ns); -int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns); +int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns); int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.9 -r1.1.2.10 tmqtt.c --- rpm/rpmio/tmqtt.c 4 Jul 2016 07:45:23 -0000 1.1.2.9 +++ rpm/rpmio/tmqtt.c 5 Jul 2016 14:49:11 -0000 1.1.2.10 @@ -33,13 +33,13 @@ (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0); #endif - nw = rpmmqttPub(mqtt, "bzzt ...", 0); - nw = rpmmqttPub(mqtt, "bzzT ...", 0); - nw = rpmmqttPub(mqtt, "bzZT ...", 0); - nw = rpmmqttPub(mqtt, "bZZT ...", 0); - nw = rpmmqttPub(mqtt, "BZZT ...", 0); + nw = rpmmqttPub(mqtt, NULL, "bzzt ...", 0); + nw = rpmmqttPub(mqtt, NULL, "bzzT ...", 0); + nw = rpmmqttPub(mqtt, NULL, "bzZT ...", 0); + nw = rpmmqttPub(mqtt, NULL, "bZZT ...", 0); + nw = rpmmqttPub(mqtt, NULL, "BZZT ...", 0); (void) rpmmqttDisconnect(mqtt); - nw = rpmmqttPub(mqtt, "SWAT !!!", 0); + nw = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0); (void) rpmmqttDisconnect(mqtt); (void) rpmmqttConnect(mqtt); @@ -74,10 +74,18 @@ /* Initialize the _mqtt_ macro context */ +static char _mqtt_uri[] = + "_mqtt_uri mqtt://luser:jasnl@localhost:1883/rpm/mqtt" + "?qos=1;" + "timeout=10000;" + "trace=4;" + "#rpm/#"; +(void) rpmDefineMacro(NULL, _mqtt_uri, 0); (void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0); (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0); -(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} %{nil}", 0); +(void) rpmDefineMacro(NULL, "nil %{!?nil}", 0); +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now}", 0); #ifdef DYING (void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org