RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   11-Jul-2016 22:26:53
  Branch: rpm-5_4                          Handle: 2016071120265300

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES configure.ac
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: add a WITH_MOSQUITTO implementation.

  Summary:
    Revision    Changes     Path
    1.3501.2.516+1  -0      rpm/CHANGES
    2.472.2.153 +20 -2      rpm/configure.ac
    1.1.2.20    +1790 -347  rpm/rpmio/rpmmqtt.c
    1.1.2.18    +134 -22    rpm/rpmio/rpmmqtt.h
    1.1.2.13    +5  -6      rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.515 -r1.3501.2.516 CHANGES
  --- rpm/CHANGES       7 Jul 2016 14:34:56 -0000       1.3501.2.515
  +++ rpm/CHANGES       11 Jul 2016 20:26:53 -0000      1.3501.2.516
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: mqtt: add a WITH_MOSQUITTO implementation.
       - jbj: blake2: upgrade to 20160619 release.
       - jbj: macros: stub-in rpmmc/rpmme pools for MacroContext/MacroEntry.
       - jbj: mqtt: prepare for MacroContext sub-classing.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/configure.ac
  ============================================================================
  $ cvs diff -u -r2.472.2.152 -r2.472.2.153 configure.ac
  --- rpm/configure.ac  29 Jun 2016 12:17:57 -0000      2.472.2.152
  +++ rpm/configure.ac  11 Jul 2016 20:26:53 -0000      2.472.2.153
  @@ -2246,10 +2246,28 @@
   
   # MQTT
   RPM_CHECK_LIB(
  -    [MQTT], [mqtt],
  +    [Paho], [paho],
       [paho-mqtt3as], [MQTTAsync_create], [MQTTAsync.h],
       [no,external:none], [],
  -    [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT])
  +    [ AC_DEFINE(WITH_PAHO, 1, [Define if building with MQTT+PAHO])
  +    ], [])
  +RPM_CHECK_LIB(
  +    [Mosquitto], [mosquitto],
  +    [mosquitto], [mosquitto_lib_init], [mosquitto.h],
  +    [no,external:none], [],
  +    [ AC_DEFINE(WITH_MOSQUITTO, 1, [Define if building with MQTT+MOSQUITTO])
  +    ], [])
  +RPM_CHECK_LIB(
  +    [RabbitMQ], [rabbitmq],
  +    [rabbitmq], [amqp_new_connection], [amqp.h],
  +    [no,external:none], [],
  +    [ AC_DEFINE(WITH_RABBITMQ, 1, [Define if building with MQTT+RABBITMQ])
  +    ], [])
  +RPM_CHECK_LIB(
  +    [ZeroMQ], [zeromq],
  +    [zmq], [zmq_ctx_new], [zmq.h],
  +    [no,external:none], [],
  +    [ AC_DEFINE(WITH_ZEROMQ, 1, [Define if building with MQTT+ZEROMQ])
       ], [])
   
   # Libgit2
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.19 -r1.1.2.20 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       10 Jul 2016 16:16:04 -0000      1.1.2.19
  +++ rpm/rpmio/rpmmqtt.c       11 Jul 2016 20:26:53 -0000      1.1.2.20
  @@ -3,6 +3,15 @@
    */
   
   #include "system.h"
  +
  +#undef       WITH_PAHO
  +#undef       WITH_RABBITMQ
  +#undef       WITH_ZEROMQ
  +
  +#if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_ZEROMQ)
  +#define      WITH_MQTT
  +#endif
  +
   #include <stdio.h>
   #include <stdlib.h>
   #include <string.h>
  @@ -19,17 +28,25 @@
   #include <poptIO.h>
   #include <argv.h>
   
  -#ifdef       WITH_MQTT
  +#ifdef       WITH_PAHO
   #include <MQTTAsync.h>
   #endif
  +#ifdef       WITH_MOSQUITTO
  +#include <mosquitto.h>
  +#endif
  +#ifdef       WITH_RABBITMQ
  +#include <amqp.h>
  +#endif
  +#ifdef       WITH_ZEROMQ
  +#include <zmq.h>
  +#endif
   
   #define      _RPMMQTT_INTERNAL
   #include <rpmmqtt.h>
   
   #include "debug.h"
   
  -int _rpmmqtt_debug = 1;
  -#define SPEW(_list)     if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list
  +int _rpmmqtt_debug;
   
   #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE)
   #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG)
  @@ -121,8 +138,8 @@
       return t;
   }
   
  -#define _ENTRY(_v)      { MQTT_FLAGS_##_v, #_v, }
   static KEY MqttFlags[] = {
  +#define _ENTRY(_v)      { MQTT_FLAGS_##_v, #_v, }
       _ENTRY(CLEAN),
       _ENTRY(EOL),
       _ENTRY(NOSTALE),
  @@ -133,8 +150,8 @@
       _ENTRY(RETAIN),
       _ENTRY(WILL_RETAIN),
       _ENTRY(BUFFER),
  -};
   #undef       _ENTRY
  +};
   static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]);
   
   static const char * fmtMqttFlags(uint32_t flags)
  @@ -146,33 +163,10 @@
   }
   #define      _MQTTFLAGS(_flags)      fmtMqttFlags(_flags)
   
  -#define _ENTRY(_v)      { MQTTASYNC_##_v, #_v, }
  -static KEY rpmmqtt_errs[] = {
  -#ifdef       WITH_MQTT
  -    _ENTRY(SUCCESS),
  -    _ENTRY(FAILURE),
  -    _ENTRY(PERSISTENCE_ERROR),
  -    _ENTRY(DISCONNECTED),
  -    _ENTRY(MAX_MESSAGES_INFLIGHT),
  -    _ENTRY(BAD_UTF8_STRING),
  -    _ENTRY(NULL_PARAMETER),
  -    _ENTRY(TOPICNAME_TRUNCATED),
  -    _ENTRY(BAD_STRUCTURE),
  -    _ENTRY(BAD_QOS),
  -    _ENTRY(NO_MORE_MSGIDS),
  -    _ENTRY(OPERATION_INCOMPLETE),
  -    _ENTRY(MAX_BUFFERED_MESSAGES),
  -#else
  -    { 0, NULL },
  -#endif
  -};
  -#undef       _ENTRY
  -static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]);
  -
  -static const char * rpmmqttStrerror(int v)
  +static const char * rpmmqttStrerror(rpmmqtt mqtt, int v)
   {
  -    KEY * tbl = rpmmqtt_errs;
  -    size_t ntbl = rpmmqtt_nerrs;
  +    KEY * tbl = (KEY *) (mqtt && mqtt->vec ? mqtt->vec->errs : NULL);
  +    size_t ntbl = (mqtt && mqtt->vec && mqtt->vec->errs ? mqtt->vec->nerrs : 0);
       const char * n = NULL;
       static char buf[64];
   
  @@ -192,11 +186,14 @@
   static rpmRC Xcheck(rpmmqtt mqtt, const char * msg, int rc,
                int printit, const char * func, const char * fn, unsigned ln)
   {
  +    int all = 0;
   
  -    if (rc != 0) {   /* MQTTCLIENT_SUCCESS */
  +    if (all || rc != 0) {
        int _lvl = RPMLOG_WARNING;
  -     rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n",
  -             func, fn, ln, msg, rpmmqttStrerror(rc), rc);
  +     const char * api = (mqtt && mqtt->vec && mqtt->vec->prefix
  +             ? mqtt->vec->prefix : "");
  +     rpmlog(_lvl, "%s:%s:%u: %s%s: %s(%d)\n",
  +             func, fn, ln, api, msg, rpmmqttStrerror(mqtt, rc), rc);
       }
       return (rpmRC) rc;
   }
  @@ -390,8 +387,8 @@
   }
   
   /*==============================================================*/
  -#ifdef       WITH_MQTT
  -static int onMessageArrived(void * _mqtt, char * topic, int topicLen,
  +#ifdef       WITH_PAHO
  +static int pahoOnMessageArrived(void * _mqtt, char * topic, int topicLen,
                MQTTAsync_message *  message)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  @@ -401,8 +398,9 @@
   
       const char * s = (const char *) message->payload;
       size_t ns = message->payloadlen;
  -    if (_rpmmqtt_debug < 0)
  -     rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, (int)ns, s);
  +
  +    rpmlog(RPMLOG_DEBUG, "%s rcvd topic(%s) \"%.*s\"\n", mqtt->vec->name,
  +                     topic, (int)ns, s);
   
       if (mqtt->iob) {
        mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0);
  @@ -442,73 +440,67 @@
       return rc;
   }
   
  -static void onDeliveryComplete(void * _mqtt, int token)
  +static void pahoOnDeliveryComplete(void * _mqtt, int token)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    if (_rpmmqtt_debug < 0)
  -     rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token);
       mqtt->token = token;
   }
   
  -static void onConnectionLost(void * _mqtt, char *cause)
  +static void pahoOnConnectionLost(void * _mqtt, char *cause)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       rpmlog(RPMLOG_DEBUG,
  -             "--- MQTT disconnect(%s) version(%d) present(%d)\n",
  +             "%s disconnect(%s) version(%d) present(%d)\n", mqtt->vec->name,
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
       if (cause)
        rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause);
   
  -    rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI);
  +    rpmlog(RPMLOG_WARNING, "%s reconnecting(%s) ...\n", mqtt->vec->name,
  +                     mqtt->serverURI);
   
       (void) rpmmqttConnect(mqtt);
   }
   
  -static void onFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    rpmlog(RPMLOG_WARNING, "MQTT failed\n");
  -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
  +    rpmlog(RPMLOG_WARNING, "%s cmd failed.\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onSuccess(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnSuccess(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
       mqtt->finished = 1;
   }
   
  -static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response);
  -    rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n");
  +    rpmlog(RPMLOG_WARNING, "%s disconnect failed\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onDisconnect(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnDisconnect(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  -    if (mqtt->debug || _rpmmqtt_debug)
  -     rpmlog(RPMLOG_DEBUG,
  -             "MQTT disconnect(%s) version(%d) present(%d)\n",
  +    rpmlog(RPMLOG_DEBUG,
  +             "%s disconnect(%s) version(%d) present(%d)\n", mqtt->vec->name,
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
       mqtt->serverURI = _free(mqtt->serverURI);
       mqtt->finished = 1;
   }
   
  -static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnConnectFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response);
  -    rpmlog(RPMLOG_WARNING, "MQTT    connect failed\n");
  +    rpmlog(RPMLOG_WARNING, "%s connect failed\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onConnect(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnConnect(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  @@ -519,77 +511,73 @@
        mqtt->sessionPresent = response->alt.connect.sessionPresent;
       }
   
  -    if (mqtt->debug || _rpmmqtt_debug)
  -     rpmlog(RPMLOG_DEBUG,
  -             "MQTT    connect(%s) version(%d) present(%d)\n",
  +    rpmlog(RPMLOG_DEBUG,
  +             "%s connect(%s) version(%d) present(%d)\n", mqtt->vec->name,
                mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
       mqtt->finished = 1;
   }
   
  -static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response);
       if (response) {
        const char *s = response->message;
        int token = response->token;
        int code = response->code;
  -     rpmlog(RPMLOG_WARNING, "MQTT  subscribe(%d) failed: code(%d) msg %s\n",
  -                     token, code, s);
  +     rpmlog(RPMLOG_WARNING, "%s subscribe(%d) failed: code(%d) msg %s\n",
  +                     mqtt->vec->name, token, code, s);
       } else
  -     rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed\n");
  +     rpmlog(RPMLOG_WARNING, "%s subscribe failed\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onSubscribe(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnSubscribe(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int qos = response->alt.qos;
   
  -    if (mqtt->debug || _rpmmqtt_debug)
  -     rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", qos);
  +    rpmlog(RPMLOG_DEBUG, "%s subscribe qos(%d)\n", mqtt->vec->name, qos);
       mqtt->finished = 1;
   }
   
  -static void onSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    rpmlog(RPMLOG_WARNING, "MQTT    subscribeMany failed\n");
  +
  +    rpmlog(RPMLOG_WARNING, "%s subscribeMany failed\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnSubscribeMany(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
   #ifdef       DYING
       int *subqos = response->alt.qosList;
  -
  -SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, response, subqos, mqtt->ac));
       for (int i = 0; i < mqtt->ac; i++) {
  -     if (mqtt->debug || _rpmmqtt_debug)
  -         rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", subqos[i]);
  +     rpmlog(RPMLOG_DEBUG, "%s subscribe qos(%d)\n", mqtt->vec->name, subqos[i]);
       }
   #endif
  +
       mqtt->finished = 1;
   }
   
  -static void onUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    rpmlog(RPMLOG_WARNING, "MQTT    unsubscribeMany failed\n");
  +
  +    rpmlog(RPMLOG_WARNING, "%s unsubscribeMany failed\n", mqtt->vec->name);
       mqtt->finished = 1;
   }
   
  -static void onUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  -    rpmlog(RPMLOG_DEBUG, "MQTT  unsubscribeMany\n");
  -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
       mqtt->finished = 1;
   }
   
  -static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response)
  +static void pahoOnSendFailure(void * _mqtt, MQTTAsync_failureData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  @@ -597,22 +585,22 @@
        const char *s = response->message;
        int token = response->token;
        int code = response->code;
  -     rpmlog(RPMLOG_WARNING, "MQTT send(%d) failed: code(%d) msg %s\n",
  -                     token, code, s);
  +     rpmlog(RPMLOG_WARNING, "%s send(%d) failed: code(%d) msg %s\n",
  +                     mqtt->vec->name, token, code, s);
       }
       mqtt->finished = 1;
   }
   
  -static void onSend(void * _mqtt, MQTTAsync_successData * response)
  +static void pahoOnSend(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  -    if (mqtt->debug || _rpmmqtt_debug) {
  +    {
        const char * s = (const char *) response->alt.pub.message.payload;
        size_t ns = response->alt.pub.message.payloadlen;
        int token = response->token;
  -     rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n",
  -                     token, mqtt->topic, (int)ns, s);
  +     rpmlog(RPMLOG_DEBUG, "%s sent(%d) topic(%s) \"%.*s\"\n",
  +                     mqtt->vec->name, token, mqtt->topic, (int)ns, s);
       }
       mqtt->finished = 1;
   }
  @@ -621,11 +609,11 @@
   {
       rpmlog(RPMLOG_DEBUG, "%s\n", message);
   }
  -#endif       /* WITH_MQTT */
  +#endif       /* WITH_PAHO */
   
   /*==============================================================*/
  -#ifdef       WITH_MQTT
  -static int rpmmqttOpen(void **_mqttp, const char *clientID, const char *serverURI,
  +#ifdef       WITH_PAHO
  +static int pahoOpen(void **_mqttp, const char *clientID, const char *serverURI,
                void *_mqtt)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  @@ -649,12 +637,12 @@
       rc = 0;
   
   exit:
  -SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn));
  +SPEW((stderr, "<--    %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn));
   
       return rc;
   }
   
  -static int rpmmqttClose(void *_mqtt)
  +static int pahoClose(void *_mqtt)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
  @@ -671,11 +659,11 @@
       rc = 0;
   
   exit:
  -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
  +SPEW((stderr, "<--    %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
       return rc;
   }
   
  -static int rpmmqttPut(void *_mqtt, char *key,
  +static int pahoPut(void *_mqtt, char *key,
                int bufcount, char *buffers[], int buflens[])
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  @@ -697,10 +685,31 @@
       for (int i = 0; i < bufcount; i++) {
        const char * s = buffers[i];
        int ns = buflens[i];
  +     int j;
  +     for (j = 0; j < ns; j++) {
  +         if (xisprint(s[j]))
  +             continue;
  +         break;
  +     }
  +     if (i > 0 && (j == ns || (j == ns-1 && s[j] == '\0'))) {
   SPEW((stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s));
  +     } else {
  +         if (ns <= 8) {
  +             unsigned long val = 0;
  +             j = ns;
  +             while (--j >= 0) {
  +                 val <<= 8;
  +                 val |= s[j];
  +             }
  +SPEW((stderr, "%5d\t%p[%d]\t0x%lx\n", i, s, ns, val));
  +         } else {
  +SPEW((stderr, "%5d\t%p[%d]\n", i, s, ns));
  +         }
  +     }
        nb += buflens[i];
        nw += Fwrite(buffers[i], sizeof(*buffers[i]), buflens[i], fd);
       }
  +    (void) fdatasync(Fileno(fd));
       (void) Fclose(fd);
       fd = NULL;
   
  @@ -714,13 +723,12 @@
   exit:
       if (fd)
        Fclose(fd);
  -SPEW((stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn));
  +SPEW((stderr, "<--    %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn));
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttGet(void *_mqtt, char *key,
  -             char *buffer[], int *buflen)
  +static int pahoGet(void *_mqtt, char *key, char *buffer[], int *buflen)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char *fn = NULL;
  @@ -759,14 +767,14 @@
   exit:
       if (fd)
        Fclose(fd);
  -SPEW((stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn));
  +SPEW((stderr, "<--    %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn));
       fn = _free(fn);
       *buffer = b;
       *buflen = nb;
       return rc;
   }
   
  -static int rpmmqttRemove(void *_mqtt, char *key)
  +static int pahoRemove(void *_mqtt, char *key)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       char *fn = NULL;
  @@ -783,12 +791,12 @@
       rc = 0;
   
   exit:
  -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn));
  +SPEW((stderr, "<--    %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn));
       fn = _free(fn);
       return rc;
   }
   
  -static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys)
  +static int pahoKeys(void *_mqtt, char ***keys, int *nkeys)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       ARGV_t av = NULL;
  @@ -820,13 +828,13 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -SPEW((stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac));
  +SPEW((stderr, "<--    %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac));
       *keys = (char **) av;
       *nkeys = ac;
       return rc;
   }
   
  -static int rpmmqttClear(void *_mqtt)
  +static int pahoClear(void *_mqtt)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       DIR * dir = NULL;
  @@ -857,11 +865,11 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
  +SPEW((stderr, "<--    %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc));
       return rc;
   }
   
  -static int rpmmqttContainsKey(void *_mqtt, char *key)
  +static int pahoContainsKey(void *_mqtt, char *key)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
       DIR * dir = NULL;
  @@ -892,28 +900,28 @@
   exit:
       if (dir)
        (void) Closedir(dir);
  -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc));
  +SPEW((stderr, "<--    %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc));
       return rc;
   }
   
  -static MQTTClient_persistence _rpmmqtt_persistence = {
  +static MQTTClient_persistence _mqtt_persistence = {
       NULL,
  -    rpmmqttOpen,
  -    rpmmqttClose,
  -    rpmmqttPut,
  -    rpmmqttGet,
  -    rpmmqttRemove,
  -    rpmmqttKeys,
  -    rpmmqttClear,
  -    rpmmqttContainsKey,
  +    pahoOpen,
  +    pahoClose,
  +    pahoPut,
  +    pahoGet,
  +    pahoRemove,
  +    pahoKeys,
  +    pahoClear,
  +    pahoContainsKey,
   };
   
  -#endif       /* WITH_MQTT */
  +#endif       /* WITH_PAHO */
   
   /*==============================================================*/
   
  -#ifdef       WITH_MQTT
  -struct mqttState_s {
  +#ifdef       WITH_PAHO
  +struct pahoState_s {
       MQTTAsync_connectOptions C;
       MQTTAsync_willOptions    W;
       MQTTAsync_SSLOptions     S;
  @@ -923,7 +931,7 @@
       MQTTAsync_message                M;
   };
   
  -static struct mqttState_s mqttStateInitial = {
  +static struct pahoState_s pahoStateInitial = {
       .C = MQTTAsync_connectOptions_initializer,
       .W = MQTTAsync_willOptions_initializer,
       .S = MQTTAsync_SSLOptions_initializer,
  @@ -932,14 +940,13 @@
       .O = MQTTAsync_createOptions_initializer,
       .M = MQTTAsync_message_initializer,
   };
  -#endif       /* WITH_MQTT */
   
  -static void * AOBJ(rpmmqtt mqtt, char otype)
  +static
  +void * AOBJ(rpmmqtt mqtt, char otype)
   {
       void * ptr = NULL;
  -#ifdef       WITH_MQTT
  -    mqttState p = NULL;
  -    mqttState q = &mqttStateInitial;
  +    pahoState_t p = NULL;
  +    pahoState_t q = &pahoStateInitial;
       MQTTAsync_connectOptions *C;
       MQTTAsync_disconnectOptions *D;
       MQTTAsync_message *M;
  @@ -950,8 +957,8 @@
       urlinfo u;
   
       if (mqtt->state == NULL)
  -     mqtt->state = (mqttState) xcalloc(1, sizeof(*p));
  -    p = (mqttState) mqtt->state;
  +     mqtt->state = (pahoState_t) xcalloc(1, sizeof(*p));
  +    p = (pahoState_t) mqtt->state;
   
       switch (otype) {
       case 'C':
  @@ -965,7 +972,7 @@
        W = (MQTTAsync_willOptions *) AOBJ(mqtt, 'W');          /* XXX LWT */
        C->will = (W && W->topicName ? W : NULL);
   
  -     C->username = (u && u->user ? u->user : mqtt->user);;
  +     C->username = (u && u->user ? u->user : mqtt->user);
        C->password = (u && u->password ? u->password : mqtt->password);
   
        C->connectTimeout = 30; /* XXX secs. configure?*/
  @@ -974,8 +981,8 @@
        S = (MQTTAsync_SSLOptions *) AOBJ(mqtt, 'S');           /* XXX SSL */
        C->ssl = (S && S->keyStore) ? S : NULL; /* XXX */
   
  -     C->onSuccess = onConnect;
  -     C->onFailure = onConnectFailure;
  +     C->onSuccess = pahoOnConnect;
  +     C->onFailure = pahoOnConnectFailure;
        C->context = mqtt;
   
        C->serverURIcount = 0;
  @@ -999,8 +1006,8 @@
        D = &p->D;
        memcpy(D, &q->D, sizeof(*D));
        D->timeout = mqtt->timeout;
  -     D->onSuccess = onDisconnect;
  -     D->onFailure = onDisconnectFailure;
  +     D->onSuccess = pahoOnDisconnect;
  +     D->onFailure = pahoOnDisconnectFailure;
        D->context = mqtt;
        ptr = (void *) D;
        break;
  @@ -1054,55 +1061,170 @@
   assert(0);
        break;
       }
  -#endif       /* WITH_MQTT */
       return ptr;
   }
   
  -rpmRC rpmmqttConnect(rpmmqtt mqtt)
  +static
  +rpmRC pahoDestroy(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    rc = check(mqtt, "destroy",
  +             (MQTTAsync_destroy(&mqtt->I), 0));
  +    mqtt->state = _free(mqtt->state);
  +    mqtt->I = NULL;
  +    return rc;
  +}
  +
  +static
  +rpmRC pahoCreate(rpmmqtt mqtt)
   {
  +    static int oneshot;
  +    int _lvl = RPMLOG_DEBUG;
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +
  +#ifdef       DYING
  +mqtt->trace = 4;     /* XXX */
  +#endif
  +    if (mqtt->trace && rpmIsDebug()) {
  +     xx = check(mqtt, "setTraceCallback",
  +             (MQTTAsync_setTraceCallback(onTrace), 0));
  +     xx = check(mqtt, "setTraceLevel",
  +             (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  +    }
  +
  +    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +
  +    if (!oneshot) {
  +     if (mqtt->trace == 0) {
  +         MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  +         while (NV->name) {
  +             rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  +             NV++;
  +         }
  +     }
  +     oneshot++;
  +    }
  +
  +    /* XXX improve integration */
  +static const char _mqtt_persist[] =
  +     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
  +    mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  +static const char _mqtt_cachedir[] =
  +     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
  +
  +    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  +    rpmlog(_lvl, "%19s: type(%u) %s\n",
  +             "persist", mqtt->persist_type,
  +             (mqtt->persist_type ? persist_path : ""));
  +    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +    if (mqtt->trace)
  +     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +
  +mqtt->persist_path = _free(mqtt->persist_path);
  +mqtt->persist_ctx = _free(mqtt->persist_ctx);
  +    switch (mqtt->persist_type) {
  +    default:
  +    case MQTTCLIENT_PERSISTENCE_NONE:
  +     mqtt->persist_ctx = NULL;
  +     break;
  +    case MQTTCLIENT_PERSISTENCE_DEFAULT:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     /* XXX rpmmqttFini double free */
  +     mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  +      } break;
  +    case MQTTCLIENT_PERSISTENCE_USER:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     MQTTClient_persistence * ctx =
  +             (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  +     *ctx = _mqtt_persistence;       /* structure assignment */
  +     ctx->context = mqtt;
  +     mqtt->persist_ctx = ctx;
  +      } break;
  +    }
  +persist_path = _free(persist_path);
  +
  +mqtt->u = NULL;
  +dumpMQTT(__FUNCTION__, mqtt);
  +
  +    if (mqtt->I == NULL) {
  +     xx = check(mqtt, "createWithOptions",
  +             MQTTAsync_createWithOptions(&mqtt->I,
  +                     mqtt->uri, mqtt->clientid,
  +                     mqtt->persist_type, mqtt->persist_ctx,
  +                     (MQTTAsync_createOptions *)AOBJ(mqtt, 'O')));
  +
  +     xx = check(mqtt, "setCallbacks",
  +             MQTTAsync_setCallbacks(mqtt->I, mqtt,
  +                     pahoOnConnectionLost,
  +                     pahoOnMessageArrived,
  +                     pahoOnDeliveryComplete));
  +    }
  +
  +    rc = RPMRC_OK;
   
  -SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt));
  -#ifdef       WITH_MQTT
  -    if (!MQTTAsync_isConnected(mqtt->I)) {
  +    return rc;
  +}
  +
  +static
  +rpmRC pahoDisconnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
        mqtt->finished = 0;
  -     rc = check(mqtt, "connect",
  -             MQTTAsync_connect(mqtt->I,
  -                     (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C')));
  +     rc = check(mqtt, "disconnect",
  +             MQTTAsync_disconnect(mqtt->I,
  +                     (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D')));
        while (!mqtt->finished)
  -         usleep(1000);
  +         usleep(100);
        if (rc)
            goto exit;
       }
       rc = RPMRC_OK;
  +
   exit:
  -#endif       /* WITH_MQTT */
  -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  -rpmRC rpmmqttDisconnect(rpmmqtt mqtt)
  +static
  +rpmRC pahoConnect(rpmmqtt mqtt)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -#ifdef       WITH_MQTT
  -    if (MQTTAsync_isConnected(mqtt->I)) {
  +
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
        mqtt->finished = 0;
  -     rc = check(mqtt, "disconnect",
  -             MQTTAsync_disconnect(mqtt->I,
  -                     (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D')));
  +     rc = check(mqtt, "connect",
  +             MQTTAsync_connect(mqtt->I,
  +                     (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C')));
        while (!mqtt->finished)
  -         usleep(100);
  +         usleep(1000);
        if (rc)
            goto exit;
       }
       rc = RPMRC_OK;
  +
   exit:
  -#endif
  -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  -rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +static
  +rpmRC pahoIsConnected(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_NOTFOUND;
  +
  +    if (check(mqtt, "isconnected",
  +             MQTTAsync_isConnected(mqtt->I)))
  +     rc = RPMRC_OK;
  +    return rc;
  +}
  +
  +static
  +rpmRC pahoSendMessage(rpmmqtt mqtt, const char * topic,
                const char * s, size_t ns)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  @@ -1114,8 +1236,7 @@
       if (ns == 0)
        ns = strlen(s);
   
  -#ifdef       WITH_MQTT
  -    if (!rpmmqttConnect(mqtt)) {
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
        MQTTAsync_message *M =
            (MQTTAsync_message *) AOBJ(mqtt, 'M');
        M->payloadlen = ns;
  @@ -1123,8 +1244,8 @@
   
        MQTTAsync_responseOptions *R =
            (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onSend;
  -     R->onFailure = onSendFailure;
  +     R->onSuccess = pahoOnSend;
  +     R->onFailure = pahoOnSendFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "sendMessage",
  @@ -1135,21 +1256,18 @@
            goto exit;
        rc = RPMRC_OK;
       }
  -exit:
  -#endif       /* WITH_MQTT */
   
  -SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc));
  +exit:
       return rc;
   }
   
  -rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +static
  +rpmRC pahoSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
       if (ac <= 0)
        goto exit;
  -#ifdef       WITH_MQTT
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
        int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  @@ -1171,8 +1289,8 @@
   
        MQTTAsync_responseOptions *R =
                (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onSubscribeMany;
  -     R->onFailure = onSubscribeManyFailure;
  +     R->onSuccess = pahoOnSubscribeMany;
  +     R->onFailure = pahoOnSubscribeManyFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "subscribeMany",
  @@ -1185,19 +1303,16 @@
            goto exit;
        rc = RPMRC_OK;
       }
  -#endif       /* WITH_MQTT */
   
   exit:
  -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
       return rc;
   }
   
  -rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char * topic, int qos)
  +static
  +rpmRC pahoSubscribe(rpmmqtt mqtt, const char * topic, int qos)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -SPEW((stderr, "--> %s(%p,%p,%d)\n", __FUNCTION__, mqtt, topic, qos));
  -#ifdef       WITH_MQTT
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  @@ -1205,8 +1320,8 @@
   
        MQTTAsync_responseOptions *R =
                (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onSubscribe;
  -     R->onFailure = onSubscribeFailure;
  +     R->onSuccess = pahoOnSubscribe;
  +     R->onFailure = pahoOnSubscribeFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "subscribe",
  @@ -1217,19 +1332,16 @@
            goto exit;
        rc = RPMRC_OK;
       }
  -#endif       /* WITH_MQTT */
   
   exit:
  -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, rc));
       return rc;
   }
   
  -rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char * topic)
  +static
  +rpmRC pahoUnsubscribe(rpmmqtt mqtt, const char * topic)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -SPEW((stderr, "--> %s(%p,\"%s\")\n", __FUNCTION__, mqtt, topic));
  -#ifdef       WITH_MQTT
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  @@ -1237,8 +1349,8 @@
   
        MQTTAsync_responseOptions *R =
                (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onUnsubscribeMany;
  -     R->onFailure = onUnsubscribeManyFailure;
  +     R->onSuccess = pahoOnUnsubscribeMany;
  +     R->onFailure = pahoOnUnsubscribeManyFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "unsubscribe",
  @@ -1249,21 +1361,18 @@
            goto exit;
        rc = RPMRC_OK;
       }
  -#endif       /* WITH_MQTT */
   
   exit:
  -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc));
       return rc;
   }
   
  -rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +static
  +rpmRC pahoUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
       if (ac <= 0)
        goto exit;
  -#ifdef       WITH_MQTT
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
        for (int i = 0; i < ac; i++) {
  @@ -1273,8 +1382,8 @@
   
        MQTTAsync_responseOptions *R =
                (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onUnsubscribeMany;
  -     R->onFailure = onUnsubscribeManyFailure;
  +     R->onSuccess = pahoOnUnsubscribeMany;
  +     R->onFailure = pahoOnUnsubscribeManyFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "unsubscribeMany",
  @@ -1285,67 +1394,1492 @@
            goto exit;
        rc = RPMRC_OK;
       }
  -#endif       /* WITH_MQTT */
   
   exit:
  -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
       return rc;
   }
   
  +/*
  + * Symbolic names for the MQTTASYNC_* return codes.
  + * Each row pairs the numeric code with its name (without the
  + * MQTTASYNC_ prefix); the table is closed by a { 0, NULL } row.
  + */
  +static KEY pahoErrs[] = {
  +#define PAHO_ERR(_code) { MQTTASYNC_##_code, #_code, }
  +    PAHO_ERR(SUCCESS),
  +    PAHO_ERR(FAILURE),
  +    PAHO_ERR(PERSISTENCE_ERROR),
  +    PAHO_ERR(DISCONNECTED),
  +    PAHO_ERR(MAX_MESSAGES_INFLIGHT),
  +    PAHO_ERR(BAD_UTF8_STRING),
  +    PAHO_ERR(NULL_PARAMETER),
  +    PAHO_ERR(TOPICNAME_TRUNCATED),
  +    PAHO_ERR(BAD_STRUCTURE),
  +    PAHO_ERR(BAD_QOS),
  +    PAHO_ERR(NO_MORE_MSGIDS),
  +    PAHO_ERR(OPERATION_INCOMPLETE),
  +    PAHO_ERR(MAX_BUFFERED_MESSAGES),
  +#undef       PAHO_ERR
  +    { 0, NULL },
  +};
  +
  +/*
  + * Method vector for the Paho (MQTTAsync) backend: identification,
  + * default ports, error-name table and the per-operation entry points.
  + */
  +static
  +struct mqttVec_s pahoVec = {
  +    /* identification and defaults */
  +    .name            = "paho",
  +    .prefix          = "MQTTAsync_",
  +    .port            = 1883,
  +    .sport           = 8883,
  +
  +    /* error code -> name table (count includes the terminating row) */
  +    .errs            = pahoErrs,
  +    .nerrs           = sizeof(pahoErrs) / sizeof(*pahoErrs),
  +
  +    /* instance and connection life cycle */
  +    .create          = pahoCreate,
  +    .destroy         = pahoDestroy,
  +    .connect         = pahoConnect,
  +    .disconnect      = pahoDisconnect,
  +    .isconnected     = pahoIsConnected,
  +
  +    /* subscriptions and publishing */
  +    .subscribe       = pahoSubscribe,
  +    .subscribeMany   = pahoSubscribeMany,
  +    .unsubscribe     = pahoUnsubscribe,
  +    .unsubscribeMany = pahoUnsubscribeMany,
  +    .sendMessage     = pahoSendMessage,
  +};
  +#endif       /* WITH_PAHO */
  +
   /*==============================================================*/
  -rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns)
  +#ifdef       WITH_MOSQUITTO
  +/*
  + * Password callback handed to mosquitto_tls_set().
  + * Stub: nothing is written into b and the returned length is 0.
  + */
  +static int mosqGetPassword(char *b, int size, int rwflag, void * _mqtt)
   {
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    /* XXX returns password in b[nb] */
  +    int nb = 0;
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    (void)mqtt;
  +    return nb;
  +}
   
  -    if (topic == NULL)
  -     topic = mqtt->topic;
  -    if (s == NULL)
  -     s = "";
  -    if (ns == 0)
  -     ns = strlen(s);
  +/*
  + * Connect callback registered with mosquitto_connect_callback_set().
  + * Logs the result, records it in mqtt->connected and marks the pending
  + * operation finished.
  + *
  + * rc is the broker's answer to the connection attempt; 0 is success.
  + * mosqOnDisconnect() clears mqtt->connected, so the flag has to be set
  + * again here: otherwise a refused connection is reported as connected
  + * and a later successful reconnect is never noticed.
  + */
  +static void mosqOnConnect(struct mosquitto * I, void * _mqtt, int rc)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmlog(RPMLOG_DEBUG, "%s: connected rc(%d)\n", mqtt->vec->name, rc);
  +    mqtt->connected = (rc == 0 ? 1 : 0);
  +    mqtt->finished = 1;
  +}
   
  -#ifdef       WITH_MQTT
  -    if (rpmmqttConnect(mqtt))
  -     goto exit;
  -    {        static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
  -     /* XXX extra space: prepend in *sendMessage or *send  */
  -     char * t = rpmmqttExpand(mqtt, NULL,
  -                     _mqtt_prefix, " ", s, NULL);
  -     size_t nt = strlen(t);
  +/*
  + * Disconnect callback registered with
  + * mosquitto_disconnect_callback_set(): logs rc, clears mqtt->connected
  + * and marks the pending operation finished.
  + */
  +static void mosqOnDisconnect(struct mosquitto * I, void * _mqtt, int rc)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG, "%s: disconnected rc(%d)\n", mqtt->vec->name, rc);
  +
  +    mqtt->connected = 0;
  +    mqtt->finished = 1;
  +}
   
  -     rc = rpmmqttSendMessage(mqtt, topic, t, nt);
  -     t = _free(t);
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  +/*
  + * Publish callback registered with mosquitto_publish_callback_set():
  + * logs the message id, stores it in mqtt->token and marks the pending
  + * operation finished.
  + */
  +static void mosqOnPublish(struct mosquitto * I, void * _mqtt, int mid)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG, "%s: sent(%d)\n", mqtt->vec->name, mid);
  +
  +    mqtt->token = mid;
  +    mqtt->finished = 1;
  +}
  +
  +/*
  + * Message callback registered with mosquitto_message_callback_set().
  + *
  + * Counts the message, stores its mid in mqtt->token and logs it at
  + * debug level.  When mqtt->iob is attached, "topic:\tpayload" is
  + * appended to it; for the STDOUT/FILE output modes the same text plus
  + * a newline is written to mqtt->ofd.  Finally the pending operation is
  + * marked finished.
  + *
  + * The payload is not NUL terminated, so a terminated copy is made for
  + * rpmiobAppend().  The copy lives on the heap: the payload length comes
  + * from the network and must not size a stack (alloca) allocation.
  + */
  +static void mosqOnMessage(struct mosquitto * I, void * _mqtt,
  +             const struct mosquitto_message *msg)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    mqtt->msg_count++;
  +    mqtt->token = msg->mid;
  +
  +    /* An empty message may carry a NULL payload: treat it as "". */
  +    const char * s = (msg->payload ? (const char *) msg->payload : "");
  +    size_t ns = (msg->payload && msg->payloadlen > 0
  +                     ? (size_t) msg->payloadlen : 0);
  +
  +    rpmlog(RPMLOG_DEBUG, "%s: rcvd topic(%s) qos(%d) retain(%d) \"%.*s\"\n",
  +             mqtt->vec->name, msg->topic, msg->qos, (msg->retain ? 1 : 0),
  +             (int)ns, s);
  +
  +    if (mqtt->iob) {
  +     mqtt->iob = rpmiobAppend(mqtt->iob, msg->topic, 0);
  +     mqtt->iob = rpmiobAppend(mqtt->iob, ":\t", 0);
  +     {   char * t = (char *) xcalloc(1, ns+1);
  +         if (ns > 0)
  +             memcpy(t, s, ns);
  +         t[ns] = '\0';
  +         mqtt->iob = rpmiobAppend(mqtt->iob, t, 1);
  +         t = _free(t);
  +     }
  +    }
  +    switch (mqtt->msg_output) {
  +    case MQTT_OUTPUT_UNKNOWN:
  +    case MQTT_OUTPUT_CALLBACK:
  +    default:
  +     break;
  +    case MQTT_OUTPUT_STDOUT:
  +    case MQTT_OUTPUT_FILE:
  +     if (mqtt->ofd) {
  +         size_t nw;
  +         nw = Fwrite(msg->topic, sizeof(*msg->topic), strlen(msg->topic),
  +                     mqtt->ofd);
  +         nw = Fwrite(":\t", 1, sizeof(":\t")-1, mqtt->ofd);
  +         nw = Fwrite(s, sizeof(*s), ns, mqtt->ofd);
  +         nw = Fwrite("\n", 1, sizeof("\n")-1, mqtt->ofd);
  +         (void)nw;
  +     }
  +     break;
       }
  +    /* XXX NEW: message->qos, message->retained, message->dup,
  +     * message->msgid are not consumed yet. */
   
  -exit:
  -#endif       /* WITH_MQTT */
  +    mqtt->finished = 1;
  +}
   
  -SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc));
  -    return rc;
  +/*
  + * Subscribe callback registered with
  + * mosquitto_subscribe_callback_set(): logs the message id together
  + * with the granted-qos array pointer and its count, stores the id in
  + * mqtt->token and marks the pending operation finished.
  + */
  +static void mosqOnSubscribe(struct mosquitto * I, void * _mqtt, int mid,
  +             int qos_counted, const int *granted_qos)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG, "%s: subscribed(%d) qos %p[%d]\n",
  +             mqtt->vec->name, mid, granted_qos, qos_counted);
  +
  +    mqtt->token = mid;
  +    mqtt->finished = 1;
   }
   
  -rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns)
  +/*
  + * Unsubscribe callback registered with
  + * mosquitto_unsubscribe_callback_set(): logs the message id, stores it
  + * in mqtt->token and marks the pending operation finished.
  + */
  +static void mosqOnUnsubscribe(struct mosquitto * I, void * _mqtt, int mid)
   {
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG, "%s: unsubscribed(%d)\n", mqtt->vec->name, mid);
  +
  +    mqtt->token = mid;
  +    mqtt->finished = 1;
  +}
   
  -    if (ns == 0) ns = strlen(s);
  +/*
  + * Log callback registered with mosquitto_log_callback_set().
  + *
  + * Translates the MOSQ_LOG_* bits in logtype into an rpmlog level and
  + * an optional subsystem prefix, then forwards the library's message.
  + * The level bits are tested from least to most severe so that the most
  + * severe bit present decides the level.  The message is logged at that
  + * level (the mapped level used to be computed but ignored).
  + */
  +static void mosqOnLog(struct mosquitto * I, void * _mqtt,
  +             int logtype, const char *str)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    const char * subsys = "";
  +    int lvl = RPMLOG_DEBUG;
   
  -#ifdef       WITH_MQTT
  -    if (rpmmqttConnect(mqtt))
  -     goto exit;
  +    if (logtype & MOSQ_LOG_DEBUG)
  +     lvl = RPMLOG_DEBUG;
  +    if (logtype & MOSQ_LOG_INFO)
  +     lvl = RPMLOG_INFO;
  +    if (logtype & MOSQ_LOG_NOTICE)
  +     lvl = RPMLOG_NOTICE;
  +    if (logtype & MOSQ_LOG_WARNING)
  +     lvl = RPMLOG_WARNING;
  +    if (logtype & MOSQ_LOG_ERR)
  +     lvl = RPMLOG_ERR;
  +    if (logtype & MOSQ_LOG_SUBSCRIBE)
  +     subsys = "subscribe: ";
  +    if (logtype & MOSQ_LOG_UNSUBSCRIBE)
  +     subsys = "unsubscribe: ";
  +    if (logtype & MOSQ_LOG_WEBSOCKETS)
  +     subsys = "websockets: ";
   
  -    {        char * subtopic = rpmmqttExpand(mqtt, NULL,
  -             (s ? s : mqtt->topic), NULL);
  -     unsigned subqos = mqtt->qos;
  -     char *t, *te;
  -     int _lvl = RPMLOG_DEBUG;
  +    rpmlog(lvl, "%s: %s%s\n", mqtt->vec->name, subsys, str);
  +}
   
  -     if ((t = strchr(subtopic, '?')) != NULL) {
  -         ARGV_t qav = NULL;
  -         int qac;
  +/*
  + * Tear down the mosquitto instance attached to mqtt.
  + *
  + * If an instance exists, its loop is stopped (forced) and the
  + * instance destroyed; mqtt->I is then cleared and
  + * mosquitto_lib_cleanup() called.  The individual steps are reported
  + * through check() only.
  + *
  + * Returns RPMRC_OK once the teardown sequence has run (this function
  + * used to return RPMRC_FAIL unconditionally).
  + */
  +static
  +rpmRC mosqDestroy(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +#ifdef       NOTYET
  +    mqtt->state = _free(mqtt->state);
  +#endif
  +    if (mqtt->I) {
  +
  +     xx = check(mqtt, "loop_stop",
  +             mosquitto_loop_stop(mqtt->I, true));
  +
  +     /* mosquitto_destroy() returns void: hand check() a constant 0 */
  +     xx = check(mqtt, "destroy",
  +             (mosquitto_destroy(mqtt->I), 0));
  +    }
  +    mqtt->I = NULL;
  +    mosquitto_lib_cleanup();
  +    rc = RPMRC_OK;
  +    return rc;
  +}
  +
  +/*
  + * Create and configure the mosquitto instance for mqtt.
  + *
  + * Logs the library version (first call only) and the main settings,
  + * initializes the library, creates the instance with mqtt as user
  + * data, installs the callbacks, sets the reconnect delay and starts
  + * the library's loop via mosquitto_loop_start().
  + *
  + * Returns RPMRC_OK on success, RPMRC_FAIL when mosquitto_new() yields
  + * no instance.  Failures of the remaining setup calls are reported
  + * through check() only.
  + */
  +static
  +rpmRC mosqCreate(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    static int oneshot;
  +    int _lvl = RPMLOG_DEBUG;
  +    int xx;
  +
  +#ifdef       DYING
  +mqtt->trace = 4;     /* XXX */
  +#endif
  +
  +    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +
  +    /* Report the library version once per process. */
  +    if (!oneshot) {
  +     int major = 0;
  +     int minor = 0;
  +     int revision = 0;
  +     int version = mosquitto_lib_version(&major, &minor, &revision);
  +     rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
  +             major, minor, revision, version);
  +     oneshot++;
  +    }
  +
  +    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  +    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +    if (mqtt->trace)
  +     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +
  +mqtt->u = NULL;
  +dumpMQTT(__FUNCTION__, mqtt);
  +
  +    xx = check(mqtt, "lib_init",
  +             mosquitto_lib_init());
  +
  +    xx = check(mqtt, "new",
  +             (mqtt->I = mosquitto_new(mqtt->clientid,
  +                     (MF_ISSET(CLEAN) ? true : false),
  +                     mqtt)) == NULL);
  +
  +    /* Without an instance none of the calls below may be made. */
  +    if (mqtt->I == NULL)
  +     goto exit;
  +
  +    mosquitto_connect_callback_set(mqtt->I, mosqOnConnect);
  +    mosquitto_disconnect_callback_set(mqtt->I, mosqOnDisconnect);
  +    mosquitto_publish_callback_set(mqtt->I, mosqOnPublish);
  +    mosquitto_message_callback_set(mqtt->I, mosqOnMessage);
  +    mosquitto_subscribe_callback_set(mqtt->I, mosqOnSubscribe);
  +    mosquitto_unsubscribe_callback_set(mqtt->I, mosqOnUnsubscribe);
  +
  +#ifdef       NOTYET
  +    if (mqtt->trace && rpmIsDebug())
  +#else
  +    if (mqtt->trace || mqtt->debug)
  +#endif
  +     mosquitto_log_callback_set(mqtt->I, mosqOnLog);
  +
  +/*
  + * Example 1:
  + *      delay=2, delay_max=10, exponential_backoff=False
  + *      Delays would be: 2, 4, 6, 8, 10, 10, ...
  + *
  + * Example 2:
  + *      delay=3, delay_max=30, exponential_backoff=True
  + *      Delays would be: 3, 6, 12, 24, 30, 30, ...
  + */
  +    xx = check(mqtt, "reconnect_delay_set",
  +             mosquitto_reconnect_delay_set(mqtt->I, 2, 10, false));
  +
  +    xx = check(mqtt, "user_data_set",
  +             (mosquitto_user_data_set(mqtt->I, mqtt), 0));
  +
  +    xx = check(mqtt, "loop_start",
  +             mosquitto_loop_start(mqtt->I));
  +
  +    rc = RPMRC_OK;
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Disconnect from the broker if currently connected.
  + *
  + * Runs one mosquitto_loop() pass, requests the disconnect, runs a
  + * second pass and clears mqtt->connected.  Returns the disconnect
  + * result when it is non-zero, RPMRC_OK otherwise (including when there
  + * was no connection to begin with).
  + */
  +static
  +rpmRC mosqDisconnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc;
  +    int xx;
  +
  +    /* Nothing to do when not connected. */
  +    if (rpmmqttIsConnected(mqtt) != RPMRC_OK)
  +     return RPMRC_OK;
  +
  +    xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, 20000, 1));     /* XXX */
  +
  +    mqtt->finished = 0;
  +    rc = check(mqtt, "disconnect",
  +             mosquitto_disconnect(mqtt->I));
  +
  +    xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, -1, 1));
  +
  +    /* The flag is cleared whether or not the disconnect succeeded. */
  +    mqtt->connected = 0;
  +
  +    if (rc)
  +     return rc;
  +    return RPMRC_OK;
  +}
  +
  +/*
  + * Connect to the broker unless already connected.
  + *
  + * Applies the per-connection settings (will, credentials, TLS, PSK,
  + * TLS options, in-flight limit, SOCKS5 proxy, protocol version), then
  + * starts the connection and runs one mosquitto_loop() pass.  Failures
  + * of the option setters are reported through check() only; only the
  + * result of the connect call itself decides the return value.
  + *
  + * Returns RPMRC_OK when already connected or when the connect call
  + * succeeded (mqtt->connected is then set), otherwise the non-zero
  + * result of the connect call.
  + */
  +static
  +rpmRC mosqConnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  +     urlinfo u = mqtt->u;
  +
  +     /* Last will: set when a will topic is configured, else clear. */
  +     if (mqtt->will_topic) {
  +         const char *payload =
  +             (mqtt->will_message ? mqtt->will_message : "");
  +         int payloadlen = strlen(payload);
  +         xx = check(mqtt, "will_set",
  +             mosquitto_will_set(mqtt->I,
  +             mqtt->will_topic,
  +             payloadlen, payload,
  +             mqtt->will_qos, /* XXX mqtt->qos? */
  +             (MF_ISSET(WILL_RETAIN) ? true : false)));
  +     } else
  +         xx = check(mqtt, "will_clear",
  +             mosquitto_will_clear(mqtt->I));
  +
  +     /* Credentials from the parsed URL take precedence. */
  +     xx = check(mqtt, "username_pw_set",
  +             mosquitto_username_pw_set(mqtt->I,
  +                     (u && u->user ? u->user : mqtt->user),
  +                     (u && u->password ? u->password : mqtt->password)));
  +
  +     if (mqtt->cafile || mqtt->_capath) {
  +         xx = check(mqtt, "tls_set",
  +                 mosquitto_tls_set(mqtt->I,
  +                     mqtt->cafile, mqtt->_capath,
  +                     mqtt->cert, mqtt->privkey, mosqGetPassword));
  +     }
  +     if (MF_ISSET(INSECURE)) {
  +         xx = check(mqtt, "tls_insecure_set",
  +                 mosquitto_tls_insecure_set(mqtt->I, true));
  +     }
  +     if (mqtt->_psk_key) {
  +         xx = check(mqtt, "tls_psk_set",
  +                 mosquitto_tls_psk_set(mqtt->I,
  +                     mqtt->_psk_key, mqtt->_psk_identity, NULL));
  +     }
  +     if (mqtt->ciphers && mqtt->_tls_version) {
  +         static int _cert_reqs = 1;  /* XXX 0/1: SSL_VERIFY_{NONE,PEER} */
  +         char * _tls_version = rpmExpand("tlsv", mqtt->_tls_version, NULL);
  +         xx = check(mqtt, "tls_opts_set",
  +             mosquitto_tls_opts_set(mqtt->I,
  +                     _cert_reqs, _tls_version, mqtt->ciphers));
  +         _tls_version = _free(_tls_version);
  +     }
  +
  +     xx = check(mqtt, "max_inflight_messages_set",
  +             mosquitto_max_inflight_messages_set(mqtt->I,
  +                     (mqtt->max_inflight ? mqtt->max_inflight : 20)));
  +
  +     if (mqtt->_proxy) {
  +         /*
  +          * Separate, NULL-initialized variable: do not shadow the
  +          * broker URL above, and do not dereference an unset pointer
  +          * when the proxy URL cannot be split.
  +          */
  +         urlinfo pu = NULL;
  +         int ut = (urltype) urlSplit(mqtt->_proxy, &pu);
  +         (void) ut;
  +         if (pu != NULL)
  +             xx = check(mqtt, "socks5_set",
  +                 mosquitto_socks5_set(mqtt->I,
  +                     pu->host,
  +                     pu->port,
  +                     pu->user,
  +                     pu->password));
  +         /* XXX NOTE(review): pu is never released here -- confirm
  +          * who owns the urlinfo returned by urlSplit(). */
  +     }
  +
  +     /* Protocol version: "311" selects v3.1.1, anything else v3.1. */
  +     {   int MQTTVersion = MQTT_PROTOCOL_V31;
  +         const char * pv = mqtt->protocol_version;
  +         if (pv && !strcmp(pv, "31"))
  +             MQTTVersion = MQTT_PROTOCOL_V31;
  +         if (pv && !strcmp(pv, "311"))
  +             MQTTVersion = MQTT_PROTOCOL_V311;
  +         xx = check(mqtt, "opts_set",
  +             mosquitto_opts_set(mqtt->I,
  +                     MOSQ_OPT_PROTOCOL_VERSION, &MQTTVersion));
  +     }
  +
  +#ifndef      NOTYET
  +     mqtt->finished = 0;
  +     /* XXX mosquitto_connect_srv(..., mqtt->_address? */
  +     if (mqtt->_address) {
  +         rc = check(mqtt, "connect_bind_async",
  +                 mosquitto_connect_bind_async(mqtt->I,
  +                     (u && u->host ? u->host : mqtt->host),
  +                     (u && u->host ? u->port : mqtt->port),
  +                     mqtt->keepalive, mqtt->_address));
  +     } else
  +         rc = check(mqtt, "connect_async",
  +                 mosquitto_connect_async(mqtt->I,
  +                     (u && u->host ? u->host : mqtt->host),
  +                     (u && u->host ? u->port : mqtt->port),
  +                     mqtt->keepalive));
  +
  +#else
  +     if (mqtt->_address) {
  +         if (MF_ISSET(DNSSRV)) {
  +             rc = check(mqtt, "connect_srv",
  +                 mosquitto_connect_srv(mqtt->I,
  +                     (u && u->host ? u->host : mqtt->host),
  +                     mqtt->keepalive, mqtt->_address));
  +         } else {
  +             rc = check(mqtt, "connect_bind",
  +                 mosquitto_connect_bind(mqtt->I,
  +                     (u && u->host ? u->host : mqtt->host),
  +                     (u && u->host ? u->port : mqtt->port),
  +                     mqtt->keepalive, mqtt->_address));
  +         }
  +     } else
  +         rc = check(mqtt, "connect",
  +                 mosquitto_connect(mqtt->I,
  +                     (u && u->host ? u->host : mqtt->host),
  +                     (u && u->host ? u->port : mqtt->port),
  +                     mqtt->keepalive));
  +#endif
  +
  +     xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, -1, 1));
  +
  +     if (rc)
  +         goto exit;
  +mqtt->connected = 1;
  +    }
  +    rc = RPMRC_OK;
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Report the connection state tracked in mqtt->connected:
  + * RPMRC_OK when set, RPMRC_NOTFOUND when clear.
  + */
  +static
  +rpmRC mosqIsConnected(rpmmqtt mqtt)
  +{
  +    if (mqtt->connected)
  +     return RPMRC_OK;
  +    return RPMRC_NOTFOUND;
  +}
  +
  +/*
  + * Publish ns bytes of s on topic.
  + *
  + * Defaults: a NULL topic means mqtt->topic, a NULL s means "", and
  + * ns == 0 means strlen(s).  Connects first if necessary.  The message
  + * id chosen by the library is stored in mqtt->token.
  + *
  + * Returns RPMRC_OK on success, RPMRC_FAIL when the connection could
  + * not be established, otherwise the non-zero publish result.
  + */
  +static
  +rpmRC mosqSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char * s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "publish",
  +             mosquitto_publish(mqtt->I,
  +                     (int *)&mqtt->token,
  +                     topic,
  +                     ns, s,
  +                     mqtt->qos,
  +                     (MF_ISSET(RETAIN) ? true : false)));
  +
  +     /* Log the topic actually published to, not the default one. */
  +     rpmlog(RPMLOG_DEBUG, "%s: send(%d) topic(%s) \"%.*s\"\n",
  +                     mqtt->vec->name, mqtt->token, topic, (int)ns, s);
  +
  +     xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, -1, 1));
  +
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Subscribe to the ac topics in av, one rpmmqttSubscribe() call per
  + * topic, each at mqtt->qos.  Stops at the first failing topic.
  + *
  + * Returns RPMRC_FAIL when ac <= 0 or the connection cannot be
  + * established, the first non-zero subscribe result, or RPMRC_OK.
  + */
  +static
  +rpmRC mosqSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int * qosv;
  +    int ix;
  +
  +    if (ac <= 0)
  +     return rc;
  +    if (rpmmqttConnect(mqtt))
  +     return rc;
  +
  +    qosv = (int *) xcalloc(ac, sizeof(*qosv));
  +    for (ix = 0; ix < ac; ix++) {
  +     char * t = av[ix];
  +#ifdef       NOTYET          /* XXX qos as subtopic ?qos=N? */
  +     char * te = strchr(t, '?');
  +     if (te) {
  +         *te++ = '\0';
  +         if ((te = strchr(te, '='))) {
  +             if (!strncmp(t, "qos", (te - t)))
  +                 qosv[ix] = strtoul(te+1, NULL, 0);
  +         }
  +     } else
  +#endif
  +         qosv[ix] = mqtt->qos;       /* XXX */
  +     rc = rpmmqttSubscribe(mqtt, t, qosv[ix]);
  +     if (rc)
  +         break;
  +    }
  +    free(qosv);
  +
  +    return (rc ? rc : RPMRC_OK);
  +}
  +
  +/*
  + * Subscribe to topic at the given qos; a NULL topic means mqtt->topic.
  + * Connects first if necessary.  The message id chosen by the library
  + * is stored in mqtt->token.
  + *
  + * Returns RPMRC_OK on success, RPMRC_FAIL when the connection could
  + * not be established, otherwise the non-zero subscribe result.
  + */
  +static
  +rpmRC mosqSubscribe(rpmmqtt mqtt, const char * topic, int qos)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +
  +    /* Resolve the default before logging: "%s" must not see NULL. */
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +
  +     rpmlog(RPMLOG_DEBUG, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribe",
  +             mosquitto_subscribe(mqtt->I,
  +                     (int *)&mqtt->token,
  +                     topic,
  +                     qos));
  +
  +     xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, -1, 1));
  +
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Unsubscribe from topic; a NULL topic means mqtt->topic.
  + * Connects first if necessary.  The message id chosen by the library
  + * is stored in mqtt->token.
  + *
  + * Returns RPMRC_OK on success, RPMRC_FAIL when the connection could
  + * not be established, otherwise the non-zero unsubscribe result.
  + */
  +static
  +rpmRC mosqUnsubscribe(rpmmqtt mqtt, const char * topic)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;
  +
  +    /* Resolve the default before logging: "%s" must not see NULL. */
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +
  +     rpmlog(RPMLOG_DEBUG, "%19s: %s\n", "unsubscribe", topic);
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "unsubscribe",
  +             mosquitto_unsubscribe(mqtt->I,
  +                     (int *)&mqtt->token,
  +                     topic));
  +
  +     xx = check(mqtt, "loop",
  +             mosquitto_loop(mqtt->I, -1, 1));
  +
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Unsubscribe from the ac topics in av, one rpmmqttUnsubscribe() call
  + * per topic, stopping at the first failure.
  + *
  + * Returns RPMRC_FAIL when ac <= 0 or the connection cannot be
  + * established; otherwise the result of the last unsubscribe call made.
  + */
  +static
  +rpmRC mosqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int ix;
  +
  +    if (ac <= 0)
  +     return rc;
  +    if (rpmmqttConnect(mqtt))
  +     return rc;
  +
  +    for (ix = 0; ix < ac; ix++) {
  +     rc = rpmmqttUnsubscribe(mqtt, av[ix]);
  +     if (rc)
  +         break;
  +    }
  +    return rc;
  +}
  +
  +/*
  + * Symbolic names for the MOSQ_ERR_* return codes.
  + * Each row pairs the numeric code with its name (without the
  + * MOSQ_ERR_ prefix); the table is closed by a { 0, NULL } row.
  + */
  +static KEY mosqErrs[] = {
  +#define MOSQ_ERRNAME(_code)     { MOSQ_ERR_##_code, #_code, }
  +    MOSQ_ERRNAME(CONN_PENDING),
  +    MOSQ_ERRNAME(SUCCESS),
  +    MOSQ_ERRNAME(NOMEM),
  +    MOSQ_ERRNAME(PROTOCOL),
  +    MOSQ_ERRNAME(INVAL),
  +    MOSQ_ERRNAME(NO_CONN),
  +    MOSQ_ERRNAME(CONN_REFUSED),
  +    MOSQ_ERRNAME(NOT_FOUND),
  +    MOSQ_ERRNAME(CONN_LOST),
  +    MOSQ_ERRNAME(TLS),
  +    MOSQ_ERRNAME(PAYLOAD_SIZE),
  +    MOSQ_ERRNAME(NOT_SUPPORTED),
  +    MOSQ_ERRNAME(AUTH),
  +    MOSQ_ERRNAME(ACL_DENIED),
  +    MOSQ_ERRNAME(UNKNOWN),
  +    MOSQ_ERRNAME(ERRNO),
  +    MOSQ_ERRNAME(EAI),
  +    MOSQ_ERRNAME(PROXY),
  +#undef       MOSQ_ERRNAME
  +    { 0, NULL },
  +};
  +
  +/*
  + * Method vector for the mosquitto backend: identification, default
  + * ports, error-name table and the per-operation entry points.
  + */
  +static
  +struct mqttVec_s mosqVec = {
  +    /* identification and defaults */
  +    .name            = "mosquitto",
  +    .prefix          = "mosquitto_",
  +    .port            = 1883,
  +    .sport           = 8883,
  +
  +    /* error code -> name table (count includes the terminating row) */
  +    .errs            = mosqErrs,
  +    .nerrs           = sizeof(mosqErrs) / sizeof(*mosqErrs),
  +
  +    /* instance and connection life cycle */
  +    .create          = mosqCreate,
  +    .destroy         = mosqDestroy,
  +    .connect         = mosqConnect,
  +    .disconnect      = mosqDisconnect,
  +    .isconnected     = mosqIsConnected,
  +
  +    /* subscriptions and publishing */
  +    .subscribe       = mosqSubscribe,
  +    .subscribeMany   = mosqSubscribeMany,
  +    .unsubscribe     = mosqUnsubscribe,
  +    .unsubscribeMany = mosqUnsubscribeMany,
  +    .sendMessage     = mosqSendMessage,
  +};
  +#endif       /* WITH_MOSQUITTO */
  +
  +/*==============================================================*/
  +#ifdef       WITH_RABBITMQ
  +/*
  + * Destroy hook for the RabbitMQ backend (placeholder).
  + * The body is compiled only when NOTYET is defined; otherwise the
  + * function does nothing and returns RPMRC_FAIL.
  + */
  +static
  +rpmRC amqpDestroy(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +#ifdef       NOTYET
  +    /* XXX still the Paho (MQTTAsync) teardown sequence */
  +    rc = check(mqtt, "destroy",
  +             (MQTTAsync_destroy(&mqtt->I), 0));
  +    mqtt->state = _free(mqtt->state);
  +    mqtt->I = NULL;
  +#endif
  +    return rc;
  +}
  +
  +/*
  + * Create hook for the RabbitMQ backend (placeholder).
  + *
  + * The whole body is compiled only when NOTYET is defined; otherwise
  + * the function does nothing and returns RPMRC_FAIL.  The guarded body
  + * is still written against the Paho MQTTAsync API: it sets up tracing,
  + * logs the version information once, derives the persistence type and
  + * path from the _mqtt_persist/_mqtt_cachedir macros, logs the main
  + * settings, creates the client if mqtt->I is NULL and installs the
  + * callbacks, then returns RPMRC_OK.
  + */
  +static
  +rpmRC amqpCreate(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +#ifdef       NOTYET
  +    static int oneshot;
  +    int _lvl = RPMLOG_DEBUG;
  +    int xx;
  +
  +#ifdef       DYING
  +mqtt->trace = 4;     /* XXX */
  +#endif
  +    if (mqtt->trace && rpmIsDebug()) {
  +     xx = check(mqtt, "setTraceCallback",
  +             (MQTTAsync_setTraceCallback(onTrace), 0));
  +     xx = check(mqtt, "setTraceLevel",
  +             (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  +    }
  +
  +    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +
  +    /* Log the library's name/value version pairs once per process. */
  +    if (!oneshot) {
  +     if (mqtt->trace == 0) {
  +         MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  +         while (NV->name) {
  +             rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  +             NV++;
  +         }
  +     }
  +     oneshot++;
  +    }
  +
  +    /* XXX improve integration */
  +static const char _mqtt_persist[] =
  +     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
  +    mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  +static const char _mqtt_cachedir[] =
  +     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
  +
  +    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  +    rpmlog(_lvl, "%19s: type(%u) %s\n",
  +             "persist", mqtt->persist_type,
  +             (mqtt->persist_type ? persist_path : ""));
  +    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +    if (mqtt->trace)
  +     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +
  +    /* Drop any previous persistence state before rebuilding it. */
  +mqtt->persist_path = _free(mqtt->persist_path);
  +mqtt->persist_ctx = _free(mqtt->persist_ctx);
  +    switch (mqtt->persist_type) {
  +    default:
  +    case MQTTCLIENT_PERSISTENCE_NONE:
  +     mqtt->persist_ctx = NULL;
  +     break;
  +    case MQTTCLIENT_PERSISTENCE_DEFAULT:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     /* XXX rpmmqttFini double free */
  +     mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  +      } break;
  +    case MQTTCLIENT_PERSISTENCE_USER:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     MQTTClient_persistence * ctx =
  +             (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  +     *ctx = _mqtt_persistence;       /* structure assignment */
  +     ctx->context = mqtt;
  +     mqtt->persist_ctx = ctx;
  +      } break;
  +    }
  +persist_path = _free(persist_path);
  +
  +mqtt->u = NULL;
  +dumpMQTT(__FUNCTION__, mqtt);
  +
  +    if (mqtt->I == NULL) {
  +     xx = check(mqtt, "createWithOptions",
  +             MQTTAsync_createWithOptions(&mqtt->I,
  +                     mqtt->uri, mqtt->clientid,
  +                     mqtt->persist_type, mqtt->persist_ctx,
  +                     (MQTTAsync_createOptions *)AOBJ(mqtt, 'O')));
  +
  +     xx = check(mqtt, "setCallbacks",
  +             MQTTAsync_setCallbacks(mqtt->I, mqtt,
  +                     amqpOnConnectionLost,
  +                     amqpOnMessageArrived,
  +                     amqpOnDeliveryComplete));
  +    }
  +
  +    rc = RPMRC_OK;
  +#endif       /* NOTYET */
  +
  +    return rc;
  +}
  +
  +/*
  + * Disconnect hook for the RabbitMQ backend (placeholder).
  + * Returns RPMRC_OK when rpmmqttIsConnected() does not report a
  + * connection.  When it does, the actual disconnect is compiled only
  + * with NOTYET defined; without it rc keeps its initial RPMRC_FAIL and
  + * that value is returned.
  + */
  +static
  +rpmRC amqpDisconnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
  +#ifdef       NOTYET
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "disconnect",
  +             MQTTAsync_disconnect(mqtt->I,
  +                     (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D')));
  +     /* poll until some other code sets mqtt->finished */
  +     while (!mqtt->finished)
  +         usleep(100);
  +#endif
  +     if (rc)
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Connect hook for the RabbitMQ backend (placeholder).
  + * Returns RPMRC_OK when rpmmqttIsConnected() reports anything other
  + * than RPMRC_NOTFOUND.  Otherwise the actual connect is compiled only
  + * with NOTYET defined; without it rc keeps its initial RPMRC_FAIL and
  + * that value is returned.
  + */
  +static
  +rpmRC amqpConnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  +#ifdef       NOTYET
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "connect",
  +             MQTTAsync_connect(mqtt->I,
  +                     (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C')));
  +     /* poll until some other code sets mqtt->finished */
  +     while (!mqtt->finished)
  +         usleep(1000);
  +#endif
  +     if (rc)
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Connection-state hook for the RabbitMQ backend (placeholder).
  + * Without NOTYET defined this always returns RPMRC_NOTFOUND.
  + */
  +static
  +rpmRC amqpIsConnected(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_NOTFOUND;
  +
  +#ifdef       NOTYET
  +    /* XXX still the Paho (MQTTAsync) query */
  +    if (check(mqtt, "isconnected",
  +             MQTTAsync_isConnected(mqtt->I)))
  +     rc = RPMRC_OK;
  +#endif
  +    return rc;
  +}
  +
  +/*
  + * Publish hook for the RabbitMQ backend (placeholder).
  + * Applies the same defaults as the other backends (NULL topic ->
  + * mqtt->topic, NULL s -> "", ns == 0 -> strlen(s)) and connects if
  + * necessary.  The actual send is compiled only with NOTYET defined;
  + * without it rc keeps its initial RPMRC_FAIL and that value is
  + * returned.
  + */
  +static
  +rpmRC amqpSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char * s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +#ifdef       NOTYET
  +     MQTTAsync_message *M =
  +         (MQTTAsync_message *) AOBJ(mqtt, 'M');
  +     M->payloadlen = ns;
  +     M->payload = (char *) s;
  +
  +     MQTTAsync_responseOptions *R =
  +         (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = amqpOnSend;
  +     R->onFailure = amqpOnSendFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "sendMessage",
  +             MQTTAsync_sendMessage(mqtt->I, topic, M, R));
  +     /* poll until some other code sets mqtt->finished */
  +     while (!mqtt->finished)
  +         usleep(100);
  +#endif
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Multi-topic subscribe hook for the RabbitMQ backend (placeholder).
  + * Returns RPMRC_FAIL for ac <= 0 or when the connection cannot be
  + * established.  Otherwise each topic is logged with qos mqtt->qos; the
  + * actual subscription is compiled only with NOTYET defined, so without
  + * it rc keeps its initial RPMRC_FAIL and that value is returned.
  + */
  +static
  +rpmRC amqpSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +     /* one qos slot per topic, zero-initialized by xcalloc() */
  +     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  +     for (int i = 0; i < ac; i++) {
  +         char * t = av[i];
  +#ifdef       NOTYET
  +         char * te = strchr(t, '?');
  +         if (te) {
  +             *te++ = '\0';
  +             if ((te = strchr(te, '='))) {
  +                 if (!strncmp(t, "qos", (te - t)))
  +                     subqos[i] = strtoul(te+1, NULL, 0);
  +             }
  +         } else
  +#endif
  +             subqos[i] = mqtt->qos;  /* XXX */
  +         /* NOTE(review): subqos[i] is an int printed with %u */
  +         rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]);
  +     }
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = amqpOnSubscribeMany;
  +     R->onFailure = amqpOnSubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribeMany",
  +             MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R));
  +     /* poll until some other code sets mqtt->finished */
  +     while (!mqtt->finished)
  +         usleep(100);
  +#endif
  +     if (subqos)
  +         free(subqos);
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*
  + * Subscribe hook for the RabbitMQ backend (placeholder).
  + * Connects if necessary and logs the request.  The actual
  + * subscription is compiled only with NOTYET defined; without it rc
  + * keeps its initial RPMRC_FAIL and that value is returned.
  + */
  +static
  +rpmRC amqpSubscribe(rpmmqtt mqtt, const char * topic, int qos)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +
  +     rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = amqpOnSubscribe;
  +     R->onFailure = amqpOnSubscribeFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribe",
  +             MQTTAsync_subscribe(mqtt->I, topic, qos, R));
  +     /* poll until some other code sets mqtt->finished */
  +     while (!mqtt->finished)
  +         usleep(100);
  +#endif
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC amqpUnsubscribe(rpmmqtt mqtt, const char * topic)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Log a single-topic unsubscribe; the request itself is still under NOTYET. */
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +
  +     rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     /* NOTE(review): single-topic path wires the *Many callbacks -- confirm intended */
  +     R->onSuccess = amqpOnUnsubscribeMany;
  +     R->onFailure = amqpOnUnsubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "unsubscribe",
  +             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC amqpUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Log an unsubscribe for each of the ac topics in av[]; request is under NOTYET. */
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +     for (int i = 0; i < ac; i++) {
  +         char * t = av[i];
  +         rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  +     }
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = amqpOnUnsubscribeMany;
  +     R->onFailure = amqpOnUnsubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "unsubscribeMany",
  +             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static KEY amqpErrs[] = {    /* AMQP_STATUS_* value paired with its stringified name */
  +#define _ENTRY(_v)      { AMQP_STATUS_##_v, #_v, }
  +    _ENTRY(OK),
  +    _ENTRY(NO_MEMORY),
  +    _ENTRY(BAD_AMQP_DATA),
  +    _ENTRY(UNKNOWN_CLASS),
  +    _ENTRY(UNKNOWN_METHOD),
  +    _ENTRY(HOSTNAME_RESOLUTION_FAILED),
  +    _ENTRY(INCOMPATIBLE_AMQP_VERSION),
  +    _ENTRY(CONNECTION_CLOSED),
  +    _ENTRY(BAD_URL),
  +    _ENTRY(SOCKET_ERROR),
  +    _ENTRY(INVALID_PARAMETER),
  +    _ENTRY(TABLE_TOO_BIG),
  +    _ENTRY(WRONG_METHOD),
  +    _ENTRY(TIMEOUT),
  +    _ENTRY(TIMER_FAILURE),
  +    _ENTRY(HEARTBEAT_TIMEOUT),
  +    _ENTRY(UNEXPECTED_STATE),
  +    _ENTRY(SOCKET_CLOSED),
  +    _ENTRY(SOCKET_INUSE),
  +    _ENTRY(BROKER_UNSUPPORTED_SASL_METHOD),
  +    _ENTRY(UNSUPPORTED),
  +    _ENTRY(TCP_ERROR),
  +    _ENTRY(TCP_SOCKETLIB_INIT_ERROR),
  +    _ENTRY(SSL_ERROR),
  +    _ENTRY(SSL_HOSTNAME_VERIFY_FAILED),
  +    _ENTRY(SSL_PEER_VERIFY_FAILED),
  +    _ENTRY(SSL_CONNECTION_FAILED),
  +#undef       _ENTRY
  +    { 0, NULL },             /* terminator entry */
  +};
  +
  +static       /* dispatch table for the amqp backend (WITH_RABBITMQ builds) */
  +struct mqttVec_s amqpVec = {
  +    .name            = "amqp",
  +    .port            = 5672,
  +    .sport           = 5671,
  +    .prefix          = "amqp_",
  +    .errs            = amqpErrs,
  +    .nerrs           = sizeof(amqpErrs) / sizeof(amqpErrs[0]),       /* counts the { 0, NULL } terminator too */
  +    .destroy         = amqpDestroy,
  +    .create          = amqpCreate,
  +    .disconnect              = amqpDisconnect,
  +    .connect         = amqpConnect,
  +    .isconnected     = amqpIsConnected,      /* XXX */
  +    .unsubscribe     = amqpUnsubscribe,
  +    .subscribe               = amqpSubscribe,
  +    .unsubscribeMany = amqpUnsubscribeMany,
  +    .subscribeMany   = amqpSubscribeMany,
  +    .sendMessage     = amqpSendMessage,
  +};
  +#endif       /* WITH_RABBITMQ */
  +
  +/*==============================================================*/
  +#ifdef       WITH_ZEROMQ
  +static
  +rpmRC zmqDestroy(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* returned as-is: the teardown below is under NOTYET */
  +#ifdef       NOTYET
  +    rc = check(mqtt, "destroy",
  +             (MQTTAsync_destroy(&mqtt->I), 0));
  +    mqtt->state = _free(mqtt->state);
  +    mqtt->I = NULL;
  +#endif
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqCreate(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* returned as-is: the whole setup below is under NOTYET */
  +#ifdef       NOTYET
  +    static int oneshot;
  +    int _lvl = RPMLOG_DEBUG;
  +    int xx;
  +
  +#ifdef       DYING
  +mqtt->trace = 4;     /* XXX */
  +#endif
  +    if (mqtt->trace && rpmIsDebug()) {
  +     xx = check(mqtt, "setTraceCallback",
  +             (MQTTAsync_setTraceCallback(onTrace), 0));
  +     xx = check(mqtt, "setTraceLevel",
  +             (MQTTAsync_setTraceLevel((enum 
MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  +    }
  +
  +    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +
  +    if (!oneshot) {  /* log the library version info once per process */
  +     if (mqtt->trace == 0) {
  +         MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  +         while (NV->name) {
  +             rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  +             NV++;
  +         }
  +     }
  +     oneshot++;
  +    }
  +
  +    /* XXX improve integration */
  +static const char _mqtt_persist[] =
  +     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
  +    mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  +static const char _mqtt_cachedir[] =
  +     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
  +
  +    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  +    rpmlog(_lvl, "%19s: type(%u) %s\n",
  +             "persist", mqtt->persist_type,
  +             (mqtt->persist_type ? persist_path : ""));
  +    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +    if (mqtt->trace)
  +     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +
  +mqtt->persist_path = _free(mqtt->persist_path);
  +mqtt->persist_ctx = _free(mqtt->persist_ctx);
  +    switch (mqtt->persist_type) {    /* build the context passed to createWithOptions */
  +    default:
  +    case MQTTCLIENT_PERSISTENCE_NONE:
  +     mqtt->persist_ctx = NULL;
  +     break;
  +    case MQTTCLIENT_PERSISTENCE_DEFAULT:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     /* XXX rpmmqttFini double free */
  +     mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  +      } break;
  +    case MQTTCLIENT_PERSISTENCE_USER:
  +      {
  +     mqtt->persist_path = xstrdup(persist_path);
  +     MQTTClient_persistence * ctx =
  +             (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  +     *ctx = _mqtt_persistence;       /* structure assignment */
  +     ctx->context = mqtt;
  +     mqtt->persist_ctx = ctx;
  +      } break;
  +    }
  +persist_path = _free(persist_path);  /* the switch above kept its own xstrdup copies */
  +
  +mqtt->u = NULL;
  +dumpMQTT(__FUNCTION__, mqtt);
  +
  +    if (mqtt->I == NULL) {   /* create the client handle only once */
  +     xx = check(mqtt, "createWithOptions",
  +             MQTTAsync_createWithOptions(&mqtt->I,
  +                     mqtt->uri, mqtt->clientid,
  +                     mqtt->persist_type, mqtt->persist_ctx,
  +                     (MQTTAsync_createOptions *)AOBJ(mqtt, 'O')));
  +
  +     xx = check(mqtt, "setCallbacks",
  +             MQTTAsync_setCallbacks(mqtt->I, mqtt,
  +                     zmqOnConnectionLost,
  +                     zmqOnMessageArrived,
  +                     zmqOnDeliveryComplete));
  +    }
  +
  +    rc = RPMRC_OK;   /* NOTE(review): the xx results above are never checked */
  +#endif       /* NOTYET */
  +
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqDisconnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {      /* only a live connection needs work */
  +#ifdef       NOTYET
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "disconnect",
  +             MQTTAsync_disconnect(mqtt->I,
  +                     (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D')));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;           /* not connected, or disconnect request succeeded */
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqConnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    /* Connect unless rpmmqttIsConnected() reports something other than NOTFOUND. */
  +    if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  +#ifdef       NOTYET
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "connect",
  +             MQTTAsync_connect(mqtt->I,
  +                     (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C')));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(1000);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqIsConnected(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_NOTFOUND;       /* "not connected"; the only result without NOTYET */
  +
  +#ifdef       NOTYET
  +    if (check(mqtt, "isconnected",
  +             MQTTAsync_isConnected(mqtt->I)))
  +     rc = RPMRC_OK;
  +#endif
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char * s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Defaults: topic -> mqtt->topic, s -> "", ns -> strlen(s). */
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);
  +
  +    if (!rpmmqttConnect(mqtt)) {
  +#ifdef       NOTYET
  +     MQTTAsync_message *M =
  +         (MQTTAsync_message *) AOBJ(mqtt, 'M');
  +     M->payloadlen = ns;
  +     M->payload = (char *) s;        /* payload points at s; no copy is made here */
  +
  +     MQTTAsync_responseOptions *R =
  +         (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = zmqOnSend;
  +     R->onFailure = zmqOnSendFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "sendMessage",
  +             MQTTAsync_sendMessage(mqtt->I, topic, M, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* stays FAIL until the NOTYET request is enabled */
  +    /* Subscribe to the ac topics in av[]; each topic gets a qos slot in subqos[]. */
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  +     for (int i = 0; i < ac; i++) {
  +         char * t = av[i];
  +#ifdef       NOTYET
  +         char * te = strchr(t, '?');
  +         if (te) {
  +             *te++ = '\0';           /* cut "topic?qos=N" at '?' (edits av[i]) */
  +             subqos[i] = mqtt->qos;  /* default if the query has no qos= key */
  +             if (!strncmp(te, "qos=", sizeof("qos=")-1))
  +                 subqos[i] = (int) strtoul(te + sizeof("qos=")-1, NULL, 0);
  +             /* XXX any other query key is ignored */
  +         } else
  +#endif
  +             subqos[i] = mqtt->qos;  /* XXX */
  +         rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", t, subqos[i]);
  +     }
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = zmqOnSubscribeMany;
  +     R->onFailure = zmqOnSubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribeMany",
  +             MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R));
  +     while (!mqtt->finished)
  +         usleep(100);
  +#endif
  +     free(subqos);           /* free(NULL) is a no-op: no guard needed */
  +     subqos = NULL;
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqSubscribe(rpmmqtt mqtt, const char * topic, int qos)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Log a single-topic subscribe; the request itself is still under NOTYET. */
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +
  +     rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = zmqOnSubscribe;
  +     R->onFailure = zmqOnSubscribeFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "subscribe",
  +             MQTTAsync_subscribe(mqtt->I, topic, qos, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqUnsubscribe(rpmmqtt mqtt, const char * topic)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Log a single-topic unsubscribe; the request itself is still under NOTYET. */
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +
  +     rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     /* NOTE(review): single-topic path wires the *Many callbacks -- confirm intended */
  +     R->onSuccess = zmqOnUnsubscribeMany;
  +     R->onFailure = zmqOnUnsubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "unsubscribe",
  +             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC zmqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure; only the NOTYET code reassigns it */
  +    /* Log an unsubscribe for each of the ac topics in av[]; request is under NOTYET. */
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     int _lvl = RPMLOG_DEBUG;
  +     for (int i = 0; i < ac; i++) {
  +         char * t = av[i];
  +         rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  +     }
  +
  +#ifdef       NOTYET
  +     MQTTAsync_responseOptions *R =
  +             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  +     R->onSuccess = zmqOnUnsubscribeMany;
  +     R->onFailure = zmqOnUnsubscribeManyFailure;
  +
  +     mqtt->finished = 0;
  +     rc = check(mqtt, "unsubscribeMany",
  +             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  +     while (!mqtt->finished)         /* poll; presumably a callback sets finished */
  +         usleep(100);
  +#endif
  +
  +     if (rc)                 /* without NOTYET rc is still RPMRC_FAIL here */
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static KEY zmqErrs[] = {     /* no zmq error names yet: terminator entry only */
  +    { 0, NULL },
  +};
  +
  +static       /* dispatch table for the zmq backend (WITH_ZEROMQ builds) */
  +struct mqttVec_s zmqVec = {
  +    .name            = "zmq",
  +    .port            = 0xfffe,       /* XXX W2DO? */
  +    .sport           = 0xfffe,       /* XXX W2DO? */
  +    .prefix          = "zmq_",
  +    .errs            = zmqErrs,      /* XXX W2DO? */
  +    .nerrs           = sizeof(zmqErrs) / sizeof(zmqErrs[0]), /* 1: just the terminator */
  +    .destroy         = zmqDestroy,
  +    .create          = zmqCreate,
  +    .disconnect              = zmqDisconnect,
  +    .connect         = zmqConnect,
  +    .isconnected     = zmqIsConnected,       /* XXX */
  +    .unsubscribe     = zmqUnsubscribe,
  +    .subscribe               = zmqSubscribe,
  +    .unsubscribeMany = zmqUnsubscribeMany,
  +    .subscribeMany   = zmqSubscribeMany,
  +    .sendMessage     = zmqSendMessage,
  +};
  +#endif       /* WITH_ZEROMQ */
  +
  +/*==============================================================*/
  +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    /* Send s, prefixed via rpmmqttExpand() with %{?_mqtt_prefix} and a space, on topic. */
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);         /* ns is only reported in the trace below */
  +
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    {        static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
  +     /* XXX extra space: prepend in *sendMessage or *send  */
  +     char * t = rpmmqttExpand(mqtt, NULL,
  +                     _mqtt_prefix, " ", s, NULL);
  +     size_t nt = (t != NULL ? strlen(t) : 0);
  +     if (t != NULL)          /* no expansion: leave rc at RPMRC_FAIL */
  +         rc = rpmmqttSendMessage(mqtt, topic, t, nt);
  +     t = _free(t);
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc));
  +    return rc;
  +}
  +
  +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (ns == 0) ns = strlen(s);
  +
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +
  +    {        char * subtopic = rpmmqttExpand(mqtt, NULL,
  +             (s ? s : mqtt->topic), NULL);
  +     unsigned subqos = mqtt->qos;
  +     char *t, *te;
  +int _lvl = RPMLOG_DEBUG;
  +
  +     /* XXX qos appended to subscription topic? */
  +     if ((t = strchr(subtopic, '?')) != NULL) {
  +         ARGV_t qav = NULL;
  +         int qac;
   
            *t++ = '\0';
            (void) argvSplit(&qav, t, ",");
  @@ -1365,20 +2899,8 @@
   
        rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos);
   
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = onSubscribe;
  -     R->onFailure = onSubscribeFailure;
  -
  -#ifdef       DYING
  -     mqtt->finished = 0;
  -     rc = check(mqtt, "subscribe",
  -             MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R));
  -     while (rc == 0 && !mqtt->finished)
  -         usleep(100);
  -#else
        rc = rpmmqttSubscribe(mqtt, subtopic, subqos);
  -#endif
  +
        subtopic = _free(subtopic);
        if (rc)
            goto exit;
  @@ -1386,8 +2908,6 @@
       }
   
   exit:
  -#endif       /* WITH_MQTT */
  -
   SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)rc));
       return rc;
   }
  @@ -1462,7 +2982,7 @@
        N_("Retain the client Will."), NULL },
       POPT_TABLEEND
     };
  -  struct poptOption rpmmqttSubscribePoptTable[] = {
  +  struct poptOption mqttSubscribePoptTable[] = {
      { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,&mqtt->flags, 
MQTT_FLAGS_CLEAN,
        N_("(sub) Do not clean session."), NULL },
      { NULL, 'C', POPT_ARG_INT,                &mqtt->max_msg_count, 0,
  @@ -1561,7 +3081,7 @@
        N_("MQTT Protocol:"),
        NULL },
   
  -   { NULL, '\0', POPT_ARG_INCLUDE_TABLE, &rpmmqttSubscribePoptTable, 0,
  +   { NULL, '\0', POPT_ARG_INCLUDE_TABLE, &mqttSubscribePoptTable, 0,
        N_("MQTT Subscribe:"),
        NULL },
   
  @@ -1917,20 +3437,16 @@
   static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt, const char ** topics)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int nsubs = 0;
       int xx;
   
  -    if (topics) {
  -     int nsubs = argvCount((ARGV_t)topics);
  -#ifndef      NOTYET  /* XXX MQTT segfault if done here. */
   argvPrint(__FUNCTION__, (ARGV_t)topics, NULL);
  +    if (topics) {
  +     nsubs = argvCount((ARGV_t)topics);
        xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)topics);
  -#else
  -     for (int i = 0; i < nsubs; i++) {
  -         xx = rpmmqttSub(mqtt, topics[i], 0);
  -     }
  -#endif
       }
       rc = RPMRC_OK;
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topics, 
nsubs, rc));
       return rc;
   }
   
  @@ -2021,11 +3537,8 @@
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  -#ifdef       WITH_MQTT
       (void) rpmmqttDisconnect(mqtt);
  -    (void) check(mqtt, "destroy",
  -             (MQTTAsync_destroy(&mqtt->I), 0));
  -#endif       /* WITH_MQTT */
  +    (void) rpmmqttDestroy(mqtt);
   
   /* ========== */
       mqtt->_progname = _free(mqtt->_progname);
  @@ -2104,75 +3617,28 @@
       rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, (mqttFlags)flags);
       (void)rc;
   
  -#ifdef       WITH_MQTT
  -    {        static int oneshot;
  -     int _lvl = RPMLOG_DEBUG;
  +    {
        int xx;
   
  -mqtt->trace = 4;
  -     if (mqtt->trace && rpmIsDebug()) {
  -         xx = check(mqtt, "setTraceCallback",
  -             (MQTTAsync_setTraceCallback(onTrace), 0));
  -         xx = check(mqtt, "setTraceLevel",
  -             (MQTTAsync_setTraceLevel((enum 
MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  -     }
  -
  -     rpmlog(_lvl, "==================== MQTT\n");
  -
  -     if (!oneshot) {
  -         if (mqtt->trace == 0) {
  -             MQTTAsync_nameValue *I = MQTTAsync_getVersionInfo();
  -             while (I->name) {
  -                 rpmlog(_lvl, "%19s: %s\n", I->name, I->value);
  -                 I++;
  -             }
  -         }
  -         oneshot++;
  -     }
  -
  -     /* XXX improve integration */
  -static const char _mqtt_persist[] =
  -     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
  -     mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  -static const char _mqtt_cachedir[] =
  -     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  -char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
  -
  -     rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  -     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  -             "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  -     rpmlog(_lvl, "%19s: type(%u) %s\n",
  -             "persist", mqtt->persist_type,
  -             (mqtt->persist_type ? persist_path : ""));
  -     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  -     if (mqtt->trace)
  -         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  -
  +     /* XXX these need to be initialized based on URI scheme */
  +#ifdef       WITH_PAHO
  +     if (mqtt->vec == NULL)
  +         mqtt->vec = &pahoVec;
  +#endif
  +#ifdef       WITH_MOSQUITTO
  +     if (mqtt->vec == NULL)
  +         mqtt->vec = &mosqVec;
  +#endif
  +#ifdef       WITH_RABBITMQ
  +     if (mqtt->vec == NULL)
  +         mqtt->vec = &amqpVec;
  +#endif
  +#ifdef       WITH_ZEROMQ
  +     if (mqtt->vec == NULL)
  +         mqtt->vec = &zmqVec;
  +#endif
   
  -mqtt->persist_path = _free(mqtt->persist_path);
  -mqtt->persist_ctx = _free(mqtt->persist_ctx);
  -     switch (mqtt->persist_type) {
  -     default:
  -     case MQTTCLIENT_PERSISTENCE_NONE:
  -         mqtt->persist_ctx = NULL;
  -         break;
  -     case MQTTCLIENT_PERSISTENCE_DEFAULT:
  -       {
  -         mqtt->persist_path = xstrdup(persist_path);
  -         /* XXX rpmmqttFini double free */
  -         mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  -       } break;
  -     case MQTTCLIENT_PERSISTENCE_USER:
  -       {
  -         mqtt->persist_path = xstrdup(persist_path);
  -         MQTTClient_persistence * ctx =
  -             (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  -         *ctx = _rpmmqtt_persistence;        /* structure assignment */
  -         ctx->context = mqtt;
  -         mqtt->persist_ctx = ctx;
  -       } break;
  -     }
  -persist_path = _free(persist_path);
  +     xx = rpmmqttCreate(mqtt);
   
        /* Prepare for subscription delivery. */
        if (MF_ISSET(BUFFER) && mqtt->iob == NULL)
  @@ -2196,23 +3662,6 @@
            break;
        }
   
  -mqtt->u = NULL;
  -dumpMQTT(__FUNCTION__, mqtt);
  -
  -     if (mqtt->I == NULL) {
  -         xx = check(mqtt, "createWithOptions",
  -             MQTTAsync_createWithOptions(&mqtt->I,
  -                     mqtt->uri, mqtt->clientid,
  -                     mqtt->persist_type, mqtt->persist_ctx,
  -                     (MQTTAsync_createOptions *)AOBJ(mqtt, 'O')));
  -
  -         xx = check(mqtt, "setCallbacks",
  -             MQTTAsync_setCallbacks(mqtt->I, mqtt,
  -                     onConnectionLost,
  -                     onMessageArrived,
  -                     onDeliveryComplete));
  -     }
  -
        /* XXX If any topic has a wild card, then switch to "sub" mode. */
        {
            const char * topic;
  @@ -2258,10 +3707,7 @@
            /* Publish any initial messages (if any). */
            xx = rpmmqttInitPublish(mqtt, mqtt->topics);
        }
  -     
  -
       }
  -#endif       /* WITH_MQTT */
   
       return rpmmqttLink(mqtt);
   }
  @@ -2277,7 +3723,6 @@
   
       msg = rpmmqttExpand(mqtt, NULL,
                "%now", " ", str, NULL);
  -#if defined(WITH_MQTT)
       if (rpmmqttConnect(mqtt))
        goto exit;
       if (msg != NULL && !rpmmqttSendMessage(mqtt, NULL, msg, strlen(msg))) {
  @@ -2288,8 +3733,6 @@
       }
   
   exit:
  -#endif
  -
       msg = _free(msg);
   SPEW((stderr, "<== %s(%p,\"%s\",%p) rc %d\n", __FUNCTION__, mqtt, str, 
resultp, rc));
       return rc;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.17 -r1.1.2.18 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       10 Jul 2016 16:16:04 -0000      1.1.2.17
  +++ rpm/rpmio/rpmmqtt.h       11 Jul 2016 20:26:53 -0000      1.1.2.18
  @@ -7,8 +7,9 @@
   
   extern int _rpmmqtt_debug;
   
  -typedef struct rpmmqtt_s * rpmmqtt;
  -typedef struct mqttState_s * mqttState;
  +typedef struct rpmmqtt_s     * rpmmqtt;
  +typedef struct pahoState_s   * pahoState_t;
  +typedef      struct mqttVec_s        * mqttVec_t;
   
   #define _KFB(n) (1U << (n))
   #define _MFB(n) (_KFB(n) | 0x40000000)
  @@ -47,7 +48,14 @@
       MQTT_OUTPUT_CALLBACK     = (1 << 2),
   } mqttOutput;
   
  +#ifdef __cplusplus
  +extern "C" {
  +#endif
  +
   #if defined(_RPMMQTT_INTERNAL)
  +/* Debug trace to stderr; expects a local `mqtt` in scope, which may be NULL. */
  +#define SPEW(_list)     if ((mqtt && mqtt->debug) || _rpmmqtt_debug < 0) fprintf _list
  +
   struct rpmmqtt_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
       MacroContext mc;
  @@ -116,19 +124,138 @@
   
       /* -- MQTT internals */
       void * I;                        /* MQTTClient */
  +    volatile int connected;  /* XXX mosquitto */
       volatile int finished;
       volatile unsigned token;
       char * serverURI;
       int MQTTVersion;
       int sessionPresent;
  -    mqttState state;
  +    pahoState_t state;
  +    mqttVec_t vec;
   };
   
  -#endif       /* _RPMMQTT_INTERNAL */
  +struct mqttVec_s {   /* per-backend dispatch table; rpmmqtt_s.vec points at one */
  +    const char * name;       /* backend name, e.g. "amqp" or "zmq" */
  +    uint16_t port;           /* NOTE(review): presumably the default plain port -- confirm */
  +    uint16_t sport;          /* NOTE(review): presumably the default TLS port -- confirm */
  +    const char * prefix;     /* e.g. "amqp_"; NOTE(review): consumer not visible here */
  +    void * errs;             /* error table; the visible backends store a KEY[] */
  +    size_t nerrs;            /* number of entries in errs */
  +    rpmRC (*destroy)         (rpmmqtt mqtt);        /* hooks: called via rpmmqttXxx() */
  +    rpmRC (*create)          (rpmmqtt mqtt);
  +    rpmRC (*disconnect)              (rpmmqtt mqtt);
  +    rpmRC (*connect)         (rpmmqtt mqtt);
  +    rpmRC (*isconnected)     (rpmmqtt mqtt);
  +    rpmRC (*unsubscribe)     (rpmmqtt mqtt, const char *topic);
  +    rpmRC (*subscribe)               (rpmmqtt mqtt, const char *topic, int 
qos);
  +    rpmRC (*unsubscribeMany) (rpmmqtt mqtt, int ac, char ** av);
  +    rpmRC (*subscribeMany)   (rpmmqtt mqtt, int ac, char ** av);
  +    rpmRC (*sendMessage)     (rpmmqtt mqtt, const char * topic,
  +                                     const char *s, size_t ns);
  +};
   
  -#ifdef __cplusplus
  -extern "C" {
  -#endif
  +static inline
  +rpmRC rpmmqttDestroy(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->destroy != NULL)
  +     rc = mqtt->vec->destroy(mqtt);  /* forward to the backend's destroy hook */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttCreate(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->create != NULL)
  +     rc = mqtt->vec->create(mqtt);   /* forward to the backend's create hook */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttDisconnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->disconnect != NULL)
  +     rc = mqtt->vec->disconnect(mqtt);       /* forward to the backend's disconnect hook */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttConnect(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->connect != NULL)
  +     rc = mqtt->vec->connect(mqtt);  /* forward to the backend's connect hook */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttIsConnected(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_NOTFOUND;       /* XXX */ /* result when mqtt, vec, or hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->isconnected != NULL)
  +     rc = mqtt->vec->isconnected(mqtt);      /* forward to the backend's isconnected hook */
  +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->unsubscribe != NULL)
  +     rc = mqtt->vec->unsubscribe(mqtt, topic);       /* forward to the backend hook */
  +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->subscribe != NULL)
  +     rc = mqtt->vec->subscribe(mqtt, topic, qos);    /* forward to the backend hook */
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->unsubscribeMany != NULL)
  +     rc = mqtt->vec->unsubscribeMany(mqtt, ac, av);  /* forward to the backend hook */
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->subscribeMany != NULL)
  +     rc = mqtt->vec->subscribeMany(mqtt, ac, av);    /* forward to the backend hook */
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
  +    return rc;
  +}
  +
  +static inline
  +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char *s, size_t ns)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* result when mqtt, its vec, or the hook is NULL */
  +    if (mqtt != NULL && mqtt->vec != NULL && mqtt->vec->sendMessage != NULL)
  +     rc = mqtt->vec->sendMessage(mqtt, topic, s, ns);        /* forward to the backend hook */
  +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc));
  +    return rc;
  +}
  +
  +#endif       /* _RPMMQTT_INTERNAL */
   
   /**
    * Unreference a mqtt wrapper instance.
  @@ -171,21 +298,6 @@
        RPM_GNUC_NULL_TERMINATED;
   int rpmmqttExpandNumeric(rpmmqtt mqtt, const char *arg);
   
  -rpmRC rpmmqttConnect(rpmmqtt mqtt);
  -
  -rpmRC rpmmqttDisconnect(rpmmqtt mqtt);
  -
  -rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  -             const char *s, size_t ns);
  -
  -rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av);
  -
  -rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av);
  -
  -rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos);
  -
  -rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic);
  -
   rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns);
   
   rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.12 -r1.1.2.13 tmqtt.c
  --- rpm/rpmio/tmqtt.c 6 Jul 2016 13:26:36 -0000       1.1.2.12
  +++ rpm/rpmio/tmqtt.c 11 Jul 2016 20:26:53 -0000      1.1.2.13
  @@ -32,17 +32,12 @@
   int qos = 2;
   
       xx = rpmmqttConnect(mqtt);
  +
       xx = rpmmqttSubscribe(mqtt, av[0], qos);
       xx = rpmmqttUnsubscribe(mqtt, av[0]);
       xx = rpmmqttSubscribeMany(mqtt, ac, (char **)av);
       xx = rpmmqttUnsubscribeMany(mqtt, ac, (char **)av);
  -    xx = rpmmqttDisconnect(mqtt);
    
  -#ifdef       DYING
  -    (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0);
  -    (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0);
  -#endif
  -
       xx = rpmmqttPub(mqtt, NULL, "bzzt ...", 0);
       xx = rpmmqttPub(mqtt, NULL, "bzzT ...", 0);
       xx = rpmmqttPub(mqtt, NULL, "bzZT ...", 0);
  @@ -50,6 +45,8 @@
       xx = rpmmqttPub(mqtt, NULL, "BZZT ...", 0);
       xx = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0);
   
  +    xx = rpmmqttDisconnect(mqtt);
  +
       return rc;
   }
   
  @@ -115,5 +112,7 @@
       }
       mqtt = rpmmqttFree(mqtt);
   
  +    (void) rpmioClean();
  +
       return rc;
   }
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to