RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   02-Jul-2016 16:32:59
  Branch: rpm-5_4                          Handle: 2016070214325900

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: bury option processing prior to embedding.

  Summary:
    Revision    Changes     Path
    1.1.2.11    +369 -203   rpm/rpmio/rpmmqtt.c
    1.1.2.10    +3  -3      rpm/rpmio/rpmmqtt.h
    1.1.2.7     +5  -124    rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       2 Jul 2016 09:33:44 -0000       1.1.2.10
  +++ rpm/rpmio/rpmmqtt.c       2 Jul 2016 14:32:59 -0000       1.1.2.11
  @@ -13,6 +13,7 @@
   #include <rpmdir.h>
   #include <rpmmacro.h>
   #include <rpmurl.h>
  +#include <poptIO.h>
   #include <argv.h>
   
   #ifdef       WITH_MQTT
  @@ -24,14 +25,13 @@
   
   #include "debug.h"
   
  -int _rpmmqtt_debug = 0;
  +int _rpmmqtt_debug;
  +#define SPEW(_list)     if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list
   
   #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != 
MQTT_FLAGS_NONE)
   #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG)
   
   /*==============================================================*/
  -struct rpmmqtt_s _mqtt;
  -
   #ifndef      DYING
   static void dumpU(const char * msg, urlinfo u)
   {
  @@ -74,10 +74,10 @@
                msg, mqtt);
       if (mqtt) {
   
  +     argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
   #define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, 
mqtt->_foo)
        PRINT(s, _address);
        PRINT(s, _infile);
  -     PRINT(s, idprefix);
        PRINT(s, _message);
   
        PRINT(s, cafile);
  @@ -90,8 +90,6 @@
        PRINT(s, _psk_identity);
        PRINT(s, _proxy);
   
  -     PRINT(d, _max_msg_count);
  -
   fprintf(stderr, "====================\n");
        PRINT(d, keepalive);
        PRINT(d, max_inflight);
  @@ -106,10 +104,12 @@
        PRINT(d, port);
        PRINT(s, uri);
   
  +     PRINT(s, idprefix);
        PRINT(s, _clientid);
        PRINT(s, clientid);
   
        PRINT(u, qos);
  +     PRINT(u, timeout);
   
        if (mqtt->_topics)
            argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp);
  @@ -123,12 +123,13 @@
        PRINT(p, persist_ctx);
        PRINT(s, persist_path);
   
  -     PRINT(u, token);
  -     PRINT(u, timeout);
  -
        PRINT(d, debug);
        PRINT(d, trace);
        PRINT(d, finished);
  +     PRINT(u, token);
  +
  +     PRINT(d, msg_count);
  +     PRINT(d, max_msg_count);
   
        PRINT(s, serverURI);
        PRINT(d, MQTTVersion);
  @@ -136,8 +137,6 @@
   
        PRINT(s, dn);
   #undef       PRINT
  -
  -
       }
   }
   
  @@ -203,13 +202,13 @@
   
   /*==============================================================*/
   #ifdef       WITH_MQTT
  -static int onMessageArrived(void * _ctx, char * topic, int topicLen,
  +static int onMessageArrived(void * _mqtt, char * topic, int topicLen,
                MQTTAsync_message *  message)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int rc = 1;
   
  -    (void)mqtt;
  +    mqtt->msg_count++;
   
       const char * s = message->payload;
       size_t ns = message->payloadlen;
  @@ -227,17 +226,17 @@
       return rc;
   }
   
  -static void onDeliveryComplete(void * _ctx, int token)
  +static void onDeliveryComplete(void * _mqtt, int token)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       if (_rpmmqtt_debug < 0)
        rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token);
       mqtt->token = token;
   }
   
  -static void onConnectionLost(void * _ctx, char *cause)
  +static void onConnectionLost(void * _mqtt, char *cause)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       rpmlog(RPMLOG_DEBUG,
                "--- MQTT disconnect(%s) version(%d) present(%d)\n",
  @@ -250,10 +249,10 @@
       (void) rpmmqttConnect(mqtt);
   }
   
  -static void onDisconnectFailure(void * _ctx, MQTTAsync_failureData * 
response)
  +static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * 
response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, 
_ctx, response);
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, 
_mqtt, response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
  @@ -266,9 +265,9 @@
       mqtt->finished = 1;
   }
   
  -static void onDisconnect(void * _ctx, MQTTAsync_successData * response)
  +static void onDisconnect(void * _mqtt, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       if (mqtt->debug || _rpmmqtt_debug)
        rpmlog(RPMLOG_DEBUG,
  @@ -278,10 +277,10 @@
       mqtt->finished = 1;
   }
   
  -static void onConnectFailure(void * _ctx, MQTTAsync_failureData * response)
  +static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, 
response);
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, 
response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
  @@ -293,9 +292,9 @@
       mqtt->finished = 1;
   }
   
  -static void onConnect(void * _ctx, MQTTAsync_successData * response)
  +static void onConnect(void * _mqtt, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       if (response) {
        mqtt->serverURI = xstrdup(response->alt.connect.serverURI);
  @@ -310,10 +309,10 @@
       mqtt->finished = 1;
   }
   
  -static void onSubscribeFailure(void * _ctx, MQTTAsync_failureData * response)
  +static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * 
response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, 
response);
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, 
response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
  @@ -325,9 +324,9 @@
       mqtt->finished = 1;
   }
   
  -static void onSubscribe(void * _ctx, MQTTAsync_successData * response)
  +static void onSubscribe(void * _mqtt, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int qos = response->alt.qos;
   
       if (mqtt->debug || _rpmmqtt_debug)
  @@ -335,9 +334,22 @@
       mqtt->finished = 1;
   }
   
  -static void onSendFailure(void * _ctx, MQTTAsync_failureData * response)
  +/* Success callback installed by rpmmqttSubscribeMany(): log each qos in response->alt.qosList. */
  +/* NOTE(review): the loop below reads mqtt->ac entries, but the caller in rpmmqttNew() */
  +/* passes mqtt->ac-1 topics -- confirm qosList really has mqtt->ac elements. */
  +static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    int *subqos = response->alt.qosList;
  +
  +SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, 
