RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   02-Jul-2016 11:33:44
  Branch: rpm-5_4                          Handle: 2016070209334400

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: wire up the cli options.

  Summary:
    Revision    Changes     Path
    1.1.2.10    +316 -185   rpm/rpmio/rpmmqtt.c
    1.1.2.9     +34 -30     rpm/rpmio/rpmmqtt.h
    1.1.2.6     +65 -46     rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       1 Jul 2016 14:43:47 -0000       1.1.2.9
  +++ rpm/rpmio/rpmmqtt.c       2 Jul 2016 09:33:44 -0000       1.1.2.10
  @@ -30,6 +30,118 @@
   #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG)
   
   /*==============================================================*/
  +struct rpmmqtt_s _mqtt;
  +
  +#ifndef      DYING
  +static void dumpU(const char * msg, urlinfo u)
  +{
  +    FILE * fp = stderr;
  +    const char * url = (u ? u->url : NULL);
  +    const char *path = NULL;
  +    urltype ut = urlPath(url, &path);;
  +
  +    if (msg)
  +     fprintf(fp, "===================================== %s u %p ut %d\n",
  +             msg, u, ut);
  +    if (u) {
  +
  +     fprintf(fp, "     url: %s\n", u->url);
  +     fprintf(fp, "  scheme: %s\n", u->scheme);
  +     fprintf(fp, "    user: %s\n", u->user);
  +     fprintf(fp, "password: %s\n", u->password);
  +     fprintf(fp, "    host: %s\n", u->host);
  +     fprintf(fp, " portstr: %s\n", u->portstr);
  +     fprintf(fp, "    path: %s\n", path);
  +     fprintf(fp, "   query: %s\n", u->query);
  +     fprintf(fp, "fragment: %s\n", u->fragment);
  +     if (u->query) {
  +         ARGV_t av = NULL;
  +         int xx;
  +         xx = argvSplit(&av, u->query, ",");
  +         argvPrint(u->query, av, fp);
  +         av = argvFree(av);
  +     }
  +    }
  +}
  +#endif
  +
  +static void dumpMQTT(const char * msg, rpmmqtt mqtt)
  +{
  +    FILE * fp = stderr;
  +
  +    if (msg)
  +     fprintf(fp, "===================================== %s(%p)\n",
  +             msg, mqtt);
  +    if (mqtt) {
  +
  +#define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, 
mqtt->_foo)
  +     PRINT(s, _address);
  +     PRINT(s, _infile);
  +     PRINT(s, idprefix);
  +     PRINT(s, _message);
  +
  +     PRINT(s, cafile);
  +     PRINT(s, _capath);
  +     PRINT(s, cert);
  +     PRINT(s, privkey);
  +     PRINT(s, ciphers);
  +     PRINT(s, _tls_version);
  +     PRINT(s, _psk_key);
  +     PRINT(s, _psk_identity);
  +     PRINT(s, _proxy);
  +
  +     PRINT(d, _max_msg_count);
  +
  +fprintf(stderr, "====================\n");
  +     PRINT(d, keepalive);
  +     PRINT(d, max_inflight);
  +     PRINT(s, protocol_version);
  +     PRINT(s, will_message);
  +     PRINT(d, will_qos);
  +     PRINT(s, will_topic);
  +
  +     PRINT(s, user);
  +     PRINT(s, password);
  +     PRINT(s, host);
  +     PRINT(d, port);
  +     PRINT(s, uri);
  +
  +     PRINT(s, _clientid);
  +     PRINT(s, clientid);
  +
  +     PRINT(u, qos);
  +
  +     if (mqtt->_topics)
  +         argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp);
  +     PRINT(s, topic);
  +
  +fprintf(stderr, "====================\n");
  +     PRINT(d, ut);
  +     dumpU("mqtt->u", mqtt->u);
  +     PRINT(p, C);
  +     PRINT(u, persist_type);
  +     PRINT(p, persist_ctx);
  +     PRINT(s, persist_path);
  +
  +     PRINT(u, token);
  +     PRINT(u, timeout);
  +
  +     PRINT(d, debug);
  +     PRINT(d, trace);
  +     PRINT(d, finished);
  +
  +     PRINT(s, serverURI);
  +     PRINT(d, MQTTVersion);
  +     PRINT(d, sessionPresent);
  +
  +     PRINT(s, dn);
  +#undef       PRINT
  +
  +
  +    }
  +}
  +
  +/*==============================================================*/
   typedef struct key_s {
       int         v;
       const char *n;
  @@ -91,10 +203,10 @@
   
   /*==============================================================*/
   #ifdef       WITH_MQTT
  -static int onMessageArrived(void * _mqtt, char * topic, int topicLen,
  +static int onMessageArrived(void * _ctx, char * topic, int topicLen,
                MQTTAsync_message *  message)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       int rc = 1;
   
       (void)mqtt;
  @@ -115,17 +227,17 @@
       return rc;
   }
   
  -static void onDeliveryComplete(void * _mqtt, int token)
  +static void onDeliveryComplete(void * _ctx, int token)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       if (_rpmmqtt_debug < 0)
        rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token);
       mqtt->token = token;
   }
   
  -static void onConnectionLost(void * _mqtt, char *cause)
  +static void onConnectionLost(void * _ctx, char *cause)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
       rpmlog(RPMLOG_DEBUG,
                "--- MQTT disconnect(%s) version(%d) present(%d)\n",
  @@ -135,14 +247,13 @@
   
       rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI);
   
  -    mqtt->connected = 0;
       (void) rpmmqttConnect(mqtt);
   }
   
  -static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * 
