RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 02-Jul-2016 16:32:59 Branch: rpm-5_4 Handle: 2016070214325900 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: bury option processing prior to embedding. Summary: Revision Changes Path 1.1.2.11 +369 -203 rpm/rpmio/rpmmqtt.c 1.1.2.10 +3 -3 rpm/rpmio/rpmmqtt.h 1.1.2.7 +5 -124 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.10 -r1.1.2.11 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 2 Jul 2016 09:33:44 -0000 1.1.2.10 +++ rpm/rpmio/rpmmqtt.c 2 Jul 2016 14:32:59 -0000 1.1.2.11 @@ -13,6 +13,7 @@ #include <rpmdir.h> #include <rpmmacro.h> #include <rpmurl.h> +#include <poptIO.h> #include <argv.h> #ifdef WITH_MQTT @@ -24,14 +25,13 @@ #include "debug.h" -int _rpmmqtt_debug = 0; +int _rpmmqtt_debug; +#define SPEW(_list) if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE) #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG) /*==============================================================*/ -struct rpmmqtt_s _mqtt; - #ifndef DYING static void dumpU(const char * msg, urlinfo u) { @@ -74,10 +74,10 @@ msg, mqtt); if (mqtt) { + argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); #define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) PRINT(s, _address); PRINT(s, _infile); - PRINT(s, idprefix); PRINT(s, _message); PRINT(s, cafile); @@ -90,8 +90,6 @@ PRINT(s, _psk_identity); PRINT(s, _proxy); - PRINT(d, _max_msg_count); - fprintf(stderr, "====================\n"); PRINT(d, keepalive); PRINT(d, max_inflight); @@ -106,10 +104,12 @@ PRINT(d, port); PRINT(s, uri); + PRINT(s, idprefix); PRINT(s, _clientid); PRINT(s, clientid); PRINT(u, qos); + PRINT(u, timeout); if (mqtt->_topics) argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp); @@ -123,12 +123,13 @@ PRINT(p, persist_ctx); PRINT(s, persist_path); - PRINT(u, token); - PRINT(u, timeout); - PRINT(d, debug); PRINT(d, trace); PRINT(d, finished); + PRINT(u, token); + + PRINT(d, msg_count); + PRINT(d, max_msg_count); PRINT(s, serverURI); PRINT(d, MQTTVersion); @@ -136,8 +137,6 @@ PRINT(s, dn); #undef PRINT - - } } @@ -203,13 +202,13 @@ /*==============================================================*/ #ifdef WITH_MQTT -static int onMessageArrived(void * _ctx, char * topic, int topicLen, +static int onMessageArrived(void * _mqtt, char * topic, int topicLen, MQTTAsync_message * message) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; int rc = 1; - (void)mqtt; + mqtt->msg_count++; const char * s = message->payload; size_t ns = message->payloadlen; @@ -227,17 +226,17 @@ return rc; } -static void onDeliveryComplete(void * _ctx, int token) +static void onDeliveryComplete(void * _mqtt, int token) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; if (_rpmmqtt_debug < 0) rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token); mqtt->token = token; } -static void onConnectionLost(void * _ctx, char *cause) +static void onConnectionLost(void * _mqtt, char *cause) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; rpmlog(RPMLOG_DEBUG, "--- MQTT disconnect(%s) version(%d) present(%d)\n", @@ -250,10 +249,10 @@ (void) rpmmqttConnect(mqtt); } -static void onDisconnectFailure(void * _ctx, MQTTAsync_failureData * response) +static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _ctx, response); + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response); if (response) { const char *s = response->message; int token = response->token; @@ -266,9 +265,9 @@ mqtt->finished = 1; } -static void onDisconnect(void * _ctx, MQTTAsync_successData * response) +static void onDisconnect(void * _mqtt, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, @@ -278,10 +277,10 @@ mqtt->finished = 1; } -static void onConnectFailure(void * _ctx, MQTTAsync_failureData * response) +static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, response); + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); if (response) { const char *s = response->message; int token = response->token; @@ -293,9 +292,9 @@ mqtt->finished = 1; } -static void onConnect(void * _ctx, MQTTAsync_successData * response) +static void onConnect(void * _mqtt, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; if (response) { mqtt->serverURI = xstrdup(response->alt.connect.serverURI); @@ -310,10 +309,10 @@ mqtt->finished = 1; } -static void onSubscribeFailure(void * _ctx, MQTTAsync_failureData * response) +static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, response); + rpmmqtt mqtt = (rpmmqtt) _mqtt; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); if (response) { const char *s = response->message; int token = response->token; @@ -325,9 +324,9 @@ mqtt->finished = 1; } -static void onSubscribe(void * _ctx, MQTTAsync_successData * response) +static void onSubscribe(void * _mqtt, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; int qos = response->alt.qos; if (mqtt->debug || _rpmmqtt_debug) @@ -335,9 +334,22 @@ mqtt->finished = 1; } -static void onSendFailure(void * _ctx, MQTTAsync_failureData * response) +static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; + int *subqos = response->alt.qosList; + +SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, response, subqos, mqtt->ac)); + for (int i = 0; i < mqtt->ac; i++) { + if (mqtt->debug || _rpmmqtt_debug) + rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", subqos[i]); + } + mqtt->finished = 1; +} + +static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; { const char *s = response->message; @@ -349,9 +361,9 @@ mqtt->finished = 1; } -static void onSend(void * _ctx, MQTTAsync_successData * response) +static void onSend(void * _mqtt, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; if (mqtt->debug || _rpmmqtt_debug) { const char * s = response->alt.pub.message.payload; @@ -371,15 +383,15 @@ /*==============================================================*/ #ifdef WITH_MQTT -static int rpmmqttOpen(void **_ctxp, const char *clientID, const char *serverURI, - void *_ctx) +static int rpmmqttOpen(void **_mqttp, const char *clientID, const char *serverURI, + void *_mqtt) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; char * dn; char * te; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - *_ctxp = _ctx; + *_mqttp = _mqtt; mqtt->dn = _free(mqtt->dn); dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL); @@ -395,15 +407,14 @@ rc = 0; exit: -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _ctxp, clientID, serverURI, _ctx, rc, mqtt->dn); +SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn)); return rc; } -static int rpmmqttClose(void *_ctx) +static int rpmmqttClose(void *_mqtt) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -418,15 +429,14 @@ rc = 0; exit: -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc); +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); return rc; } -static int rpmmqttPut(void *_ctx, char *key, +static int rpmmqttPut(void *_mqtt, char *key, int bufcount, char *buffers[], int buflens[]) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; char *fn = NULL; FD_t fd = NULL; size_t nb = 0; @@ -445,8 +455,7 @@ for (int i = 0; i < bufcount; i++) { const char * s = buffers[i]; int ns = buflens[i]; -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s); +SPEW((stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s)); nb += buflens[i]; nw += Fwrite(buffers[i], sizeof(*buffers[i]), buflens[i], fd); } @@ -463,16 +472,15 @@ exit: if (fd) Fclose(fd); -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, key, bufcount, buffers, buflens, rc, fn); +SPEW((stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn)); fn = _free(fn); return rc; } -static int rpmmqttGet(void *_ctx, char *key, +static int rpmmqttGet(void *_mqtt, char *key, char *buffer[], int *buflen) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; char *fn = NULL; FD_t fd = NULL; size_t nr = 0; @@ -509,17 +517,16 @@ exit: if (fd) Fclose(fd); -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, key, buffer, buflen, rc, fn); +SPEW((stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn)); fn = _free(fn); *buffer = b; *buflen = nb; return rc; } -static int rpmmqttRemove(void *_ctx, char *key) +static int rpmmqttRemove(void *_mqtt, char *key) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; char *fn = NULL; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -534,15 +541,14 @@ rc = 0; exit: -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _ctx, key, rc, fn); +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn)); fn = _free(fn); return rc; } -static int rpmmqttKeys(void *_ctx, char ***keys, int *nkeys) +static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; ARGV_t av = NULL; int ac = 0; DIR * dir = NULL; @@ -572,16 +578,15 @@ exit: if (dir) (void) Closedir(dir); -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _ctx, keys, nkeys, rc, av, ac); +SPEW((stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac)); *keys = (char **) av; *nkeys = ac; return rc; } -static int rpmmqttClear(void *_ctx) +static int rpmmqttClear(void *_mqtt) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; DIR * dir = NULL; struct dirent *dp; int nerrs = 0; @@ -610,14 +615,13 @@ exit: if (dir) (void) Closedir(dir); -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc); +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); return rc; } -static int rpmmqttContainsKey(void *_ctx, char *key) +static int rpmmqttContainsKey(void *_mqtt, char *key) { - rpmmqtt mqtt = (rpmmqtt) _ctx; + rpmmqtt mqtt = (rpmmqtt) _mqtt; DIR * dir = NULL; struct dirent *dp; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -646,8 +650,7 @@ exit: if (dir) (void) Closedir(dir); -if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _ctx, key, rc); +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc)); return rc; } @@ -727,12 +730,24 @@ (void)_Sopts; Copts->ssl = NULL; #endif + Copts->onSuccess = onConnect; Copts->onFailure = onConnectFailure; Copts->context = mqtt; + Copts->serverURIcount = 0; Copts->serverURIs = NULL; + Copts->MQTTVersion = 0; + if (mqtt->protocol_version) { + if (!strcmp(mqtt->protocol_version, "auto")) + Copts->MQTTVersion = 0; + if (!strcmp(mqtt->protocol_version, "31")) + Copts->MQTTVersion = 3; + if (!strcmp(mqtt->protocol_version, "311")) + Copts->MQTTVersion = 4; + } + mqtt->finished = 0; rc = check(mqtt, "connect", MQTTAsync_connect(mqtt->C, Copts)); @@ -742,6 +757,7 @@ } #endif /* WITH_MQTT */ +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } @@ -768,6 +784,7 @@ usleep(100); } #endif +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } @@ -810,6 +827,52 @@ usleep(100); #endif /* WITH_MQTT */ +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc)); + return rc; +} + +static int rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + int rc = -1; + +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); + if (ac <= 0) + goto exit; +#ifdef WITH_MQTT + int _lvl = RPMLOG_DEBUG; + int *subqos = xcalloc(mqtt->ac, sizeof(*subqos)); + for (int i = 0; i < ac; i++) { + char * t = av[i]; + char * te = strchr(t, '?'); + subqos[i] = mqtt->qos; /* XXX */ + if (te) { + *te++ = '\0'; + if ((te = strchr(te, '='))) { + if (!strncmp(t, "qos", (te - t))) + subqos[i] = strtoul(te+1, NULL, 0); + } + } + rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); + } + + MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; + memcpy(Ropts, &_Ropts, sizeof(*Ropts)); +#ifdef REF + memcpy(Ropts->struct_id, "MQTR", 4); + Ropts->struct_version = 0; +#endif + Ropts->onSuccess = onSubscribeMany; + Ropts->onFailure = onSubscribeFailure; + Ropts->context = mqtt; + + rc = check(mqtt, "subscribeMany", + MQTTAsync_subscribeMany(&mqtt->C, + ac, av, subqos, Ropts)); + subqos = _free(subqos); +#endif /* WITH_MQTT */ + +exit: +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); return rc; } @@ -820,12 +883,8 @@ #ifdef WITH_MQTT if (rpmmqttConnect(mqtt) == 0) { -#ifdef DYING - static char _mqtt_prefix[] = - "%{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} "; -#else static char _mqtt_prefix[] = "%{?_mqtt_prefix}"; -#endif + /* XXX extra space */ char * t = rpmExpand(_mqtt_prefix, " ", s, NULL); size_t nt = strlen(t); @@ -836,6 +895,7 @@ } #endif /* WITH_MQTT */ +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); return ret; } @@ -886,7 +946,6 @@ mqtt->finished = 0; rc = check(mqtt, "subscribe", MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts)); -fprintf(stderr, "*** finished(%d)\n", mqtt->finished); while (rc == 0 && !mqtt->finished) usleep(100); @@ -897,115 +956,181 @@ } #endif /* WITH_MQTT */ +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); return ret; } /*==============================================================*/ -static void rpmmqttFini(void * _ctx) +static rpmRC +rpmmqttInitPopt(rpmmqtt mqtt, int ac, char * const* av) { - rpmmqtt mqtt = (rpmmqtt) _ctx; - -#ifdef WITH_MQTT - (void) rpmmqttDisconnect(mqtt); - (void) check(mqtt, "destroy", - (MQTTAsync_destroy(&mqtt->C), 0)); -#endif /* WITH_MQTT */ - -/* ========== */ - mqtt->_address = _free(mqtt->_address); - mqtt->_infile = _free(mqtt->_infile); - mqtt->host = _free(mqtt->host); - mqtt->_clientid = _free(mqtt->_clientid); - mqtt->idprefix = _free(mqtt->idprefix); - mqtt->_message = _free(mqtt->_message); - mqtt->password = _free(mqtt->password); - mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); - mqtt->user = _free(mqtt->user); - mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */ + struct poptOption rpmmqttOptionsTable[] = { + { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,&mqtt->flags, MQTT_FLAGS_CLEAN, + N_("(sub) Do not clean session."), NULL }, + { NULL, 'C', POPT_ARG_INT, &mqtt->max_msg_count, 0, + N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") }, + { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &mqtt->flags, MQTT_FLAGS_EOL, + N_("(sub) Do not print NL after messages."), NULL }, + { NULL, 'R', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_NOSTALE, + N_("(sub) Do not print stale messages."), NULL }, + + { NULL, 'A', POPT_ARG_STRING, &mqtt->_address, 0, + N_("Connect from local <ADDR>."), N_("<ADDR>") }, + { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmqtt_debug, -1, + N_("Debug spewage."), NULL }, + { "file", 'f', POPT_ARG_STRING, &mqtt->_infile, 0, + N_("Send <FILE> as message."), N_("<FILE>") }, + { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->host, 0, + N_("Connect to <HOST>."), N_("<HOST>") }, + { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_clientid, 0, + N_("MQTT client <ID>."), N_("<ID>") }, + { "prefix", 'I', POPT_ARG_STRING, &mqtt->idprefix, 0, + N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") }, + { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->keepalive, 0, + N_("Keep alive in <SECS>."), N_("<SECS>") }, + { NULL, 'l', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN_EACH, + N_("Send messages line-by-line from stdin."), NULL }, + { "message", 'm', POPT_ARG_STRING, &mqtt->_message, 0, + N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") }, + { NULL, 'M', POPT_ARG_INT, &mqtt->max_inflight, 0, + N_("Permit <MAX> inflight messages."), N_("<MAX>") }, + { "null", 'n', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_EMPTY, + N_("Send a null (zero length) message."), NULL }, + { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->port, 0, + N_("Connect to network <PORT>."), N_("<PORT>") }, + { "pass", 'P', POPT_ARG_STRING, &mqtt->password, 0, + N_("Remote user <PASSWORD>."), N_("<PASSWORD>") }, + { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->qos, 0, + N_("MQTT <QOS> level."), N_("<QOS>") }, + { "retain", 'r', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_RETAIN, + N_("Retain the message on the host."), NULL }, + { NULL, 's', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN_ALL, + N_("Send stdin lines as a single message."), NULL }, + { NULL, 'S', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_DNSSRV, + N_("Use SRV record to find remote host."), NULL }, + { "topic", 't', POPT_ARG_ARGV, &mqtt->_topics, 0, + N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") }, + { "user", 'u', POPT_ARG_STRING, &mqtt->user, 0, + N_("Remote <USER>."), N_("<USER>") }, + { "protocol", 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->protocol_version, 0, + N_("MQTT protocol <VERSION>"), N_("{auto|311|31}") }, + + { "will-payload", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->will_message, 0, + N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") }, + { "will-qos", '\0', POPT_ARG_INT, &mqtt->will_qos, 0, + N_("Will <QOS> level."), N_("<QOS>") }, + { "will-topic", '\0', POPT_ARG_STRING, &mqtt->will_topic, 0, + N_("Will <TOPIC> to publish to."), N_("<TOPIC>") }, + { "will-retain", '\0', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_WILL_RETAIN, + N_("Retain the client Will."), NULL }, + + { "cafile", '\0', POPT_ARG_STRING, &mqtt->cafile, 0, + N_("CA certificate(s) from <FILE>."), N_("<FILE>") }, + { "capath", '\0', POPT_ARG_STRING, &mqtt->_capath, 0, + N_("CA certificate(s) in <DIR>."), N_("<DIR>") }, + { "cert", '\0', POPT_ARG_STRING, &mqtt->cert, 0, + N_("Client authentication <CERT>."), N_("<CERT>") }, + { "key", '\0', POPT_ARG_STRING, &mqtt->privkey, 0, + N_("Client private <KEY>."), N_("<KEY>") }, + { "ciphers", '\0', POPT_ARG_STRING, &mqtt->ciphers, 0, + N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") }, + { "tls-version", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_tls_version, 0, + N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") }, + { "insecure", '\0', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_INSECURE, + N_("Do not verify server cert matches DNS host."), NULL }, + { "psk", '\0', POPT_ARG_STRING, &mqtt->_psk_key, 0, + N_("TLS-PSK mode <KEY> hex (w/o 0x)"), N_("<KEY>") }, + { "psk-identity", '\0', POPT_ARG_STRING, &mqtt->_psk_identity, 0, + N_("TLS-PSK <IDENTITY>."), N_("<IDENTITY>") }, + { "proxy", '\0', POPT_ARG_STRING, &mqtt->_proxy, 0, + N_("SOCKS5 proxy <URL>."), N_("<URL>") }, + + { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0, + N_("Common options for all rpmio executables:"), + NULL }, + + POPT_AUTOALIAS + POPT_AUTOHELP + + POPT_TABLEEND + }; + static int _popt_context_flags = 0; /* XXX POPT_CONTEXT_POSIXMEHARDER */ + poptContext con = NULL; + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); + + con = poptGetContext(av[0], ac, (const char **)av, + rpmmqttOptionsTable, _popt_context_flags); + + while ((xx = poptGetNextOpt(con)) > 0) + switch (xx) { + default: + fprintf(stderr, _("%s: option table misconfigured (%d)\n"), + __FUNCTION__, xx); + goto exit; + break; + } - mqtt->will_message = _free(mqtt->will_message); - mqtt->will_topic = _free(mqtt->will_topic); - mqtt->cafile = _free(mqtt->cafile); - mqtt->_capath = _free(mqtt->_capath); - mqtt->cert = _free(mqtt->cert); - mqtt->privkey = _free(mqtt->privkey); - mqtt->ciphers = _free(mqtt->ciphers); - mqtt->_tls_version = _free(mqtt->_tls_version); /* XXX malloc? */ - mqtt->_psk_key = _free(mqtt->_psk_key); - mqtt->_psk_identity = _free(mqtt->_psk_identity); - mqtt->_proxy = _free(mqtt->_proxy); -/* ========== */ + mqtt->av = NULL; + xx = argvAppend((ARGV_t *)&mqtt->av, poptGetArgs(con)); - mqtt->C = NULL; - mqtt->uri = _free(mqtt->uri); - mqtt->topic = _free(mqtt->topic); - mqtt->clientid = _free(mqtt->clientid); - mqtt->persist_path = _free(mqtt->persist_path); - mqtt->persist_ctx = _free(mqtt->persist_ctx); - mqtt->dn = _free(mqtt->dn); + mqtt->ac = argvCount((ARGV_t)mqtt->av); + rc = RPMRC_OK; - mqtt->serverURI = _free(mqtt->serverURI); +exit: + if (con) + con = poptFreeContext(con); - if (mqtt->av) - (void) argvFree((ARGV_t)mqtt->av); - mqtt->av = NULL; - mqtt->flags = 0; +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); + return rc; } -RPMIOPOOL_MODULE(mqtt) - -rpmmqtt rpmmqttNew(char ** av, uint32_t flags) +static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av, + mqttFlags flags) { - static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", NULL }; - rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); + static mqttFlags _flags = (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); urlinfo u = NULL; const char *s = NULL; + rpmRC rc; + +SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, flags)); - /* -- Copy (and initialize the defaults) from the CLI options. */ - { yarnLock use = mqtt->_item.use; - void *pool = mqtt->_item.pool; -#ifdef WTF - *mqtt = _mqtt; /* structure assignment */ + /* -- Initialize the default values before popt processing. */ + mqtt->flags = (flags ? flags : _flags); + mqtt->port = 1883; + mqtt->max_inflight = 20; + mqtt->keepalive = 60; + mqtt->qos = 0; +#ifdef DYING + mqtt->protocol_version = rpmExpand("31", NULL); #else - memcpy(mqtt, &_mqtt, sizeof(*mqtt)); + mqtt->protocol_version = rpmExpand("auto", NULL); + mqtt->will_message = rpmExpand("", NULL); #endif - mqtt->_item.use = use; - mqtt->_item.pool = pool; - } - memset(&_mqtt, 0, sizeof(_mqtt)); - _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); - _mqtt.port = 1883; - _mqtt.max_inflight = 20; - _mqtt.keepalive = 60; - _mqtt.qos = 0; - _mqtt.protocol_version = xstrdup("31"); /* XXX malloc? */ - _mqtt.host = rpmExpand("localhost", NULL); - _mqtt.idprefix = rpmExpand("rpm", NULL); - _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL); - _mqtt._tls_version = xstrdup("1.2"); /* XXX malloc? */ - - mqtt->flags = flags; - mqtt->av = NULL; - - if (av == NULL) - av = (char **)_av; - - if (av) { - int ac = argvCount((ARGV_t)av); - for (int i = 0; i < ac; i++) { - char * t = rpmExpand(av[i], NULL); - ARGV_t nav = NULL; - int xx = argvSplit(&nav, t, NULL); - int nac = argvCount(nav); - for (int j = 0; j < nac; j++) - xx = argvAdd((ARGV_t *)&mqtt->av, nav[j]); - nav = argvFree(nav); - t = _free(t); - } + mqtt->host = rpmExpand("localhost", NULL); + mqtt->idprefix = rpmExpand("rpm", NULL); + mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL); + mqtt->_tls_version = rpmExpand("1.2", NULL); + + /* -- Process the options/arguments. */ + rc = rpmmqttInitPopt(mqtt, ac, (char *const *)av); + if (rc) + goto exit; + + if (mqtt->ac == 0) { +static const char *_av[] = { + "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4", + "rpm/#", + NULL, +}; + (void) argvAppend((ARGV_t *)&mqtt->av, _av); + mqtt->ac = argvCount((ARGV_t)mqtt->av); } -argvPrint(__FUNCTION__, (ARGV_t)mqtt->av, NULL); - mqtt->ac = argvCount((ARGV_t)mqtt->av); + /* -- Set unspecified options from the URI parameters. */ + /* XXX W2DO: which takes precedence, options or URI??? */ mqtt->ut = urlSplit(mqtt->av[0], &u); mqtt->u = u; @@ -1030,9 +1155,9 @@ mqtt->user = xstrdup(u->user); if (mqtt->password == NULL && u->password != NULL) mqtt->password = xstrdup(u->password); + /* XXX mqtt->host? */ if (mqtt->port == 0 && u->port != 0) mqtt->port = u->port; - mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL); (void) urlPath(u->url, &s); @@ -1082,8 +1207,79 @@ } qav = argvFree(qav); } + + rc = RPMRC_OK; + +exit: +SPEW((stderr, "<-- %s(%p,%p[%d],0x%x) rc %d\n", __FUNCTION__, mqtt, av, ac, flags, rc)); + return rc; +} + +/*==============================================================*/ +static void rpmmqttFini(void * _mqtt) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + #ifdef WITH_MQTT + (void) rpmmqttDisconnect(mqtt); + (void) check(mqtt, "destroy", + (MQTTAsync_destroy(&mqtt->C), 0)); +#endif /* WITH_MQTT */ + +/* ========== */ + mqtt->_address = _free(mqtt->_address); + mqtt->_infile = _free(mqtt->_infile); + mqtt->host = _free(mqtt->host); + mqtt->_clientid = _free(mqtt->_clientid); + mqtt->idprefix = _free(mqtt->idprefix); + mqtt->_message = _free(mqtt->_message); + mqtt->password = _free(mqtt->password); + mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); + mqtt->user = _free(mqtt->user); + mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */ + + mqtt->will_message = _free(mqtt->will_message); + mqtt->will_topic = _free(mqtt->will_topic); + mqtt->cafile = _free(mqtt->cafile); + mqtt->_capath = _free(mqtt->_capath); + mqtt->cert = _free(mqtt->cert); + mqtt->privkey = _free(mqtt->privkey); + mqtt->ciphers = _free(mqtt->ciphers); + mqtt->_tls_version = _free(mqtt->_tls_version); /* XXX malloc? */ + mqtt->_psk_key = _free(mqtt->_psk_key); + mqtt->_psk_identity = _free(mqtt->_psk_identity); + mqtt->_proxy = _free(mqtt->_proxy); +/* ========== */ + + mqtt->C = NULL; + mqtt->uri = _free(mqtt->uri); + mqtt->topic = _free(mqtt->topic); + mqtt->clientid = _free(mqtt->clientid); + mqtt->persist_path = _free(mqtt->persist_path); + mqtt->persist_ctx = _free(mqtt->persist_ctx); + mqtt->dn = _free(mqtt->dn); + + mqtt->serverURI = _free(mqtt->serverURI); + + if (mqtt->av) + (void) argvFree((ARGV_t)mqtt->av); + mqtt->av = NULL; + mqtt->flags = 0; +} + +RPMIOPOOL_MODULE(mqtt) + +rpmmqtt rpmmqttNew(char ** av, uint32_t flags) +{ + rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); + int ac = argvCount((ARGV_t)av); + +SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags)); + rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags); + (void)rc; + +#ifdef WITH_MQTT { static int oneshot; int _lvl = RPMLOG_DEBUG; int xx; @@ -1157,41 +1353,11 @@ onDeliveryComplete)); if (mqtt->ac > 1) { -#ifdef NOTYET - int *subqos = xcalloc(mqtt->ac, sizeof(*subqos)); - for (int i = 1; i < mqtt->ac; i++) { - char * t = mqtt->av[i]; - char * te = strchr(t, '?'); - subqos[i] = 1; /* XXX */ - if (te) { - *te++ = '\0'; - if ((te = strchr(te, '='))) { - if (!strncmp(t, "qos", (te - t))) - subqos[i] = strtoul(te+1, NULL, 0); - } - } - rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); - } - - MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; - memcpy(Ropts, &_Ropts, sizeof(*Ropts)); -#ifdef REF - memcpy(Ropts->struct_id, "MQTR", 4); - Ropts->struct_version = 0; -#endif - Ropts->onSuccess = onSubscribe; - Ropts->onFailure = onSubscribeFailure; - Ropts->context = mqtt; - - xx = check(mqtt, "subscribeMany", - MQTTAsync_subscribeMany(&mqtt->C, - mqtt->ac-1, mqtt->av+1, subqos+1, Ropts)); - subqos = _free(subqos); +#ifdef NOTYET /* XXX segfault here. */ + xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1); #else - for (int i = 1; i < mqtt->ac; i++) { -fprintf(stderr, "%5d %s\n", i, mqtt->av[i]); + for (int i = 1; i < mqtt->ac; i++) xx = rpmmqttRead(mqtt, mqtt->av[i], 0); - } #endif } } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 2 Jul 2016 09:33:44 -0000 1.1.2.9 +++ rpm/rpmio/rpmmqtt.h 2 Jul 2016 14:32:59 -0000 1.1.2.10 @@ -65,7 +65,6 @@ const char *_psk_identity; /*!< --psk-identity */ const char *_proxy; /*!< --proxy */ - int _max_msg_count; /*!< (sub) -C */ /* ========== */ urltype ut; @@ -90,6 +89,9 @@ volatile int finished; volatile unsigned token; /* XXX MQTTClient_subscriptve.c */ + int msg_count; + int max_msg_count; /*!< (sub) -C */ + char * serverURI; int MQTTVersion; int sessionPresent; @@ -103,8 +105,6 @@ MQTTAsync_responseOptions Ropts; }; -extern struct rpmmqtt_s _mqtt; - #endif /* _RPMMQTT_INTERNAL */ #ifdef __cplusplus @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.6 -r1.1.2.7 tmqtt.c --- rpm/rpmio/tmqtt.c 2 Jul 2016 09:33:44 -0000 1.1.2.6 +++ rpm/rpmio/tmqtt.c 2 Jul 2016 14:32:59 -0000 1.1.2.7 @@ -46,103 +46,10 @@ return rc; } -/*==============================================================*/ - -#define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE) -#define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG) - -struct poptOption mqttOptionsTable[] = { - { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &_mqtt.flags, MQTT_FLAGS_CLEAN, - N_("(sub) Do not clean session."), NULL }, - { NULL, 'C', POPT_ARG_INT, &_mqtt._max_msg_count, 0, - N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") }, - { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &_mqtt.flags, MQTT_FLAGS_EOL, - N_("(sub) Do not print NL after messages."), NULL }, - { NULL, 'R', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_NOSTALE, - N_("(sub) Do not print stale messages."), NULL }, - - { NULL, 'A', POPT_ARG_STRING, &_mqtt._address, 0, - N_("Connect from local <ADDR>."), N_("<ADDR>") }, - { "file", 'f', POPT_ARG_STRING, &_mqtt._infile, 0, - N_("Send <FILE> as message."), N_("<FILE>") }, - { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.host, 0, - N_("Connect to <HOST>."), N_("<HOST>") }, - { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._clientid, 0, - N_("MQTT client <ID>."), N_("<ID>") }, - { "prefix", 'I', POPT_ARG_STRING, &_mqtt.idprefix, 0, - N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") }, - { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.keepalive, 0, - N_("Keep alive in <SECS>."), N_("<SECS>") }, - { NULL, 'l', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_STDIN_EACH, - N_("Send messages line-by-line from stdin."), NULL }, - { "message", 'm', POPT_ARG_STRING, &_mqtt._message, 0, - N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") }, - { NULL, 'M', POPT_ARG_INT, &_mqtt.max_inflight, 0, - N_("Permit <MAX> inflight messages."), N_("<MAX>") }, - { "null", 'n', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_EMPTY, - N_("Send a null (zero length) message."), NULL }, - { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.port, 0, - N_("Connect to network <PORT>."), N_("<PORT>") }, - { "pass", 'P', POPT_ARG_STRING, &_mqtt.password, 0, - N_("Remote user <PASSWORD>."), N_("<PASSWORD>") }, - { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.qos, 0, - N_("MQTT <QOS> level."), N_("<QOS>") }, - { "retain", 'r', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_RETAIN, - N_("Retain the message on the host."), NULL }, - { NULL, 's', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_STDIN_ALL, - N_("Send stdin lines as a single message."), NULL }, - { NULL, 'S', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_DNSSRV, - N_("Use SRV record to find remote host."), NULL }, - { "topic", 't', POPT_ARG_ARGV, &_mqtt._topics, 0, - N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") }, - { "user", 'u', POPT_ARG_STRING, &_mqtt.user, 0, - N_("Remote <USER>."), N_("<USER>") }, - { NULL, 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.protocol_version, 0, - N_("MQTT protocol <VERSION>"), N_("{31|311}") }, - - { "will-payload", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.will_message, 0, - N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") }, - { "will-qos", '\0', POPT_ARG_INT, &_mqtt.will_qos, 0, - N_("Will <QOS> level."), N_("<QOS>") }, - { "will-topic", '\0', POPT_ARG_STRING, &_mqtt.will_topic, 0, - N_("Will <TOPIC> to publish to."), N_("<TOPIC>") }, - { "will-retain", '\0', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_WILL_RETAIN, - N_("Retain the client Will."), NULL }, - - { "cafile", '\0', POPT_ARG_STRING, &_mqtt.cafile, 0, - N_("CA certificate(s) from <FILE>."), N_("<FILE>") }, - { "capath", '\0', POPT_ARG_STRING, &_mqtt._capath, 0, - N_("CA certificate(s) in <DIR>."), N_("<DIR>") }, - { "cert", '\0', POPT_ARG_STRING, &_mqtt.cert, 0, - N_("Client authentication <CERT>."), N_("<CERT>") }, - { "key", '\0', POPT_ARG_STRING, &_mqtt.privkey, 0, - N_("Client private <KEY>."), N_("<KEY>") }, - { "ciphers", '\0', POPT_ARG_STRING, &_mqtt.ciphers, 0, - N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") }, - { "tls-version", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._tls_version, 0, - N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") }, - { "insecure", '\0', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_INSECURE, - N_("Do not verify server cert matches DNS host."), NULL }, - { "psk", '\0', POPT_ARG_STRING, &_mqtt._psk_key, 0, - N_("TLS-PSK mode <KEY> hex (w/o 0x)"), N_("<KEY>") }, - { "psk-identity", '\0', POPT_ARG_STRING, &_mqtt._psk_identity, 0, - N_("TLS-PSK <IDENTITY>."), N_("<IDENTITY>") }, - { "proxy", '\0', POPT_ARG_STRING, &_mqtt._proxy, 0, - N_("SOCKS5 proxy <URL>."), N_("<URL>") }, - - { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0, - N_("Common options for all rpmio executables:"), - NULL }, - - POPT_AUTOALIAS - POPT_AUTOHELP - - POPT_TABLEEND -}; - int main(int argc, char *argv[]) { +#ifdef UNUSED static char _mqtt_argv[] = "\ mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\ rpm/#?qos=1 \n\ @@ -158,29 +65,13 @@ $SYS/broker/publish/#?qos=0 \n\ $SYS/broker/bytes/#?qos=0 \n #endif - poptContext optCon; -#ifdef UNUSED ARGV_t av = poptGetArgs(optCon); int ac = argvCount(av); -#endif static char *_av[] = { _mqtt_argv, NULL, }; +#endif rpmmqtt mqtt; int rc = -1; - /* Initialize the (allocated) CLI defaults. */ - memset(&_mqtt, 0, sizeof(_mqtt)); - _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); - _mqtt.port = 1883; - _mqtt.max_inflight = 20; - _mqtt.keepalive = 60; - _mqtt.qos = 0; - _mqtt.protocol_version = rpmExpand("31", NULL); - _mqtt.host = rpmExpand("localhost", NULL); - _mqtt.idprefix = rpmExpand("rpm", NULL); - _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL); - _mqtt._tls_version = rpmExpand("1.2", NULL); - - /* Initialize the _mqtt_ macro context */ (void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0); @@ -191,24 +82,14 @@ (void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0); (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0); (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0); -(void) rpmDefineMacro(NULL, "_mqtt_qos 0", 0); -(void) rpmDefineMacro(NULL, "_mqtt_persist 0", 0); +(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0); +(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0); (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0); (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0); - optCon = rpmioInit(argc, argv, mqttOptionsTable); - - mqtt = rpmmqttNew(_av, _mqtt.flags); + mqtt = rpmmqttNew(argv, 0); rc = _DoMQTT(mqtt); -sleep(1); mqtt = rpmmqttFree(mqtt); - _mqtt.protocol_version = _free(_mqtt.protocol_version); - _mqtt.host = _free(_mqtt.host); - _mqtt.idprefix = _free(_mqtt.idprefix); - _mqtt._clientid = _free(_mqtt._clientid); - _mqtt._tls_version = _free(_mqtt._tls_version); - - optCon = rpmioFini(optCon); return rc; } @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org