response, subqos, mqtt->ac));
  +    for (int i = 0; i < mqtt->ac; i++) {
  +     if (mqtt->debug || _rpmmqtt_debug)
  +         rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", subqos[i]);
  +    }
  +    /* Mark the request as completed in the handle. */
  +    mqtt->finished = 1;
  +}
  +
  +static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       {
        const char *s = response->message;
  @@ -349,9 +361,9 @@
       mqtt->finished = 1;
   }
   
  -static void onSend(void * _ctx, MQTTAsync_successData * response)
  +static void onSend(void * _mqtt, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       if (mqtt->debug || _rpmmqtt_debug) {
        const char * s = response->alt.pub.message.payload;
  @@ -371,15 +383,15 @@
   
   /*==============================================================*/
   #ifdef       WITH_MQTT
  -static int rpmmqttOpen(void **_ctxp, const char *clientID, const char 
*serverURI,
  -             void *_ctx)
  +static int rpmmqttOpen(void **_mqttp, const char *clientID, const char 
*serverURI,
  +             void *_mqtt)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char * dn;
       char * te;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    *_ctxp = _ctx;
  +    *_mqttp = _mqtt;
   
       mqtt->dn = _free(mqtt->dn);
       dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL);
  @@ -395,15 +407,14 @@
       rc = 0;
   
   exit:
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_ctxp, clientID, serverURI, _ctx, rc, mqtt->dn);
  +SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn));
   
       return rc;
   }
   
  -static int rpmmqttClose(void *_ctx)
  +static int rpmmqttClose(void *_mqtt)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
   
  @@ -418,15 +429,14 @@
       rc = 0;
   
   exit:
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc);
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
       return rc;
   }
   
  -static int rpmmqttPut(void *_ctx, char *key,
  +static int rpmmqttPut(void *_mqtt, char *key,
                int bufcount, char *buffers[], int buflens[])
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char *fn = NULL;
       FD_t fd = NULL;
       size_t nb = 0;
  @@ -445,8 +455,7 @@
       for (int i = 0; i < bufcount; i++) {
        const char * s = buffers[i];
        int ns = buflens[i];
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s);
  +SPEW((stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s));
        nb += buflens[i];
        nw += Fwrite(buffers[i], sizeof(*buffers[i]), buflens[i], fd);
       }
  @@ -463,16 +472,15 @@
   exit:
       if (fd)
        Fclose(fd);
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, 
_ctx, key, bufcount, buffers, buflens, rc, fn);
  +SPEW((stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, 
_mqtt, key, bufcount, buffers, buflens, rc, fn));
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttGet(void *_ctx, char *key,
  +static int rpmmqttGet(void *_mqtt, char *key,
                char *buffer[], int *buflen)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char *fn = NULL;
       FD_t fd = NULL;
       size_t nr = 0;
  @@ -509,17 +517,16 @@
   exit:
       if (fd)
        Fclose(fd);
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, 
key, buffer, buflen, rc, fn);
  +SPEW((stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, 
key, buffer, buflen, rc, fn));
       fn = _free(fn);
       *buffer = b;
       *buflen = nb;
       return rc;
   }
   
  -static int rpmmqttRemove(void *_ctx, char *key)
  +static int rpmmqttRemove(void *_mqtt, char *key)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char *fn = NULL;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  @@ -534,15 +541,14 @@
       rc = 0;
   
   exit:
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _ctx, key, 
rc, fn);
  +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, 
rc, fn));
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttKeys(void *_ctx, char ***keys, int *nkeys)
  +static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       ARGV_t av = NULL;
       int ac = 0;
       DIR * dir = NULL;
  @@ -572,16 +578,15 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _ctx, 