response)
  +static void onDisconnectFailure(void * _ctx, MQTTAsync_failureData * 
response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, 
_mqtt, response);
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, 
_ctx, response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
  @@ -152,42 +263,39 @@
                token, code, s);
       } else
        rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n");
  -    mqtt->connected = 0;
       mqtt->finished = 1;
   }
   
  -static void onDisconnect(void * _mqtt, MQTTAsync_successData * response)
  +static void onDisconnect(void * _ctx, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
       if (mqtt->debug || _rpmmqtt_debug)
        rpmlog(RPMLOG_DEBUG,
                "MQTT disconnect(%s) version(%d) present(%d)\n",
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
       mqtt->serverURI = _free(mqtt->serverURI);
  -    mqtt->connected = 0;
       mqtt->finished = 1;
   }
   
  -static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void onConnectFailure(void * _ctx, MQTTAsync_failureData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, 
response);
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, 
response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
        int code = response->code;
  -     rpmlog(RPMLOG_WARNING, "MQTT    connect failed: code(%d) msg %s\n",
  +     rpmlog(RPMLOG_WARNING, "MQTT    connect failed: token(%d) code(%d) msg 
%s\n",
                        token, code, s);
       } else
        rpmlog(RPMLOG_WARNING, "MQTT    connect failed\n");
  -    mqtt->connected = 0;
       mqtt->finished = 1;
   }
   
  -static void onConnect(void * _mqtt, MQTTAsync_successData * response)
  +static void onConnect(void * _ctx, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
       if (response) {
        mqtt->serverURI = xstrdup(response->alt.connect.serverURI);
  @@ -199,14 +307,13 @@
        rpmlog(RPMLOG_DEBUG,
                "MQTT    connect(%s) version(%d) present(%d)\n",
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  -    mqtt->connected = 1;
       mqtt->finished = 1;
   }
   
  -static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * 
response)
  +static void onSubscribeFailure(void * _ctx, MQTTAsync_failureData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, 
response);
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
  +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, 
response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
  @@ -215,24 +322,22 @@
                        token, code, s);
       } else
        rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed\n");
  -    mqtt->subscribed = 0;
       mqtt->finished = 1;
   }
   
  -static void onSubscribe(void * _mqtt, MQTTAsync_successData * response)
  +static void onSubscribe(void * _ctx, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       int qos = response->alt.qos;
   
       if (mqtt->debug || _rpmmqtt_debug)
        rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", qos);
  -    mqtt->subscribed = 1;
       mqtt->finished = 1;
   }
   
  -static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void onSendFailure(void * _ctx, MQTTAsync_failureData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
       {
        const char *s = response->message;
  @@ -244,9 +349,9 @@
       mqtt->finished = 1;
   }
   
  -static void onSend(void * _mqtt, MQTTAsync_successData * response)
  +static void onSend(void * _ctx, MQTTAsync_successData * response)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
       if (mqtt->debug || _rpmmqtt_debug) {
        const char * s = response->alt.pub.message.payload;
  @@ -266,15 +371,15 @@
   
   /*==============================================================*/
   #ifdef       WITH_MQTT
  -static int rpmmqttOpen(void **_mqttp, const char *clientID, const char 
*serverURI,
  -             void *_mqtt)
  +static int rpmmqttOpen(void **_ctxp, const char *clientID, const char 
*serverURI,
  +             void *_ctx)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       char * dn;
       char * te;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    *_mqttp = _mqtt;
  +    *_ctxp = _ctx;
   
       mqtt->dn = _free(mqtt->dn);
       dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL);
  @@ -291,14 +396,14 @@
   
   exit:
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn);
  +fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_ctxp, clientID, serverURI, _ctx, rc, mqtt->dn);
   
       return rc;
   }
   
  -static int rpmmqttClose(void *_mqtt)
  +static int rpmmqttClose(void *_ctx)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
   
  @@ -314,14 +419,14 @@
   
   exit:
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc);
  +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc);
       return rc;
   }
   
  -static int rpmmqttPut(void *_mqtt, char *key,
  +static int rpmmqttPut(void *_ctx, char *key,
                int bufcount, char *buffers[], int buflens[])
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       char *fn = NULL;
       FD_t fd = NULL;
       size_t nb = 0;
  @@ -359,15 +464,15 @@
       if (fd)
        Fclose(fd);
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, 
_mqtt, key, bufcount, buffers, buflens, rc, fn);
  +fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, 
_ctx, key, bufcount, buffers, buflens, rc, fn);
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttGet(void *_mqtt, char *key,
  +static int rpmmqttGet(void *_ctx, char *key,
                char *buffer[], int *buflen)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       char *fn = NULL;
       FD_t fd = NULL;
       size_t nr = 0;
  @@ -405,16 +510,16 @@
       if (fd)
        Fclose(fd);
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, 
_mqtt, key, buffer, buflen, rc, fn);
  +fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, 
key, buffer, buflen, rc, fn);
       fn = _free(fn);
       *buffer = b;
       *buflen = nb;
       return rc;
   }
   
  -static int rpmmqttRemove(void *_mqtt, char *key)
  +static int rpmmqttRemove(void *_ctx, char *key)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       char *fn = NULL;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  @@ -430,14 +535,14 @@
   
   exit:
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, 
rc, fn);
  +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _ctx, key, 
rc, fn);
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys)
  +static int rpmmqttKeys(void *_ctx, char ***keys, int *nkeys)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       ARGV_t av = NULL;
       int ac = 0;
       DIR * dir = NULL;
  @@ -468,15 +573,15 @@
       if (dir)
        (void) Closedir(dir);
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, 
keys, nkeys, rc, av, ac);
  +fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _ctx, 
keys, nkeys, rc, av, ac);
       *keys = (char **) av;
       *nkeys = ac;
       return rc;
   }
   
  -static int rpmmqttClear(void *_mqtt)
  +static int rpmmqttClear(void *_ctx)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       DIR * dir = NULL;
       struct dirent *dp;
       int nerrs = 0;
  @@ -506,13 +611,13 @@
       if (dir)
        (void) Closedir(dir);
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc);
  +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc);
       return rc;
   }
   
  -static int rpmmqttContainsKey(void *_mqtt, char *key)
  +static int rpmmqttContainsKey(void *_ctx, char *key)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
       DIR * dir = NULL;
       struct dirent *dp;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
  @@ -542,7 +647,7 @@
       if (dir)
        (void) Closedir(dir);
   if (mqtt->debug || _rpmmqtt_debug < 0)
  -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc);
  +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _ctx, key, rc);
       return rc;
   }
   
  @@ -561,6 +666,17 @@
   #endif       /* WITH_MQTT */
   
   /*==============================================================*/
  +static const MQTTAsync_willOptions _Wopts
  +             = MQTTAsync_willOptions_initializer;
  +static const MQTTAsync_connectOptions _Copts
  +             = MQTTAsync_connectOptions_initializer;
  +static const MQTTAsync_SSLOptions _Sopts
  +             = MQTTAsync_SSLOptions_initializer;
  +static const MQTTAsync_disconnectOptions _Dopts =
  +             MQTTAsync_disconnectOptions_initializer;
  +static const MQTTAsync_responseOptions _Ropts =
  +             MQTTAsync_responseOptions_initializer;
  +
   int rpmmqttConnect(rpmmqtt mqtt)
   {
       int rc = -1;
  @@ -570,46 +686,56 @@
       } else {
        urlinfo u = mqtt->u;
   
  -     MQTTAsync_willOptions Wopts = MQTTAsync_willOptions_initializer;
  +     MQTTAsync_willOptions *Wopts = &mqtt->Wopts;
  +     memcpy(Wopts, &_Wopts, sizeof(*Wopts));
   #ifdef       REF
  -     memcpy(Wopts.struct_id, "MQTW", 4);
  -     Wopts.struct_version = 0;
  +     memcpy(Wopts->struct_id, "MQTW", 4);
  +     Wopts->struct_version = 0;
   #endif
  -     Wopts.topicName = mqtt->topic;
  -     Wopts.message = (mqtt->_will_message ? mqtt->_will_message : "");
  -     Wopts.retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0);
  -     Wopts.qos = mqtt->_will_qos;
  +     Wopts->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic);
  +     Wopts->message = (mqtt->will_message ? mqtt->will_message : "");
  +     Wopts->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0);
  +     Wopts->qos = mqtt->will_qos;    /* XXX mqtt->qos? */
   
  -     MQTTAsync_connectOptions Copts = MQTTAsync_connectOptions_initializer;
  +     MQTTAsync_connectOptions *Copts = &mqtt->Copts;
  +     memcpy(Copts, &_Copts, sizeof(*Copts));
   #ifdef       REF
  -     memcpy(Copts.struct_id, "MQTC", 4);
  -     Copts.struct_version = 3;       /* 0-4 enables fields below */
  +     memcpy(Copts->struct_id, "MQTC", 4);
  +     Copts->struct_version = 3;      /* 0-4 enables fields below */
   #endif
  -     Copts.keepAliveInterval = 10;   /* 60 */
  -     Copts.cleansession = 0;         /* 1 discards session state */
  -     Copts.maxInflight = 10;         /* 10 */
  -
  -     Copts.will = &Wopts;            /* last will */
  -     Copts.username = (u->user ? u->user : mqtt->_user);;
  -     Copts.password = (u->password ? u->password : mqtt->_password);
  -     Copts.connectTimeout = 30;      /* secs */
  -     Copts.retryInterval = 0;        /* secs */
  -#ifdef       REF
  -     MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  -     ssl_opts.enableServerCertAuth = 0;
  -     Copts.ssl = &ssl_opts;
  +     Copts->keepAliveInterval = mqtt->keepalive;
  +     Copts->cleansession = (MF_ISSET(CLEAN) ? 1 : 0);
  +     Copts->maxInflight = mqtt->max_inflight;
  +
  +     Copts->will = Wopts;            /* last will */
  +     Copts->username = (u && u->user ? u->user : mqtt->user);;
  +     Copts->password = (u && u->password ? u->password : mqtt->password);
  +
  +     Copts->connectTimeout = 30;     /* XXX secs. configure?*/
  +     Copts->retryInterval = 0;       /* XXX secs. configure? */
  +
  +#ifdef       NOTYET
  +     MQTTAsync_SSLOptions *Sopts = &mqtt->Sopts;
  +     memcpy(Sopts, &_Sopts, sizeof(*Sopts));
  +     Sopts->trustStore = mqtt->cafile;
  +     Sopts->keyStore = mqtt->cert;
  +     Sopts->privateKey = mqtt->privkey;
  +     Sopts->enabledCipherSuites = mqtt->ciphers;
  +     Sopts->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1);
  +     Copts->ssl = Sopts;
   #else
  -     Copts.ssl = NULL;
  +     (void)_Sopts;
  +     Copts->ssl = NULL;
   #endif
  -     Copts.onSuccess = onConnect;
  -     Copts.onFailure = onConnectFailure;
  -     Copts.context = mqtt;
  -     Copts.serverURIcount = 0;
  -     Copts.serverURIs = NULL;
  +     Copts->onSuccess = onConnect;
  +     Copts->onFailure = onConnectFailure;
  +     Copts->context = mqtt;
  +     Copts->serverURIcount = 0;
  +     Copts->serverURIs = NULL;
   
        mqtt->finished = 0;
        rc = check(mqtt, "connect",
  -             MQTTAsync_connect(mqtt->C, &Copts));
  +             MQTTAsync_connect(mqtt->C, Copts));
   
        while (!mqtt->finished)
            usleep(1000);
  @@ -624,20 +750,20 @@
       int rc = -1;
   #ifdef       WITH_MQTT
       if (MQTTAsync_isConnected(mqtt->C)) {
  -     MQTTAsync_disconnectOptions Dopts =
  -             MQTTAsync_disconnectOptions_initializer;
  +     MQTTAsync_disconnectOptions *Dopts = &mqtt->Dopts;
  +     memcpy(Dopts, &_Dopts, sizeof(*Dopts));
   #ifdef       REF
  -     memcpy(Dopts.struct_id, "MQTD", 4);
  -     Dopts.struct_version = 0;
  +     memcpy(Dopts->struct_id, "MQTD", 4);
  +     Dopts->struct_version = 0;
   #endif
  -     Dopts.timeout = mqtt->msecs;
  -     Dopts.onSuccess = onDisconnect;
  -     Dopts.onFailure = onDisconnectFailure;
  -     Dopts.context = mqtt;
  +     Dopts->timeout = mqtt->timeout;
  +     Dopts->onSuccess = onDisconnect;
  +     Dopts->onFailure = onDisconnectFailure;
  +     Dopts->context = mqtt;
   
        mqtt->finished = 0;
        rc = check(mqtt, "disconnect",
  -             MQTTAsync_disconnect(mqtt->C, &Dopts));
  +             MQTTAsync_disconnect(mqtt->C, Dopts));
        while (!mqtt->finished)
            usleep(100);
       }
  @@ -666,21 +792,20 @@
       pubmsg.msgid = 0;
   #endif
   
  -    MQTTAsync_responseOptions Ropts =
  -             MQTTAsync_responseOptions_initializer;
  +    MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  +    memcpy(Ropts, &_Ropts, sizeof(*Ropts));
   #ifdef       REF
  -    memcpy(pubmsg.struct_id, "MQTR", 4);
  -    pubmsg.struct_version = 0;
  +    memcpy(Ropts->struct_id, "MQTR", 4);
  +    Ropts->struct_version = 0;
   #endif
  -    Ropts.onSuccess = onSend;
  -    Ropts.onFailure = onSendFailure;
  -    Ropts.context = mqtt;
  -    Ropts.token = 0;
  +    Ropts->onSuccess = onSend;
  +    Ropts->onFailure = onSendFailure;
  +    Ropts->context = mqtt;
  +    Ropts->token = 0;
   
       mqtt->finished = 0;
       rc = check(mqtt, "sendMessage",
  -             MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg,
  -                     &Ropts));
  +             MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, Ropts));
       while (!mqtt->finished)
        usleep(100);
   #endif       /* WITH_MQTT */
  @@ -748,23 +873,24 @@
   
        rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos);
   
  -     MQTTAsync_responseOptions Ropts =
  -             MQTTAsync_responseOptions_initializer;
  +     MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  +     memcpy(Ropts, &_Ropts, sizeof(*Ropts));
   #ifdef       REF
  -     memcpy(pubmsg.struct_id, "MQTR", 4);
  -     pubmsg.struct_version = 0;
  +     memcpy(Ropts->struct_id, "MQTR", 4);
  +     Ropts->struct_version = 0;
   #endif
  -     Ropts.onSuccess = onSubscribe;
  -     Ropts.onFailure = onSubscribeFailure;
  -     Ropts.context = mqtt;
  +     Ropts->onSuccess = onSubscribe;
  +     Ropts->onFailure = onSubscribeFailure;
  +     Ropts->context = mqtt;
   
        mqtt->finished = 0;
        rc = check(mqtt, "subscribe",
  -             MQTTAsync_subscribe(mqtt->C, subtopic, subqos, &Ropts));
  -     while (!mqtt->finished)
  +             MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts));
  +fprintf(stderr, "*** finished(%d)\n", mqtt->finished);
  +     while (rc == 0 && !mqtt->finished)
            usleep(100);
   
  -     ret = 0;        /* XXX */
  +     ret = (rc ? -1 : 0);
   
        subtopic = _free(subtopic);
   
  @@ -775,9 +901,9 @@
   }
   
   /*==============================================================*/
  -static void rpmmqttFini(void * _mqtt)
  +static void rpmmqttFini(void * _ctx)
   {
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmmqtt mqtt = (rpmmqtt) _ctx;
   
   #ifdef       WITH_MQTT
       (void) rpmmqttDisconnect(mqtt);
  @@ -788,23 +914,23 @@
   /* ========== */
       mqtt->_address = _free(mqtt->_address);
       mqtt->_infile = _free(mqtt->_infile);
  -    mqtt->_host = _free(mqtt->_host);
  -    mqtt->_id = _free(mqtt->_id);
  -    mqtt->_idprefix = _free(mqtt->_idprefix);
  +    mqtt->host = _free(mqtt->host);
  +    mqtt->_clientid = _free(mqtt->_clientid);
  +    mqtt->idprefix = _free(mqtt->idprefix);
       mqtt->_message = _free(mqtt->_message);
  -    mqtt->_password = _free(mqtt->_password);
  -    mqtt->_topic = argvFree((ARGV_t)mqtt->_topic);
  -    mqtt->_user = _free(mqtt->_user);
  -    mqtt->_version = _free(mqtt->_version);
  -
  -    mqtt->_will_message = _free(mqtt->_will_message);
  -    mqtt->_will_topic = _free(mqtt->_will_topic);
  -    mqtt->_cafile = _free(mqtt->_cafile);
  +    mqtt->password = _free(mqtt->password);
  +    mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
  +    mqtt->user = _free(mqtt->user);
  +    mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */
  +
  +    mqtt->will_message = _free(mqtt->will_message);
  +    mqtt->will_topic = _free(mqtt->will_topic);
  +    mqtt->cafile = _free(mqtt->cafile);
       mqtt->_capath = _free(mqtt->_capath);
  -    mqtt->_cert = _free(mqtt->_cert);
  -    mqtt->_privkey = _free(mqtt->_privkey);
  -    mqtt->_ciphers = _free(mqtt->_ciphers);
  -    mqtt->_tls_version = _free(mqtt->_tls_version);
  +    mqtt->cert = _free(mqtt->cert);
  +    mqtt->privkey = _free(mqtt->privkey);
  +    mqtt->ciphers = _free(mqtt->ciphers);
  +    mqtt->_tls_version = _free(mqtt->_tls_version);  /* XXX malloc? */
       mqtt->_psk_key = _free(mqtt->_psk_key);
       mqtt->_psk_identity = _free(mqtt->_psk_identity);
       mqtt->_proxy = _free(mqtt->_proxy);
  @@ -828,39 +954,6 @@
   
   RPMIOPOOL_MODULE(mqtt)
   
  -#ifdef       DYING
  -static void dumpURL(const char * msg, urlinfo u)
  -{
  -    FILE * fp = stderr;
  -    const char * url = (u ? u->url : NULL);
  -    const char *path = NULL;
  -    urltype ut = urlPath(url, &path);;
  -
  -    if (msg)
  -     fprintf(fp, "===================================== %s u %p ut %d\n",
  -             msg, u, ut);
  -    if (u) {
  -
  -     fprintf(fp, "     url: %s\n", u->url);
  -     fprintf(fp, "  scheme: %s\n", u->scheme);
  -     fprintf(fp, "    user: %s\n", u->user);
  -     fprintf(fp, "password: %s\n", u->password);
  -     fprintf(fp, "    host: %s\n", u->host);
  -     fprintf(fp, " portstr: %s\n", u->portstr);
  -     fprintf(fp, "   query: %s\n", u->query);
  -     if (u->query) {
  -         ARGV_t av = NULL;
  -         int xx;
  -         xx = argvSplit(&av, u->query, ",");
  -         argvPrint(u->query, av, fp);
  -         av = argvFree(av);
  -     }
  -     fprintf(fp, "fragment: %s\n", u->fragment);
  -     fprintf(fp, "    path: %s\n", path);
  -    }
  -}
  -#endif
  -
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
       static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", 
NULL };
  @@ -868,6 +961,29 @@
       urlinfo u = NULL;
       const char *s = NULL;
   
  +    /* -- Copy (and initialize the defaults) from the CLI options. */
  +    {   yarnLock use = mqtt->_item.use;
  +        void *pool = mqtt->_item.pool;
  +#ifdef       WTF
  +     *mqtt = _mqtt;  /* structure assignment */
  +#else
  +     memcpy(mqtt, &_mqtt, sizeof(*mqtt));
  +#endif
  +        mqtt->_item.use = use;
  +        mqtt->_item.pool = pool;
  +    }
  +    memset(&_mqtt, 0, sizeof(_mqtt));
  +    _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
  +    _mqtt.port = 1883;
  +    _mqtt.max_inflight = 20;
  +    _mqtt.keepalive = 60;
  +    _mqtt.qos = 0;
  +    _mqtt.protocol_version = xstrdup("31");  /* XXX malloc? */
  +    _mqtt.host = rpmExpand("localhost", NULL);
  +    _mqtt.idprefix = rpmExpand("rpm", NULL);
  +    _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL);
  +    _mqtt._tls_version = xstrdup("1.2");     /* XXX malloc? */
  +
       mqtt->flags = flags;
       mqtt->av = NULL;
   
  @@ -892,20 +1008,31 @@
   
       mqtt->ut = urlSplit(mqtt->av[0], &u);
       mqtt->u = u;
  +
       if (u->scheme == NULL
        || !strcmp(u->scheme, "mqtt")
        || !strcmp(u->scheme, "mqtts"))
       {
  -     if (u->portstr == NULL)
  +     /* XXX propagate mqtt->port ??? */
  +     if (u->portstr == NULL) {
            u->portstr = !strcmp(u->scheme, "mqtts")
                ? xstrdup("8883") : xstrdup("1883");
  +     
  +     }
        u->scheme = _free(u->scheme);
        u->scheme = xstrdup("tcp");
       }
   #ifdef       DYING
  -dumpURL(__FUNCTION__, u);
  +dumpU(__FUNCTION__, u);
   #endif
   
  +    if (mqtt->user == NULL && u->user != NULL)
  +     mqtt->user = xstrdup(u->user);
  +    if (mqtt->password == NULL && u->password != NULL)
  +     mqtt->password = xstrdup(u->password);
  +    if (mqtt->port == 0 && u->port != 0)
  +     mqtt->port = u->port;
  +
       mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL);
   
       (void) urlPath(u->url, &s);
  @@ -921,10 +1048,11 @@
        "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}";
       mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
   
  +    /* XXX CLI options clobbered */
   static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}";
       mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3);
   static const char _mqtt_timeout[] = 