keys, nkeys, rc, av, ac);
  +SPEW((stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, 
keys, nkeys, rc, av, ac));
       *keys = (char **) av;
       *nkeys = ac;
       return rc;
   }
   
  -static int rpmmqttClear(void *_ctx)
  +static int rpmmqttClear(void *_mqtt)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       DIR * dir = NULL;
       struct dirent *dp;
       int nerrs = 0;
  @@ -610,14 +615,13 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc);
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
       return rc;
   }
   
  -static int rpmmqttContainsKey(void *_ctx, char *key)
  +static int rpmmqttContainsKey(void *_mqtt, char *key)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
       DIR * dir = NULL;
       struct dirent *dp;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
  @@ -646,8 +650,7 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _ctx, key, rc);
  +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc));
       return rc;
   }
   
  @@ -727,12 +730,24 @@
        (void)_Sopts;
        Copts->ssl = NULL;
   #endif
  +
        Copts->onSuccess = onConnect;
        Copts->onFailure = onConnectFailure;
        Copts->context = mqtt;
  +
        Copts->serverURIcount = 0;
        Copts->serverURIs = NULL;
   
  +     Copts->MQTTVersion = 0;
  +     if (mqtt->protocol_version) {
  +         if (!strcmp(mqtt->protocol_version, "auto"))
  +             Copts->MQTTVersion = 0;
  +         if (!strcmp(mqtt->protocol_version, "31"))
  +             Copts->MQTTVersion = 3;
  +         if (!strcmp(mqtt->protocol_version, "311"))
  +             Copts->MQTTVersion = 4;
  +     }
  +
        mqtt->finished = 0;
        rc = check(mqtt, "connect",
                MQTTAsync_connect(mqtt->C, Copts));
  @@ -742,6 +757,7 @@
   
       }
   #endif       /* WITH_MQTT */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  @@ -768,6 +784,7 @@
            usleep(100);
       }
   #endif
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  @@ -810,6 +827,52 @@
        usleep(100);
   #endif       /* WITH_MQTT */
   
  +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, rc));
  +    return rc;
  +}
  +
  +/**
  + * Subscribe to several topics with one MQTTAsync_subscribeMany() request.
  + * Each av[i] may carry a "?qos=<N>" suffix: the suffix is cut off the topic
  + * (av[i] is modified in place) and <N> replaces the default mqtt->qos.
  + * @param mqtt     mqtt handle
  + * @param ac       no. of topics in av
  + * @param av       topic strings (modified in place)
  + * @return         -1 if ac <= 0 or built without WITH_MQTT, otherwise the
  + *                 value returned by check() for the subscribe request
  + */
  +static int rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    int rc = -1;
  +
  +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
  +    if (ac <= 0)
  +     goto exit;
  +#ifdef       WITH_MQTT
  +    int _lvl = RPMLOG_DEBUG;
  +    /* One qos slot per topic actually passed in (ac), which is what the loop indexes. */
  +    int *subqos = xcalloc(ac, sizeof(*subqos));
  +    for (int i = 0; i < ac; i++) {
  +     char * t = av[i];
  +     char * te = strchr(t, '?');
  +     subqos[i] = mqtt->qos;  /* XXX */
  +     if (te) {
  +         *te++ = '\0';
  +         /* Compare the query key that follows '?', not the topic itself. */
  +         if (!strncmp(te, "qos=", sizeof("qos=")-1))
  +             subqos[i] = strtoul(te + sizeof("qos=")-1, NULL, 0);
  +     }
  +     rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
  +    }
  +
  +    MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  +    memcpy(Ropts, &_Ropts, sizeof(*Ropts));
  +#ifdef  REF
  +    memcpy(Ropts->struct_id, "MQTR", 4);
  +    Ropts->struct_version = 0;
  +#endif
  +    Ropts->onSuccess = onSubscribeMany;
  +    Ropts->onFailure = onSubscribeFailure;
  +    Ropts->context = mqtt;
  +
  +    /* Pass the client handle itself (as MQTTAsync_subscribe() does), not its address. */
  +    rc = check(mqtt, "subscribeMany",
  +             MQTTAsync_subscribeMany(mqtt->C,
  +                     ac, av, subqos, Ropts));
  +    subqos = _free(subqos);
  +#endif       /* WITH_MQTT */
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
       return rc;
   }
   
  @@ -820,12 +883,8 @@
   
   #ifdef       WITH_MQTT
       if (rpmmqttConnect(mqtt) == 0) {
  -#ifdef       DYING
  -     static char _mqtt_prefix[] =
  -             "%{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ";
  -#else
        static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
  -#endif
  +     /* XXX extra space */
        char * t = rpmExpand(_mqtt_prefix, " ", s, NULL);
        size_t nt = strlen(t);
   
  @@ -836,6 +895,7 @@
       }
   #endif       /* WITH_MQTT */
   
  +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)ret));
       return ret;
   }
   
  @@ -886,7 +946,6 @@
        mqtt->finished = 0;
        rc = check(mqtt, "subscribe",
                MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts));
  -fprintf(stderr, "*** finished(%d)\n", mqtt->finished);
        while (rc == 0 && !mqtt->finished)
            usleep(100);
   
  @@ -897,115 +956,181 @@
       }
   #endif       /* WITH_MQTT */
   
  +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)ret));
       return ret;
   }
   
   /*==============================================================*/
  -static void rpmmqttFini(void * _ctx)
  +/**
  + * Parse CLI-style options from av into the mqtt handle using popt.
  + * Option values are stored directly into mqtt fields (see the table below);
  + * the remaining non-option arguments are copied to mqtt->av and counted
  + * in mqtt->ac.
  + * @param mqtt     mqtt handle
  + * @param ac       no. of arguments in av
  + * @param av       argument array, av[0] is used as the popt context name
  + *                 (NULL or empty yields an empty argument list)
  + * @return         RPMRC_OK on success, RPMRC_FAIL if popt hands back an
  + *                 option value > 0 (option table misconfigured)
  + */
  +static rpmRC
  +rpmmqttInitPopt(rpmmqtt mqtt, int ac, char * const* av)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _ctx;
  -
  -#ifdef       WITH_MQTT
  -    (void) rpmmqttDisconnect(mqtt);
  -    (void) check(mqtt, "destroy",
  -             (MQTTAsync_destroy(&mqtt->C), 0));
  -#endif       /* WITH_MQTT */
  -
  -/* ========== */
  -    mqtt->_address = _free(mqtt->_address);
  -    mqtt->_infile = _free(mqtt->_infile);
  -    mqtt->host = _free(mqtt->host);
  -    mqtt->_clientid = _free(mqtt->_clientid);
  -    mqtt->idprefix = _free(mqtt->idprefix);
  -    mqtt->_message = _free(mqtt->_message);
  -    mqtt->password = _free(mqtt->password);
  -    mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
  -    mqtt->user = _free(mqtt->user);
  -    mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */
  +  struct poptOption rpmmqttOptionsTable[] = {
  +   { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,&mqtt->flags, 
MQTT_FLAGS_CLEAN,
  +     N_("(sub) Do not clean session."), NULL },
  +   { NULL, 'C', POPT_ARG_INT,                &mqtt->max_msg_count, 0,
  +     N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") },
  +   { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR,      &mqtt->flags, 
MQTT_FLAGS_EOL,
  +     N_("(sub) Do not print NL after messages."), NULL },
  +   { NULL, 'R', POPT_BIT_SET,                &mqtt->flags, 
MQTT_FLAGS_NOSTALE,
  +     N_("(sub) Do not print stale messages."), NULL },
  +
  +   { NULL, 'A', POPT_ARG_STRING,             &mqtt->_address, 0,
  +     N_("Connect from local <ADDR>."), N_("<ADDR>") },
  +   { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmqtt_debug, -1,
  +     N_("Debug spewage."), NULL },
  +   { "file", 'f', POPT_ARG_STRING,   &mqtt->_infile, 0,
  +     N_("Send <FILE> as message."), N_("<FILE>") },
  +   { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->host, 0,
  +     N_("Connect to <HOST>."), N_("<HOST>") },
  +   { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_clientid, 
0,
  +     N_("MQTT client <ID>."), N_("<ID>") },
  +   { "prefix", 'I', POPT_ARG_STRING, &mqtt->idprefix, 0,
  +     N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") },
  +   { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, 
&mqtt->keepalive, 0,
  +     N_("Keep alive in <SECS>."), N_("<SECS>") },
  +   { NULL, 'l', POPT_BIT_SET,                &mqtt->flags, 
MQTT_FLAGS_STDIN_EACH,
  +     N_("Send messages line-by-line from stdin."), NULL },
  +   { "message", 'm', POPT_ARG_STRING,        &mqtt->_message, 0,
  +     N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") },
  +   { NULL, 'M', POPT_ARG_INT,                &mqtt->max_inflight, 0,
  +     N_("Permit <MAX> inflight messages."), N_("<MAX>") },
  +   { "null", 'n', POPT_BIT_SET,              &mqtt->flags, MQTT_FLAGS_EMPTY,
  +     N_("Send a null (zero length) message."), NULL },
  +   { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,    &mqtt->port, 0,
  +     N_("Connect to network <PORT>."), N_("<PORT>") },
  +   { "pass", 'P', POPT_ARG_STRING,   &mqtt->password, 0,
  +     N_("Remote user <PASSWORD>."), N_("<PASSWORD>") },
  +   { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,     &mqtt->qos, 0,
  +     N_("MQTT <QOS> level."), N_("<QOS>") },
  +   { "retain", 'r', POPT_BIT_SET,            &mqtt->flags, MQTT_FLAGS_RETAIN,
  +     N_("Retain the message on the host."), NULL },
  +   { NULL, 's', POPT_BIT_SET,                &mqtt->flags, 
MQTT_FLAGS_STDIN_ALL,
  +     N_("Send stdin lines as a single message."), NULL },
  +   { NULL, 'S', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_DNSSRV,
  +     N_("Use SRV record to find remote host."), NULL },
  +   { "topic", 't', POPT_ARG_ARGV,            &mqtt->_topics, 0,
  +     N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") },
  +   { "user", 'u', POPT_ARG_STRING,   &mqtt->user, 0,
  +     N_("Remote <USER>."), N_("<USER>") },
  +   { "protocol", 'V', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->protocol_version, 0,
  +     N_("MQTT protocol <VERSION>"), N_("{auto|311|31}") },
  +
  +   { "will-payload", '\0', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->will_message, 0,
  +     N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") },
  +   { "will-qos", '\0', POPT_ARG_INT, &mqtt->will_qos, 0,
  +     N_("Will <QOS> level."), N_("<QOS>") },
  +   { "will-topic", '\0', POPT_ARG_STRING,    &mqtt->will_topic, 0,
  +     N_("Will <TOPIC> to publish to."), N_("<TOPIC>") },
  +   { "will-retain", '\0', POPT_BIT_SET,      &mqtt->flags, 
MQTT_FLAGS_WILL_RETAIN,
  +     N_("Retain the client Will."), NULL },
  +
  +   { "cafile", '\0', POPT_ARG_STRING,        &mqtt->cafile, 0,
  +     N_("CA certificate(s) from <FILE>."), N_("<FILE>") },
  +   { "capath", '\0', POPT_ARG_STRING,        &mqtt->_capath, 0,
  +     N_("CA certificate(s) in <DIR>."), N_("<DIR>") },
  +   { "cert", '\0', POPT_ARG_STRING,  &mqtt->cert, 0,
  +     N_("Client authentication <CERT>."), N_("<CERT>") },
  +   { "key", '\0', POPT_ARG_STRING,   &mqtt->privkey, 0,
  +     N_("Client private <KEY>."), N_("<KEY>") },
  +   { "ciphers", '\0', POPT_ARG_STRING,       &mqtt->ciphers, 0,
  +     N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") },
  +   { "tls-version", '\0', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_tls_version, 0,
  +     N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") },
  +   { "insecure", '\0', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_INSECURE,
  +     N_("Do not verify server cert matches DNS host."), NULL },
  +   { "psk", '\0', POPT_ARG_STRING,   &mqtt->_psk_key, 0,
  +     N_("TLS-PSK mode <KEY> hex (w/o 0x)"), N_("<KEY>") },
  +   { "psk-identity", '\0', POPT_ARG_STRING, &mqtt->_psk_identity, 0,
  +     N_("TLS-PSK <IDENTITY>."), N_("<IDENTITY>") },
  +   { "proxy", '\0', POPT_ARG_STRING, &mqtt->_proxy, 0,
  +     N_("SOCKS5 proxy <URL>."), N_("<URL>") },
  +
  +   { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0,
  +     N_("Common options for all rpmio executables:"),
  +     NULL },
  +
  +    POPT_AUTOALIAS
  +    POPT_AUTOHELP
  +
  +    POPT_TABLEEND
  +  };
  +    static int _popt_context_flags = 0; /* XXX POPT_CONTEXT_POSIXMEHARDER */
  +    poptContext con = NULL;
  +    rpmRC rc = RPMRC_FAIL;      /* assume failure */
  +    int xx;
  +
  +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
  +
  +    /* rpmmqttNew() forwards its av unchecked, so av may be NULL (ac == 0). */
  +    /* av[0] below would then dereference NULL: report "no arguments" instead */
  +    /* and let the caller fall back to its defaults. */
  +    if (av == NULL || ac <= 0) {
  +     mqtt->av = NULL;
  +     mqtt->ac = 0;
  +     rc = RPMRC_OK;
  +     goto exit;
  +    }
  +
  +    con = poptGetContext(av[0], ac, (const char **)av,
  +             rpmmqttOptionsTable, _popt_context_flags);
  +
  +    /* Every table entry has val == 0, so any value > 0 means a broken table. */
  +    /* NOTE(review): values < -1 (popt parse errors) end the loop unreported. */
  +    while ((xx = poptGetNextOpt(con)) > 0)
  +    switch (xx) {
  +    default:
  +     fprintf(stderr, _("%s: option table misconfigured (%d)\n"),
  +                __FUNCTION__, xx);
  +     goto exit;
  +     break;
  +    }
   
  -    mqtt->will_message = _free(mqtt->will_message);
  -    mqtt->will_topic = _free(mqtt->will_topic);
  -    mqtt->cafile = _free(mqtt->cafile);
  -    mqtt->_capath = _free(mqtt->_capath);
  -    mqtt->cert = _free(mqtt->cert);
  -    mqtt->privkey = _free(mqtt->privkey);
  -    mqtt->ciphers = _free(mqtt->ciphers);
  -    mqtt->_tls_version = _free(mqtt->_tls_version);  /* XXX malloc? */
  -    mqtt->_psk_key = _free(mqtt->_psk_key);
  -    mqtt->_psk_identity = _free(mqtt->_psk_identity);
  -    mqtt->_proxy = _free(mqtt->_proxy);
  -/* ========== */
  +    /* Keep a private copy of the leftover (non-option) arguments. */
  +    mqtt->av = NULL;
  +    xx = argvAppend((ARGV_t *)&mqtt->av, poptGetArgs(con));
   
  -    mqtt->C = NULL;
  -    mqtt->uri = _free(mqtt->uri);
  -    mqtt->topic = _free(mqtt->topic);
  -    mqtt->clientid = _free(mqtt->clientid);
  -    mqtt->persist_path = _free(mqtt->persist_path);
  -    mqtt->persist_ctx = _free(mqtt->persist_ctx);
  -    mqtt->dn = _free(mqtt->dn);
  +    mqtt->ac = argvCount((ARGV_t)mqtt->av);
  +    rc = RPMRC_OK;
   
  -    mqtt->serverURI = _free(mqtt->serverURI);
  +exit:
  +    if (con)
  +     con = poptFreeContext(con);
   
  -    if (mqtt->av)
  -     (void) argvFree((ARGV_t)mqtt->av);
  -    mqtt->av = NULL;
  -    mqtt->flags = 0;
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
  +    return rc;
   }
   
  -RPMIOPOOL_MODULE(mqtt)
  -
  -rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
  +static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av,
  +             mqttFlags flags)
   {
  -    static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", 
NULL };
  -    rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  +    static mqttFlags _flags = (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
       urlinfo u = NULL;
       const char *s = NULL;
  +    rpmRC rc;
  +
  +SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, 
flags));
   
  -    /* -- Copy (and initialize the defaults) from the CLI options. */
  -    {   yarnLock use = mqtt->_item.use;
  -        void *pool = mqtt->_item.pool;
  -#ifdef       WTF
  -     *mqtt = _mqtt;  /* structure assignment */
  +    /* -- Initialize the default values before popt processing. */
  +    mqtt->flags = (flags ? flags : _flags);
  +    mqtt->port = 1883;
  +    mqtt->max_inflight = 20;
  +    mqtt->keepalive = 60;
  +    mqtt->qos = 0;
  +#ifdef       DYING
  +    mqtt->protocol_version = rpmExpand("31", NULL);
   #else
  -     memcpy(mqtt, &_mqtt, sizeof(*mqtt));
  +    mqtt->protocol_version = rpmExpand("auto", NULL);
  +    mqtt->will_message = rpmExpand("", NULL);
   #endif
  -        mqtt->_item.use = use;
  -        mqtt->_item.pool = pool;
  -    }
  -    memset(&_mqtt, 0, sizeof(_mqtt));
  -    _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
  -    _mqtt.port = 1883;
  -    _mqtt.max_inflight = 20;
  -    _mqtt.keepalive = 60;
  -    _mqtt.qos = 0;
  -    _mqtt.protocol_version = xstrdup("31");  /* XXX malloc? */
  -    _mqtt.host = rpmExpand("localhost", NULL);
  -    _mqtt.idprefix = rpmExpand("rpm", NULL);
  -    _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL);
  -    _mqtt._tls_version = xstrdup("1.2");     /* XXX malloc? */
  -
  -    mqtt->flags = flags;
  -    mqtt->av = NULL;
  -
  -    if (av == NULL)
  -     av = (char **)_av;
  -
  -    if (av) {
  -     int ac = argvCount((ARGV_t)av);
  -     for (int i = 0; i < ac; i++) {
  -         char * t = rpmExpand(av[i], NULL);
  -         ARGV_t nav = NULL;
  -         int xx = argvSplit(&nav, t, NULL);
  -         int nac = argvCount(nav);
  -         for (int j = 0; j < nac; j++)
  -             xx = argvAdd((ARGV_t *)&mqtt->av, nav[j]);
  -         nav = argvFree(nav);
  -         t = _free(t);
  -     }
  +    mqtt->host = rpmExpand("localhost", NULL);
  +    mqtt->idprefix = rpmExpand("rpm", NULL);
  +    mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL);
  +    mqtt->_tls_version = rpmExpand("1.2", NULL);
  +
  +    /* -- Process the options/arguments. */
  +    rc = rpmmqttInitPopt(mqtt, ac, (char *const *)av);
  +    if (rc)
  +        goto exit;
  +
  +    if (mqtt->ac == 0) {
  +static const char *_av[] = {
  +    "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4",
  +    "rpm/#",
  +    NULL,
  +};
  +     (void) argvAppend((ARGV_t *)&mqtt->av, _av);
  +     mqtt->ac = argvCount((ARGV_t)mqtt->av);
       }
  -argvPrint(__FUNCTION__, (ARGV_t)mqtt->av, NULL);
  -    mqtt->ac = argvCount((ARGV_t)mqtt->av);
   
  +    /* -- Set unspecified options from the URI parameters. */
  +    /* XXX W2DO: which takes precedence, options or URI??? */
       mqtt->ut = urlSplit(mqtt->av[0], &u);
       mqtt->u = u;
   
  @@ -1030,9 +1155,9 @@
        mqtt->user = xstrdup(u->user);
       if (mqtt->password == NULL && u->password != NULL)
        mqtt->password = xstrdup(u->password);
  +     /* XXX mqtt->host? */
       if (mqtt->port == 0 && u->port != 0)
        mqtt->port = u->port;
  -
       mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL);
   
       (void) urlPath(u->url, &s);
  @@ -1082,8 +1207,79 @@
        }
        qav = argvFree(qav);
       }
  +
  +    rc = RPMRC_OK;
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,%p[%d],0x%x) rc %d\n", __FUNCTION__, mqtt, av, ac, 
flags, rc));
  +    return rc;
  +}
  +
  +/*==============================================================*/
  +/**
  + * Tear down a mqtt handle: disconnect, destroy the client, free owned fields.
  + * Presumably invoked through the pool code generated by
  + * RPMIOPOOL_MODULE(mqtt) below -- TODO confirm.
  + * @param _mqtt    mqtt handle (passed as void *)
  + */
  +static void rpmmqttFini(void * _mqtt)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
   #ifdef       WITH_MQTT
  +    (void) rpmmqttDisconnect(mqtt);
  +    /* The comma expression hands check() a constant 0 after the destroy call. */
  +    (void) check(mqtt, "destroy",
  +             (MQTTAsync_destroy(&mqtt->C), 0));
  +#endif       /* WITH_MQTT */
  +
  +/* ========== */
  +    /* Fields that the popt option table in rpmmqttInitPopt() stores into. */
  +    mqtt->_address = _free(mqtt->_address);
  +    mqtt->_infile = _free(mqtt->_infile);
  +    mqtt->host = _free(mqtt->host);
  +    mqtt->_clientid = _free(mqtt->_clientid);
  +    mqtt->idprefix = _free(mqtt->idprefix);
  +    mqtt->_message = _free(mqtt->_message);
  +    mqtt->password = _free(mqtt->password);
  +    mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
  +    mqtt->user = _free(mqtt->user);
  +    mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */
  +
  +    mqtt->will_message = _free(mqtt->will_message);
  +    mqtt->will_topic = _free(mqtt->will_topic);
  +    mqtt->cafile = _free(mqtt->cafile);
  +    mqtt->_capath = _free(mqtt->_capath);
  +    mqtt->cert = _free(mqtt->cert);
  +    mqtt->privkey = _free(mqtt->privkey);
  +    mqtt->ciphers = _free(mqtt->ciphers);
  +    mqtt->_tls_version = _free(mqtt->_tls_version);  /* XXX malloc? */
  +    mqtt->_psk_key = _free(mqtt->_psk_key);
  +    mqtt->_psk_identity = _free(mqtt->_psk_identity);
  +    mqtt->_proxy = _free(mqtt->_proxy);
  +/* ========== */
  +
  +    /* Remaining per-connection state. */
  +    mqtt->C = NULL;
  +    mqtt->uri = _free(mqtt->uri);
  +    mqtt->topic = _free(mqtt->topic);
  +    mqtt->clientid = _free(mqtt->clientid);
  +    mqtt->persist_path = _free(mqtt->persist_path);
  +    mqtt->persist_ctx = _free(mqtt->persist_ctx);
  +    mqtt->dn = _free(mqtt->dn);
  +
  +    mqtt->serverURI = _free(mqtt->serverURI);
  +
  +    /* NOTE(review): mqtt->ac is not reset here alongside mqtt->av. */
  +    if (mqtt->av)
  +     (void) argvFree((ARGV_t)mqtt->av);
  +    mqtt->av = NULL;
  +    mqtt->flags = 0;
  +}
  +
  +RPMIOPOOL_MODULE(mqtt)
  +
  +rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
  +{
  +    rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  +    int ac = argvCount((ARGV_t)av);
  +
  +SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags));
   
  +    rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags);
  +    (void)rc;
  +
  +#ifdef       WITH_MQTT
       {        static int oneshot;
        int _lvl = RPMLOG_DEBUG;
        int xx;
  @@ -1157,41 +1353,11 @@
                        onDeliveryComplete));
   
        if (mqtt->ac > 1) {
  -#ifdef       NOTYET
  -         int *subqos = xcalloc(mqtt->ac, sizeof(*subqos));
  -         for (int i = 1; i < mqtt->ac; i++) {
  -             char * t = mqtt->av[i];
  -             char * te = strchr(t, '?');
  -             subqos[i] = 1;  /* XXX */
  -             if (te) {
  -                 *te++ = '\0';
  -                 if ((te = strchr(te, '='))) {
  -                     if (!strncmp(t, "qos", (te - t)))
  -                         subqos[i] = strtoul(te+1, NULL, 0);
  -                 }
  -             }
  -             rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
  -         }
  -
  -         MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  -         memcpy(Ropts, &_Ropts, sizeof(*Ropts));
  -#ifdef  REF
  -         memcpy(Ropts->struct_id, "MQTR", 4);
  -         Ropts->struct_version = 0;
  -#endif
  -         Ropts->onSuccess = onSubscribe;
  -         Ropts->onFailure = onSubscribeFailure;
  -         Ropts->context = mqtt;
  -
  -         xx = check(mqtt, "subscribeMany",
  -             MQTTAsync_subscribeMany(&mqtt->C,
  -                     mqtt->ac-1, mqtt->av+1, subqos+1, Ropts));
  -         subqos = _free(subqos);
  +#ifdef       NOTYET  /* XXX segfault here. */
  +         xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1);
   #else
  -         for (int i = 1; i < mqtt->ac; i++) {
  -fprintf(stderr, "%5d %s\n", i, mqtt->av[i]);
  +         for (int i = 1; i < mqtt->ac; i++)
                xx = rpmmqttRead(mqtt, mqtt->av[i], 0);
  -         }
   #endif
        }
       }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       2 Jul 2016 09:33:44 -0000       1.1.2.9
  +++ rpm/rpmio/rpmmqtt.h       2 Jul 2016 14:32:59 -0000       1.1.2.10
  @@ -65,7 +65,6 @@
       const char *_psk_identity;                       /*!< --psk-identity */
       const char *_proxy;                              /*!< --proxy */
   
  -    int _max_msg_count;                              /*!< (sub) -C */
   /* ========== */
   
       urltype ut;
  @@ -90,6 +89,9 @@
       volatile int finished;
       volatile unsigned token; /* XXX MQTTClient_subscriptve.c */
   
  +    int msg_count;
  +    int max_msg_count;                               /*!< (sub) -C */
  +
       char * serverURI;
       int MQTTVersion;
       int sessionPresent;
  @@ -103,8 +105,6 @@
       MQTTAsync_responseOptions Ropts;
   };
   
  -extern struct rpmmqtt_s _mqtt;
  -
   #endif       /* _RPMMQTT_INTERNAL */
   
   #ifdef __cplusplus
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.6 -r1.1.2.7 tmqtt.c
  --- rpm/rpmio/tmqtt.c 2 Jul 2016 09:33:44 -0000       1.1.2.6
  +++ rpm/rpmio/tmqtt.c 2 Jul 2016 14:32:59 -0000       1.1.2.7
  @@ -46,103 +46,10 @@
       return rc;
   }
   
  -/*==============================================================*/
  -
  -#define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE)
  -#define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG)
  -
  -struct poptOption mqttOptionsTable[] = {
  - { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,      &_mqtt.flags, MQTT_FLAGS_CLEAN,
  -     N_("(sub) Do not clean session."), NULL },
  - { NULL, 'C', POPT_ARG_INT,          &_mqtt._max_msg_count, 0,
  -     N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") },
  - { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR,        &_mqtt.flags, MQTT_FLAGS_EOL,
  -     N_("(sub) Do not print NL after messages."), NULL },
  - { NULL, 'R', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_NOSTALE,
  -     N_("(sub) Do not print stale messages."), NULL },
  -
  - { NULL, 'A', POPT_ARG_STRING,               &_mqtt._address, 0,
  -     N_("Connect from local <ADDR>."), N_("<ADDR>") },
  - { "file", 'f', POPT_ARG_STRING,     &_mqtt._infile, 0,
  -     N_("Send <FILE> as message."), N_("<FILE>") },
  - { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.host, 0,
  -     N_("Connect to <HOST>."), N_("<HOST>") },
  - { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._clientid, 0,
  -     N_("MQTT client <ID>."), N_("<ID>") },
  - { "prefix", 'I', POPT_ARG_STRING,   &_mqtt.idprefix, 0,
  -     N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") },
  - { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.keepalive, 0,
  -     N_("Keep alive in <SECS>."), N_("<SECS>") },
  - { NULL, 'l', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_STDIN_EACH,
  -     N_("Send messages line-by-line from stdin."), NULL },
  - { "message", 'm', POPT_ARG_STRING,  &_mqtt._message, 0,
  -     N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") },
  - { NULL, 'M', POPT_ARG_INT,          &_mqtt.max_inflight, 0,
  -     N_("Permit <MAX> inflight messages."), N_("<MAX>") },
  - { "null", 'n', POPT_BIT_SET,                &_mqtt.flags, MQTT_FLAGS_EMPTY,
  -     N_("Send a null (zero length) message."), NULL },
  - { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,      &_mqtt.port, 0,
  -     N_("Connect to network <PORT>."), N_("<PORT>") },
  - { "pass", 'P', POPT_ARG_STRING,     &_mqtt.password, 0,
  -     N_("Remote user <PASSWORD>."), N_("<PASSWORD>") },
  - { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,       &_mqtt.qos, 0,
  -     N_("MQTT <QOS> level."), N_("<QOS>") },
  - { "retain", 'r', POPT_BIT_SET,              &_mqtt.flags, MQTT_FLAGS_RETAIN,
  -     N_("Retain the message on the host."), NULL },
  - { NULL, 's', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_STDIN_ALL,
  -     N_("Send stdin lines as a single message."), NULL },
  - { NULL, 'S', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_DNSSRV,
  -     N_("Use SRV record to find remote host."), NULL },
  - { "topic", 't', POPT_ARG_ARGV,              &_mqtt._topics, 0,
  -     N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") },
  - { "user", 'u', POPT_ARG_STRING,     &_mqtt.user, 0,
  -     N_("Remote <USER>."), N_("<USER>") },
  - { NULL, 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.protocol_version, 0,
  -     N_("MQTT protocol <VERSION>"), N_("{31|311}") },
  -
  - { "will-payload", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.will_message, 0,
  -     N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") },
  - { "will-qos", '\0', POPT_ARG_INT,   &_mqtt.will_qos, 0,
  -     N_("Will <QOS> level."), N_("<QOS>") },
  - { "will-topic", '\0', POPT_ARG_STRING,      &_mqtt.will_topic, 0,
  -     N_("Will <TOPIC> to publish to."), N_("<TOPIC>") },
  - { "will-retain", '\0', POPT_BIT_SET,        &_mqtt.flags, MQTT_FLAGS_WILL_RETAIN,
  -     N_("Retain the client Will."), NULL },
  -
  - { "cafile", '\0', POPT_ARG_STRING,  &_mqtt.cafile, 0,
  -     N_("CA certificate(s) from <FILE>."), N_("<FILE>") },
  - { "capath", '\0', POPT_ARG_STRING,  &_mqtt._capath, 0,
  -     N_("CA certificate(s) in <DIR>."), N_("<DIR>") },
  - { "cert", '\0', POPT_ARG_STRING,    &_mqtt.cert, 0,
  -     N_("Client authentication <CERT>."), N_("<CERT>") },
  - { "key", '\0', POPT_ARG_STRING,     &_mqtt.privkey, 0,
  -     N_("Client private <KEY>."), N_("<KEY>") },
  - { "ciphers", '\0', POPT_ARG_STRING, &_mqtt.ciphers, 0,
  -     N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") },
  - { "tls-version", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._tls_version, 0,
  -     N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") },
  - { "insecure", '\0', POPT_BIT_SET,   &_mqtt.flags, MQTT_FLAGS_INSECURE,
  -     N_("Do not verify server cert matches DNS host."), NULL },
  - { "psk", '\0', POPT_ARG_STRING,     &_mqtt._psk_key, 0,
  -     N_("TLS-PSK mode <KEY> hex (w/o 0x)"), N_("<KEY>") },
  - { "psk-identity", '\0', POPT_ARG_STRING, &_mqtt._psk_identity, 0,
  -     N_("TLS-PSK <IDENTITY>."), N_("<IDENTITY>") },
  - { "proxy", '\0', POPT_ARG_STRING,   &_mqtt._proxy, 0,
  -     N_("SOCKS5 proxy <URL>."), N_("<URL>") },
  -
  - { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0,
  -     N_("Common options for all rpmio executables:"),
  -     NULL },
  -
  -  POPT_AUTOALIAS
  -  POPT_AUTOHELP
  -
  -  POPT_TABLEEND
  -};
  -
   int
   main(int argc, char *argv[])
   {
  +#ifdef       UNUSED
   static char _mqtt_argv[] = "\
     mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\
     rpm/#?qos=1 \n\
  @@ -158,29 +65,13 @@
     $SYS/broker/publish/#?qos=0 \n\
     $SYS/broker/bytes/#?qos=0 \n
   #endif
  -    poptContext optCon;
  -#ifdef       UNUSED
       ARGV_t av = poptGetArgs(optCon);
       int ac = argvCount(av);
  -#endif
       static char *_av[] = { _mqtt_argv, NULL, };
  +#endif
       rpmmqtt mqtt;
       int rc = -1;
   
  -    /* Initialize the (allocated) CLI defaults. */
  -    memset(&_mqtt, 0, sizeof(_mqtt));
  -    _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
  -    _mqtt.port = 1883;
  -    _mqtt.max_inflight = 20;
  -    _mqtt.keepalive = 60;
  -    _mqtt.qos = 0;
  -    _mqtt.protocol_version = rpmExpand("31", NULL);
  -    _mqtt.host = rpmExpand("localhost", NULL);
  -    _mqtt.idprefix = rpmExpand("rpm", NULL);
  -    _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL);
  -    _mqtt._tls_version = rpmExpand("1.2", NULL);
  -
  -
       /* Initialize the _mqtt_ macro context */
   (void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0);
   
  @@ -191,24 +82,14 @@
   (void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0);
  -(void) rpmDefineMacro(NULL, "_mqtt_qos 0", 0);
  -(void) rpmDefineMacro(NULL, "_mqtt_persist 0", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0);
   
  -    optCon = rpmioInit(argc, argv, mqttOptionsTable);
  -
  -    mqtt = rpmmqttNew(_av, _mqtt.flags);
  +    mqtt = rpmmqttNew(argv, 0);
       rc = _DoMQTT(mqtt);
  -sleep(1);
       mqtt = rpmmqttFree(mqtt);
   
  -    _mqtt.protocol_version = _free(_mqtt.protocol_version);
  -    _mqtt.host = _free(_mqtt.host);
  -    _mqtt.idprefix = _free(_mqtt.idprefix);
  -    _mqtt._clientid = _free(_mqtt._clientid);
  -    _mqtt._tls_version = _free(_mqtt._tls_version);
  -
  -    optCon = rpmioFini(optCon);
       return rc;
   }
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to