"%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}";
  -    mqtt->msecs = rpmExpandNumeric(_mqtt_timeout);
  +    mqtt->timeout = rpmExpandNumeric(_mqtt_timeout);
   
       if (u->query) {
        ARGV_t qav = NULL;
  @@ -944,7 +1072,7 @@
                continue;
            }
            if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) {
  -             mqtt->msecs = strtol(te+1, NULL, 0);
  +             mqtt->timeout = strtol(te+1, NULL, 0);
                continue;
            }
            if (!strncmp(t, "trace", (te - t)) && xisdigit(te[1])) {
  @@ -988,10 +1116,11 @@
   char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
   
        rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  -     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u)\n",
  -             "topic", mqtt->topic, mqtt->qos, mqtt->msecs);
  -     rpmlog(_lvl, "%19s: %s type(%u)\n",
  -             "persist", persist_path, mqtt->persist_type);
  +     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%umsecs)\n",
  +             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  +     rpmlog(_lvl, "%19s: type(%u) %s\n",
  +             "persist", mqtt->persist_type,
  +             (mqtt->persist_type ? persist_path : ""));
        rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   
  @@ -1014,7 +1143,9 @@
          } break;
        }
   persist_path = _free(persist_path);
  -    
  +
  +dumpMQTT(__FUNCTION__, mqtt);
  +
        xx = check(mqtt, "create",
                MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid,
                mqtt->persist_type, mqtt->persist_ctx));
  @@ -1042,19 +1173,19 @@
                rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
            }
   
  -         MQTTAsync_responseOptions Ropts =
  -             MQTTAsync_responseOptions_initializer;
  +         MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  +         memcpy(Ropts, &_Ropts, sizeof(*Ropts));
   #ifdef  REF
  -         memcpy(pubmsg.struct_id, "MQTR", 4);
  -         pubmsg.struct_version = 0;
  +         memcpy(Ropts->struct_id, "MQTR", 4);
  +         Ropts->struct_version = 0;
   #endif
  -         Ropts.onSuccess = onSubscribe;
  -         Ropts.onFailure = onSubscribeFailure;
  -         Ropts.context = mqtt;
  +         Ropts->onSuccess = onSubscribe;
  +         Ropts->onFailure = onSubscribeFailure;
  +         Ropts->context = mqtt;
   
            xx = check(mqtt, "subscribeMany",
                MQTTAsync_subscribeMany(&mqtt->C,
  -                     mqtt->ac-1, mqtt->av+1, subqos+1, &Ropts));
  +                     mqtt->ac-1, mqtt->av+1, subqos+1, Ropts));
            subqos = _free(subqos);
   #else
            for (int i = 1; i < mqtt->ac; i++) {
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.8 -r1.1.2.9 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       1 Jul 2016 14:43:47 -0000       1.1.2.8
  +++ rpm/rpmio/rpmmqtt.h       2 Jul 2016 09:33:44 -0000       1.1.2.9
  @@ -14,8 +14,8 @@
   typedef enum mqttFlags_e {
       MQTT_FLAGS_NONE          = 0,
   
  -    MQTT_FLAGS_NOCLEAN               = _MFB( 0),     /*!< (sub) -c */
  -    MQTT_FLAGS_NOMSGEOL              = _MFB( 1),     /*!< (sub) -N */
  +    MQTT_FLAGS_CLEAN         = _MFB( 0),     /*!< (sub) -c */
  +    MQTT_FLAGS_EOL           = _MFB( 1),     /*!< (sub) -N */
       MQTT_FLAGS_NOSTALE               = _MFB( 2),     /*!< (sub) -R */
   
       MQTT_FLAGS_STDIN_EACH    = _MFB( 3),     /*!< -l */
  @@ -37,33 +37,29 @@
       char ** av;
       int ac;
   
  -/* ========== */
       mqttFlags flags;
  +/* ========== */
       const char *_address;                    /*!< -A */
       const char *_infile;                     /*!< -f */
  -    const char *_host;                               /*!< -h */
  -    const char *_id;                         /*!< -i */
  -    const char *_idprefix;                   /*!< -I */
  -    int _keepalive;                          /*!< -k */
  +    const char *host;                                /*!< -h */
  +    const char *idprefix;                    /*!< -I */
  +    int keepalive;                           /*!< -k */
       const char *_message;                    /*!< -m */
  -    int _max_inflight;                               /*!< -M */
  -    int _port;                                       /*!< -p */
  -    const char *_password;                   /*!< -P */
  -    int _qos;                                        /*!< -q */
  -    int _retain;                             /*!< -r */
  -    const char **_topic;                     /*!< -t */
  -    const char *_user;                               /*!< -u */
  -    const char *_version;                    /*!< -V */
  -
  -    const char * _will_message;                      /*!< --will-payload */
  -    int _will_qos;                           /*!< --will-qos */
  -    int _will_retain;                                /*!< --will-retain */
  -    const char * _will_topic;                        /*!< --will-topic */
  -    const char *_cafile;                     /*!< --cafile */
  +    int max_inflight;                                /*!< -M */
  +    int port;                                        /*!< -p */
  +    const char *password;                    /*!< -P */
  +    const char **_topics;                    /*!< -t */
  +    const char *user;                                /*!< -u */
  +    const char *protocol_version;            /*!< -V */
  +
  +    const char * will_message;                       /*!< --will-payload */
  +    int will_qos;                            /*!< --will-qos */
  +    const char * will_topic;                 /*!< --will-topic */
  +    const char *cafile;                              /*!< --cafile */
       const char *_capath;                     /*!< --capath */
  -    const char *_cert;                               /*!< --cert */
  -    const char *_privkey;                    /*!< --key */
  -    const char *_ciphers;                    /*!< --ciphers */
  +    const char *cert;                                /*!< --cert */
  +    const char *privkey;                     /*!< --key */
  +    const char *ciphers;                     /*!< --ciphers */
       const char *_tls_version;                        /*!< --tls-version */
       const char *_psk_key;                    /*!< --psk */
       const char *_psk_identity;                       /*!< --psk-identity */
  @@ -79,28 +75,36 @@
   
       const char * uri;
       const char * topic;
  +    const char *_clientid;                   /*!< -i */
       const char * clientid;
   
       unsigned persist_type;
       void * persist_ctx;              /* MQTTClient_persistence */
       const char * persist_path;
  -    const char *dn;
   
  -    unsigned qos;
  -    unsigned token;
  -    unsigned msecs;
  +    unsigned qos;                            /*<! -q */
  +    unsigned timeout;                /* msecs */
   
       int debug;
       int trace;
       volatile int finished;
  -    volatile int connected;
  -    volatile int subscribed;
  +    volatile unsigned token; /* XXX MQTTClient_subscriptve.c */
   
       char * serverURI;
       int MQTTVersion;
       int sessionPresent;
   
  +    const char *dn;
  +
  +    MQTTAsync_connectOptions Copts;
  +    MQTTAsync_willOptions Wopts;
  +    MQTTAsync_SSLOptions Sopts;
  +    MQTTAsync_disconnectOptions Dopts;
  +    MQTTAsync_responseOptions Ropts;
   };
  +
  +extern struct rpmmqtt_s _mqtt;
  +
   #endif       /* _RPMMQTT_INTERNAL */
   
   #ifdef __cplusplus
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.5 -r1.1.2.6 tmqtt.c
  --- rpm/rpmio/tmqtt.c 1 Jul 2016 14:43:47 -0000       1.1.2.5
  +++ rpm/rpmio/tmqtt.c 2 Jul 2016 09:33:44 -0000       1.1.2.6
  @@ -11,6 +11,10 @@
   
   #include <poptIO.h>
   #include <rpmdefs.h>
  + 
  +#ifdef  WITH_MQTT
  +#include <MQTTAsync.h>
  +#endif
   
   #define      _RPMMQTT_INTERNAL
   #include "rpmmqtt.h"
  @@ -25,8 +29,8 @@
       int rc = 0;
   
   #ifdef       DYING
  -    (void) rpmmqttRead(mqtt, "rpm/#?qos=2", 0);
  -    (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=2", 0);
  +    (void) rpmmqttRead(mqtt, "rpm/#?qos=0", 0);
  +    (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=0", 0);
   #endif
   
       nw = rpmmqttWrite(mqtt, "bzzt ...", 0);
  @@ -47,23 +51,12 @@
   #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != 
MQTT_FLAGS_NONE)
   #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG)
   
  -static struct rpmmqtt_s _mqtt = {
  -     ._host = "localhost",   /* "localhost" */
  -     ._id = "rpm-%{pid}",    /* "mosquito_pub-%{pid}" */
  -     ._idprefix = "rpm",     /* "mosquito_pub" */
  -     ._keepalive = 60,       /* 60 */
  -     ._port = 1883,          /* 1883 */
  -     ._qos = 0,              /* 0 */
  -     ._version = "31",       /* "mqttv31" */
  -     ._tls_version="1.2",    /* "tlsv1.2" */
  -};
  -
   struct poptOption mqttOptionsTable[] = {
  - { NULL, 'c', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_NOCLEAN,
  + { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,      &_mqtt.flags, 
MQTT_FLAGS_CLEAN,
        N_("(sub) Do not clean session."), NULL },
    { NULL, 'C', POPT_ARG_INT,          &_mqtt._max_msg_count, 0,
        N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") },
  - { NULL, 'N', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_NOMSGEOL,
  + { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR,        &_mqtt.flags, 
MQTT_FLAGS_EOL,
        N_("(sub) Do not print NL after messages."), NULL },
    { NULL, 'R', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_NOSTALE,
        N_("(sub) Do not print stale messages."), NULL },
  @@ -72,27 +65,27 @@
        N_("Connect from local <ADDR>."), N_("<ADDR>") },
    { "file", 'f', POPT_ARG_STRING,     &_mqtt._infile, 0,
        N_("Send <FILE> as message."), N_("<FILE>") },
  - { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._host, 0,
  + { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.host, 0,
        N_("Connect to <HOST>."), N_("<HOST>") },
  - { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._id, 0,
  + { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._clientid, 0,
        N_("MQTT client <ID>."), N_("<ID>") },
  - { "prefix", 'I', POPT_ARG_STRING,   &_mqtt._idprefix, 0,
  + { "prefix", 'I', POPT_ARG_STRING,   &_mqtt.idprefix, 0,
        N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") },
  - { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, 
&_mqtt._keepalive, 0,
  + { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, 
&_mqtt.keepalive, 0,
        N_("Keep alive in <SECS>."), N_("<SECS>") },
    { NULL, 'l', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_STDIN_EACH,
        N_("Send messages line-by-line from stdin."), NULL },
    { "message", 'm', POPT_ARG_STRING,  &_mqtt._message, 0,
        N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") },
  - { NULL, 'M', POPT_ARG_INT,          &_mqtt._max_inflight, 0,
  + { NULL, 'M', POPT_ARG_INT,          &_mqtt.max_inflight, 0,
        N_("Permit <MAX> inflight messages."), N_("<MAX>") },
    { "null", 'n', POPT_BIT_SET,                &_mqtt.flags, MQTT_FLAGS_EMPTY,
        N_("Send a null (zero length) message."), NULL },
  - { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,      &_mqtt._port, 0,
  + { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,      &_mqtt.port, 0,
        N_("Connect to network <PORT>."), N_("<PORT>") },
  - { "pass", 'P', POPT_ARG_STRING,     &_mqtt._password, 0,
  + { "pass", 'P', POPT_ARG_STRING,     &_mqtt.password, 0,
        N_("Remote user <PASSWORD>."), N_("<PASSWORD>") },
  - { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,       &_mqtt._qos, 0,
  + { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,       &_mqtt.qos, 0,
        N_("MQTT <QOS> level."), N_("<QOS>") },
    { "retain", 'r', POPT_BIT_SET,              &_mqtt.flags, MQTT_FLAGS_RETAIN,
        N_("Retain the message on the host."), NULL },
  @@ -100,31 +93,31 @@
        N_("Send stdin lines as a single message."), NULL },
    { NULL, 'S', POPT_BIT_SET,          &_mqtt.flags, MQTT_FLAGS_DNSSRV,
        N_("Use SRV record to find remote host."), NULL },
  - { "topic", 't', POPT_ARG_ARGV,              &_mqtt._topic, 0,
  + { "topic", 't', POPT_ARG_ARGV,              &_mqtt._topics, 0,
        N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") },
  - { "user", 'u', POPT_ARG_STRING,     &_mqtt._user, 0,
  + { "user", 'u', POPT_ARG_STRING,     &_mqtt.user, 0,
        N_("Remote <USER>."), N_("<USER>") },
  - { NULL, 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._version, 0,
  + { NULL, 'V', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.protocol_version, 0,
        N_("MQTT protocol <VERSION>"), N_("{31|311}") },
   
  - { "will-payload", '\0', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._will_message, 0,
  + { "will-payload", '\0', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.will_message, 0,
        N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") },
  - { "will-qos", '\0', POPT_ARG_INT,   &_mqtt._will_qos, 0,
  + { "will-qos", '\0', POPT_ARG_INT,   &_mqtt.will_qos, 0,
        N_("Will <QOS> level."), N_("<QOS>") },
  - { "will-topic", '\0', POPT_ARG_STRING,      &_mqtt._will_topic, 0,
  + { "will-topic", '\0', POPT_ARG_STRING,      &_mqtt.will_topic, 0,
        N_("Will <TOPIC> to publish to."), N_("<TOPIC>") },
    { "will-retain", '\0', POPT_BIT_SET,        &_mqtt.flags, 
MQTT_FLAGS_WILL_RETAIN,
        N_("Retain the client Will."), NULL },
   
  - { "cafile", '\0', POPT_ARG_STRING,  &_mqtt._cafile, 0,
  + { "cafile", '\0', POPT_ARG_STRING,  &_mqtt.cafile, 0,
        N_("CA certificate(s) from <FILE>."), N_("<FILE>") },
    { "capath", '\0', POPT_ARG_STRING,  &_mqtt._capath, 0,
        N_("CA certificate(s) in <DIR>."), N_("<DIR>") },
  - { "cert", '\0', POPT_ARG_STRING,    &_mqtt._cert, 0,
  + { "cert", '\0', POPT_ARG_STRING,    &_mqtt.cert, 0,
        N_("Client authentication <CERT>."), N_("<CERT>") },
  - { "key", '\0', POPT_ARG_STRING,     &_mqtt._privkey, 0,
  + { "key", '\0', POPT_ARG_STRING,     &_mqtt.privkey, 0,
        N_("Client private <KEY>."), N_("<KEY>") },
  - { "ciphers", '\0', POPT_ARG_STRING, &_mqtt._ciphers, 0,
  + { "ciphers", '\0', POPT_ARG_STRING, &_mqtt.ciphers, 0,
        N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") },
    { "tls-version", '\0', 
POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._tls_version, 0,
        N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") },
  @@ -153,17 +146,19 @@
   static char _mqtt_argv[] = "\
     mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\
     rpm/#?qos=1 \n\
  -  $SYS/broker/version?qos=1 \n\
  -  $SYS/broker/timestamp?qos=1 \n\
  -  $SYS/broker/uptime?qos=1 \n\
  -  $SYS/broker/clients/#?qos=1 \n\
  -  $SYS/broker/messages/#?qos=1 \n\
  -  $SYS/broker/subscriptions/#?qos=1 \n\
  -  $SYS/broker/heap/#?qos=1 \n\
  -  $SYS/broker/publish/#?qos=1 \n\
  -  $SYS/broker/bytes/#?qos=1 \n\
   ";
  -    poptContext optCon = rpmioInit(argc, argv, mqttOptionsTable);
  +#ifdef       REF
  +  $SYS/broker/version?qos=0 \n\
  +  $SYS/broker/timestamp?qos=0 \n\
  +  $SYS/broker/uptime?qos=0 \n\
  +  $SYS/broker/clients/#?qos=0 \n\
  +  $SYS/broker/messages/#?qos=0 \n\
  +  $SYS/broker/subscriptions/#?qos=0 \n\
  +  $SYS/broker/heap/#?qos=0 \n\
  +  $SYS/broker/publish/#?qos=0 \n\
  +  $SYS/broker/bytes/#?qos=0 \n
  +#endif
  +    poptContext optCon;
   #ifdef       UNUSED
       ARGV_t av = poptGetArgs(optCon);
       int ac = argvCount(av);
  @@ -172,7 +167,22 @@
       rpmmqtt mqtt;
       int rc = -1;
   
  -(void) rpmDefineMacro(NULL, "_mqtt_trace 2", 0);
  +    /* Initialize the (allocated) CLI defaults. */
  +    memset(&_mqtt, 0, sizeof(_mqtt));
  +    _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
  +    _mqtt.port = 1883;
  +    _mqtt.max_inflight = 20;
  +    _mqtt.keepalive = 60;
  +    _mqtt.qos = 0;
  +    _mqtt.protocol_version = rpmExpand("31", NULL);
  +    _mqtt.host = rpmExpand("localhost", NULL);
  +    _mqtt.idprefix = rpmExpand("rpm", NULL);
  +    _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL);
  +    _mqtt._tls_version = rpmExpand("1.2", NULL);
  +
  +
  +    /* Initialize the _mqtt_ macro context */
  +(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0);
   
   (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_user luser", 0);
  @@ -181,15 +191,24 @@
   (void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0);
  -(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0);
  -(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_qos 0", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_persist 0", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} 
%{user}:%{group} ", 0);
   
  +    optCon = rpmioInit(argc, argv, mqttOptionsTable);
  +
       mqtt = rpmmqttNew(_av, _mqtt.flags);
       rc = _DoMQTT(mqtt);
  +sleep(1);
       mqtt = rpmmqttFree(mqtt);
   
  +    _mqtt.protocol_version = _free(_mqtt.protocol_version);
  +    _mqtt.host = _free(_mqtt.host);
  +    _mqtt.idprefix = _free(_mqtt.idprefix);
  +    _mqtt._clientid = _free(_mqtt._clientid);
  +    _mqtt._tls_version = _free(_mqtt._tls_version);
  +
       optCon = rpmioFini(optCon);
       return rc;
   }
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to