RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   05-Jul-2016 16:49:11
  Branch: rpm-5_4                          Handle: 2016070514491100

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: WIP.

  Summary:
    Revision    Changes     Path
    1.1.2.14    +394 -191   rpm/rpmio/rpmmqtt.c
    1.1.2.13    +23 -21     rpm/rpmio/rpmmqtt.h
    1.1.2.10    +15 -7      rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.13 -r1.1.2.14 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       4 Jul 2016 07:45:23 -0000       1.1.2.13
  +++ rpm/rpmio/rpmmqtt.c       5 Jul 2016 14:49:11 -0000       1.1.2.14
  @@ -7,9 +7,9 @@
   #include <stdlib.h>
   #include <string.h>
   
  -#define      _RPMIOB_INTERNAL
  -#include <rpmiotypes.h>      /* for rpmiobSlurp */
  -#include <rpmio.h>   /* for *Pool methods */
  +#define      _RPMIOB_INTERNAL        /* for rpmiobSlurp */
  +#include <rpmiotypes.h>
  +#include <rpmio.h>           /* for *Pool methods */
   #include <rpmlog.h>
   #include <rpmcb.h>
   #include <rpmdir.h>
  @@ -27,7 +27,7 @@
   
   #include "debug.h"
   
  -int _rpmmqtt_debug;
  +int _rpmmqtt_debug = 1;
   #define SPEW(_list)     if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list
   
   #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE)
  @@ -67,6 +67,141 @@
   }
   #endif
   
  +/*==============================================================*/
  +typedef struct key_s {      /* One value -> name table entry. */
  +    int         v;          /* numeric value (flag bit or error code) */
  +    const char *n;          /* name printed for v */
  +} KEY;
  +
  +static const char * tblName(uint32_t v, KEY * tbl, size_t ntbl)
  +{   /* Return the name of v in tbl[0..ntbl), or "0x<hex>" when there is none. */
  +    const char * n = NULL;
  +    static char buf[32];    /* XXX fallback text is overwritten by the next miss */
  +    size_t i;
  +
  +    for (i = 0; i < ntbl; i++) {
  +uint32_t tbl_v = tbl[i].v & 0x3fffffff;     /* compare without the top 2 bits */
  +     if (v != tbl_v)
  +         continue;
  +     n = tbl[i].n;
  +     break;
  +    }
  +    if (n == NULL) {        /* no match, or the matching entry has a NULL name */
  +     (void) snprintf(buf, sizeof(buf), "0x%x", (unsigned)v);
  +     n = buf;
  +    }
  +    return n;
  +}
  +
  +static const char * fmtBits(uint32_t flags, KEY tbl[], size_t ntbl, char *t)
  +{   /* Format flags into t as "0x<hex><NAME,NAME,...>" and return t. */
  +    char pre = '<';
  +    char * te = t;
  +    int i;
  +
  +flags &= 0x3fffffff;
  +    sprintf(t, "0x%x", (unsigned)flags);
  +    /* XXX unbounded: t must hold the hex value plus every set bit's name. */
  +    te += strlen(te);
  +    for (i = 0; i < 32; i++) {
  +     uint32_t mask = ((uint32_t)1 << i);    /* (1 << 31) overflows a signed int */
  +     const char * name;
  +
  +     if (!(flags & mask))
  +         continue;
  +
  +     name = tblName(mask, tbl, ntbl);
  +     *te++ = pre;
  +     pre = ',';
  +     te = stpcpy(te, name);
  +    }
  +    if (pre == ',') *te++ = '>';    /* close the list only if a name was added */
  +    *te = '\0';
  +    return t;
  +}
  +
  +#define _ENTRY(_v)      { MQTT_FLAGS_##_v, #_v, }
  +static KEY MqttFlags[] = {  /* MQTT_FLAGS_* bits and the names fmtBits() prints */
  +    _ENTRY(CLEAN),
  +    _ENTRY(EOL),
  +    _ENTRY(NOSTALE),
  +    _ENTRY(LINES),
  +    _ENTRY(EMPTY),
  +    _ENTRY(DNSSRV),
  +    _ENTRY(INSECURE),
  +    _ENTRY(RETAIN),
  +    _ENTRY(WILL_RETAIN),
  +    _ENTRY(BUFFER),
  +};
  +#undef       _ENTRY
  +static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]); /* entry count */
  +
  +static const char * fmtMqttFlags(uint32_t flags)
  +{   /* Format flags with fmtBits() using the MqttFlags table. */
  +    static char buf[BUFSIZ]; /* XXX shared: each call overwrites the previous result */
  +    char * te = buf;
  +    (void) fmtBits(flags, MqttFlags, nMqttFlags, te);
  +    return buf;
  +}
  +#define      _MQTTFLAGS(_flags)      fmtMqttFlags(_flags)
  +
  +#define _ENTRY(_v)      { MQTTASYNC_##_v, #_v, }
  +static KEY rpmmqtt_errs[] = {       /* MQTTASYNC_* codes and their names */
  +#ifdef       WITH_MQTT
  +    _ENTRY(SUCCESS),
  +    _ENTRY(FAILURE),
  +    _ENTRY(PERSISTENCE_ERROR),
  +    _ENTRY(DISCONNECTED),
  +    _ENTRY(MAX_MESSAGES_INFLIGHT),
  +    _ENTRY(BAD_UTF8_STRING),
  +    _ENTRY(NULL_PARAMETER),
  +    _ENTRY(TOPICNAME_TRUNCATED),
  +    _ENTRY(BAD_STRUCTURE),
  +    _ENTRY(BAD_QOS),
  +    _ENTRY(NO_MORE_MSGIDS),
  +    _ENTRY(OPERATION_INCOMPLETE),
  +    _ENTRY(MAX_BUFFERED_MESSAGES),
  +#else
  +    { 0, NULL },    /* sole entry when built without WITH_MQTT */
  +#endif
  +};
  +#undef       _ENTRY
  +static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); /* entry count */
  +
  +static const char * rpmmqttStrerror(int v)
  +{   /* Return the rpmmqtt_errs name for v, or "0x<hex>" when there is none. */
  +    KEY * tbl = rpmmqtt_errs;
  +    size_t ntbl = rpmmqtt_nerrs;
  +    const char * n = NULL;
  +    static char buf[64];    /* XXX fallback text is overwritten by the next miss */
  +
  +    for (size_t i = 0; i < ntbl; i++) {
  +        if (v != tbl[i].v)
  +            continue;
  +        n = tbl[i].n;
  +        break;
  +    }
  +    if (n == NULL) {        /* no match, or the matching entry has a NULL name */
  +        (void) snprintf(buf, sizeof(buf), "0x%x", (unsigned)v); /* %x takes unsigned */
  +        n = buf;
  +    }
  +    return n;
  +}
  +
  +static int Xcheck(rpmmqtt mqtt, const char * msg, int rc,
  +             int printit, const char * func, const char * fn, unsigned ln)
  +{
  +    /* Warn via rpmlog() when rc != 0 and return rc; mqtt/printit are unused. */
  +    if (rc != 0) {   /* MQTTCLIENT_SUCCESS */
  +     int _lvl = RPMLOG_WARNING;
  +     rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n",
  +             func, fn, ln, msg, rpmmqttStrerror(rc), rc);
  +    }
  +    return rc;
  +}
  +#define check(_o, _m, _rc)  \
  +    Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
  +
   static void dumpMQTT(const char * msg, rpmmqtt mqtt)
   {
       FILE * fp = stderr;
  @@ -76,9 +211,9 @@
                msg, mqtt);
       if (mqtt) {
   
  -#define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo)
        if (mqtt->flags != 0x40000003)
  -         PRINT(x, flags);
  +         fprintf(fp, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +#define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo)
        if (mqtt->msg_input != 0)
            PRINT(d, msg_input);
        if (mqtt->msg_output != 1)
  @@ -86,6 +221,11 @@
        if (mqtt->av)
            argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
   
  +     if (mqtt->_progname)
  +         PRINT(s, _progname);
  +     if (mqtt->_progmode)
  +         PRINT(s, _progmode);
  +
        if (mqtt->_address)
            PRINT(s, _address);
        if (mqtt->msgs)
  @@ -148,8 +288,10 @@
        if (mqtt->timeout != 10000)
            PRINT(u, timeout);
   
  -     if (mqtt->_topics)
  -         argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp);
  +     if (mqtt->topics)
  +         argvPrint("mqtt->topics", (ARGV_t)mqtt->topics, fp);
  +     if (mqtt->subtopics)
  +         argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, fp);
        if (mqtt->_filter_out)
            argvPrint("mqtt->_filter_out", (ARGV_t)mqtt->_filter_out, fp);
        if (mqtt->topic == NULL || strcmp(mqtt->topic, "rpm/mqtt"))
  @@ -208,69 +350,6 @@
   }
   
   /*==============================================================*/
  -typedef struct key_s {
  -    int         v;
  -    const char *n;
  -} KEY;
  -
  -#define _ENTRY(_v)      { MQTTASYNC_##_v, #_v, }
  -static KEY rpmmqtt_errs[] = {
  -#ifdef       WITH_MQTT
  -    _ENTRY(SUCCESS),
  -    _ENTRY(FAILURE),
  -    _ENTRY(PERSISTENCE_ERROR),
  -    _ENTRY(DISCONNECTED),
  -    _ENTRY(MAX_MESSAGES_INFLIGHT),
  -    _ENTRY(BAD_UTF8_STRING),
  -    _ENTRY(NULL_PARAMETER),
  -    _ENTRY(TOPICNAME_TRUNCATED),
  -    _ENTRY(BAD_STRUCTURE),
  -    _ENTRY(BAD_QOS),
  -    _ENTRY(NO_MORE_MSGIDS),
  -    _ENTRY(OPERATION_INCOMPLETE),
  -    _ENTRY(MAX_BUFFERED_MESSAGES),
  -#else
  -    { 0, NULL },
  -#endif
  -};
  -static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]);
  -#undef       _ENTRY
  -
  -static const char * rpmmqttStrerror(int v)
  -{
  -    KEY * tbl = rpmmqtt_errs;
  -    size_t ntbl = rpmmqtt_nerrs;
  -    const char * n = NULL;
  -    static char buf[64];
  -
  -    for (size_t i = 0; i < ntbl; i++) {
  -        if (v != tbl[i].v)
  -            continue;
  -        n = tbl[i].n;
  -        break;
  -    }
  -    if (n == NULL) {
  -        (void) snprintf(buf, sizeof(buf), "0x%x", v);
  -        n = buf;
  -    }
  -    return n;
  -}
  -
  -static int Xcheck(rpmmqtt mqtt, const char * msg, int rc,
  -             int printit, const char * func, const char * fn, unsigned ln)
  -{
  -
  -    if (rc != 0) {   /* MQTTCLIENT_SUCCESS */
  -     int _lvl = RPMLOG_WARNING;
  -     rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n",
  -             func, fn, ln, msg, rpmmqttStrerror(rc), rc);
  -    }
  -    return rc;
  -}
  -#define check(_o, _m, _rc)  \
  -    Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
  -
  -/*==============================================================*/
   #ifdef       WITH_MQTT
   static int onMessageArrived(void * _mqtt, char * topic, int topicLen,
                MQTTAsync_message *  message)
  @@ -283,7 +362,7 @@
       const char * s = message->payload;
       size_t ns = message->payloadlen;
       if (_rpmmqtt_debug < 0)
  -     rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s);
  +     rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, (int)ns, s);
   
       if (mqtt->iob) {
        mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0);
  @@ -400,7 +479,7 @@
        const char *s = response->message;
        int token = response->token;
        int code = response->code;
  -     rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed: code(%d) msg %s\n",
  +     rpmlog(RPMLOG_WARNING, "MQTT  subscribe(%d) failed: code(%d) msg %s\n",
                        token, code, s);
       } else
        rpmlog(RPMLOG_WARNING, "MQTT  subscribe failed\n");
  @@ -453,7 +532,7 @@
        size_t ns = response->alt.pub.message.payloadlen;
        int token = response->token;
        rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n",
  -                     token, mqtt->topic, ns, s);
  +                     token, mqtt->topic, (int)ns, s);
       }
       mqtt->finished = 1;
   }
  @@ -783,9 +862,6 @@
       switch (otype) {
       case 'C':
        u = mqtt->u;
  -#ifdef       DYING
  -assert(u);
  -#endif
        memcpy(C, &_C, sizeof(*C));
        C->keepAliveInterval = mqtt->keepalive;
        C->cleansession = (MF_ISSET(CLEAN) ? 1 : 0);
  @@ -822,8 +898,6 @@
        C->automaticReconnect = 1;
        C->minRetryInterval = 1;        /* secs */
        C->maxRetryInterval = 60;       /* secs */
  -mqtt->ut = 0;
  -mqtt->u = NULL;
        ptr = (void *) C;
        break;
       case 'D':
  @@ -919,11 +993,17 @@
       return rc;
   }
   
  -int rpmmqttSendMessage(rpmmqtt mqtt, const char * s, size_t ns)
  +int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char * s, size_t ns)
   {
       int rc = -1;
   
  -    if (ns == 0) ns = strlen(s);
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);
   
   #ifdef       WITH_MQTT
       MQTTAsync_message *M = AOBJ(mqtt, 'M');
  @@ -936,7 +1016,7 @@
   
       mqtt->finished = 0;
       rc = check(mqtt, "sendMessage",
  -             MQTTAsync_sendMessage(mqtt->I, mqtt->topic, M, R));
  +             MQTTAsync_sendMessage(mqtt->I, topic, M, R));
       while (!mqtt->finished)
        usleep(100);
   #endif       /* WITH_MQTT */
  @@ -954,18 +1034,20 @@
        goto exit;
   #ifdef       WITH_MQTT
       int _lvl = RPMLOG_DEBUG;
  -    int *subqos = xcalloc(mqtt->ac, sizeof(*subqos));
  +    int *subqos = xcalloc(ac, sizeof(*subqos));
       for (int i = 0; i < ac; i++) {
        char * t = av[i];
  +#ifdef       NOTYET
        char * te = strchr(t, '?');
  -     subqos[i] = mqtt->qos;  /* XXX */
        if (te) {
            *te++ = '\0';
            if ((te = strchr(te, '='))) {
                if (!strncmp(t, "qos", (te - t)))
                    subqos[i] = strtoul(te+1, NULL, 0);
            }
  -     }
  +     } else
  +#endif
  +         subqos[i] = mqtt->qos;      /* XXX */
        rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
       }
   
  @@ -985,11 +1067,17 @@
   }
   
   /*==============================================================*/
  -int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns)
  +int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns)
   {
       int ret = -1;    /* assume failure */
   
  -    if (ns == 0) ns = strlen(s);
  +    if (topic == NULL)
  +     topic = mqtt->topic;
  +    if (s == NULL)
  +     s = "";
  +    if (ns == 0)
  +     ns = strlen(s);
  +
   
   #ifdef       WITH_MQTT
       if (rpmmqttConnect(mqtt))
  @@ -999,7 +1087,7 @@
        char * t = rpmExpand(_mqtt_prefix, " ", s, NULL);
        size_t nt = strlen(t);
   
  -     if (!rpmmqttSendMessage(mqtt, t, nt))
  +     if (!rpmmqttSendMessage(mqtt, topic, t, nt))
            ret = nt;
   
        t = _free(t);
  @@ -1123,11 +1211,9 @@
        N_("MQTT <QOS> level."), N_("<QOS>") },
      { "retain", 'r', POPT_BIT_SET,            &mqtt->flags, MQTT_FLAGS_RETAIN,
        N_("Retain the message on the host."), NULL },
  -   { NULL, 's', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_STDIN,
  -     N_("Send stdin lines as a single message."), NULL },
      { NULL, 'S', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_DNSSRV,
        N_("Use SRV record to find remote host."), NULL },
  -   { "topic", 't', POPT_ARG_ARGV,            &mqtt->_topics, 0,
  +   { "topic", 't', POPT_ARG_ARGV,            &mqtt->topics, 0,
        N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") },
      { "user", 'u', POPT_ARG_STRING,   &mqtt->user, 0,
        N_("Remote <USER>."), N_("<USER>") },
  @@ -1183,6 +1269,20 @@
       con = poptGetContext(av[0], ac, (const char **)av,
                rpmmqttOptionsTable, _popt_context_flags);
   
  +mqtt->_progname = _free(mqtt->_progname);
  +mqtt->_progmode = _free(mqtt->_progmode);
  +    {        const char *arg0 = poptGetInvocationName(con);
  +     const char * _progname;
  +     const char * _progmode;
  +     if ((_progname = strrchr(arg0, '/')) != NULL) _progname++;
  +     else _progname = arg0;
  +     if (!strncmp(_progname, "lt-", sizeof("lt-")-1))
  +          _progname += sizeof("lt-")-1;
  +     _progmode = (strstr(_progname, "sub") ? "sub" : "pub");
  +     mqtt->_progname = xstrdup(_progname);
  +     mqtt->_progmode = xstrdup(_progmode);
  +    }
  +
       while ((xx = poptGetNextOpt(con)) > 0)
       switch (xx) {
       default:
  @@ -1214,7 +1314,6 @@
        /* XXX MQTT_FLAGS_EOL */
        /* XXX MQTT_FLAGS_NOSTALE */
        /* XXX MQTT_FLAGS_LINES */
  -     /* XXX MQTT_FLAGS_STDIN */
        /* XXX MQTT_FLAGS_EMPTY */
        /* XXX MQTT_FLAGS_DNSSRV */
        /* XXX MQTT_FLAGS_INSECURE */
  @@ -1242,8 +1341,6 @@
       mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
   #endif
   
  -     /* XXX load mqtt->_topics from _mqtt_subtopics */
  -
       return rc;
   }
   
  @@ -1255,10 +1352,17 @@
       const char *te;
       rpmRC rc = RPMRC_OK;
   
  -    (void) argvSplit(&av, query, ",");
  +    /* https://en.wikipedia.org/wiki/Query_string */
  +    /* XXX ampersand? semi-colon? comma? whitespace? newlines? */
  +    (void) argvSplit(&av, query, "&;,");
  +#ifdef       DYING
  +argvPrint(__FUNCTION__, av, NULL);
  +#endif
       ac = argvCount(av);
       for (int i = 0; i < ac; i++) {
        t = av[i];
  +     if (!(t && *t))
  +         continue;
        if ((te = strchr(t, '=')) == NULL)
            te += strlen(t);
        if (!strncmp(t, "clean", (te - t))) {
  @@ -1268,7 +1372,6 @@
        /* XXX MQTT_FLAGS_EOL */
        /* XXX MQTT_FLAGS_NOSTALE */
        /* XXX MQTT_FLAGS_LINES */
  -     /* XXX MQTT_FLAGS_STDIN */
        /* XXX MQTT_FLAGS_EMPTY */
        /* XXX MQTT_FLAGS_DNSSRV */
        /* XXX MQTT_FLAGS_INSECURE */
  @@ -1280,14 +1383,14 @@
        if (!strncmp(t, "qos", (te - t))) {
            mqtt->qos = (te[0] == '='  && xisdigit(te[1]))
                ? strtoul(te+1, NULL, 0)
  -             : 2;            /* XXX default qos=2? */
  +             : 2;    /* XXX default qos=2? */
            mqtt->qos %= 3;
            continue;
        }
        if (!strncmp(t, "timeout", (te - t))) {
            mqtt->timeout = (te[0] == '='  && xisdigit(te[1]))
                ? strtoul(te+1, NULL, 0)
  -             : 10000;        /* XXX default timeout=10000? */
  +             : 10000;/* XXX default timeout=10000? */
            continue;
        }
        if (!strncmp(t, "max_inflight", (te - t))) {
  @@ -1306,9 +1409,42 @@
        if (!strncmp(t, "trace", (te - t))) {
            mqtt->trace = (te[0] == '='  && xisdigit(te[1]))
                ? strtoul(te+1, NULL, 0)
  -             : 4;            /* XXX default trace=4? */
  +             : 4;    /* XXX default trace=4? */
            continue;
        }
  +     /* XXX error msg? */
  +     rc = RPMRC_FAIL;
  +    }
  +    av = argvFree(av);
  +
  +    return rc;
  +}
  +
  +static rpmRC rpmmqttInitURIFragment(rpmmqtt mqtt, const char *fragment)
  +{   /* Append each non-empty item of the URI fragment to mqtt->subtopics. */
  +    ARGV_t av = NULL;
  +    int ac;
  +    const char *t;
  +    const char *te;
  +    rpmRC rc = RPMRC_OK;
  +
  +    /* https://en.wikipedia.org/wiki/Fragment_identifier */
  +    /* XXX GOOG AJAX search (deprecated) */
  +    if (*fragment == '!')
  +     fragment++;
  +    /* XXX ampersand? semi-colon? comma? whitespace? newlines? */
  +    (void) argvSplit(&av, fragment, "&;,");
  +#ifdef       DYING
  +argvPrint(__FUNCTION__, av, NULL);
  +#endif
  +    ac = argvCount(av);
  +    for (int i = 0; i < ac; i++) {
  +     t = av[i];
  +     if (!(t && *t))
  +         continue;
  +     if ((te = strchr(t, '=')) == NULL)
  +         te = t + strlen(t); /* no '=': end of item (te was NULL here) */
  +     (void) argvAdd(&mqtt->subtopics, t);
  +    }
       av = argvFree(av);
   
  @@ -1323,6 +1459,9 @@
       /* -- Set unspecified MQTT options from the URI parameters. */
       mqtt->ut = urlSplit(url, &u);
       mqtt->u = u;
  +#ifdef       DYING
  +dumpU(__FUNCTION__, u);
  +#endif
   
       if (u->scheme == NULL
        || !strcmp(u->scheme, "mqtt")
  @@ -1364,9 +1503,11 @@
        }
       }
   
  -    if (u->query) {
  -     rc = rpmmqttInitURIQuery(mqtt, u->query);
  -    }
  +    if (u->query && rpmmqttInitURIQuery(mqtt, u->query))
  +     rc = RPMRC_FAIL;
  +
  +    if (u->fragment && rpmmqttInitURIFragment(mqtt, u->fragment))
  +     rc = RPMRC_FAIL;
   
       return rc;
   }
  @@ -1374,6 +1515,7 @@
   static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av,
                mqttFlags flags)
   {
  +    const char * uri = NULL;
       rpmRC rc;
   
   SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, flags));
  @@ -1396,9 +1538,15 @@
       mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL);
   
       /* -- Initialize values frpm default URI. */
  -static const char _mqtt_default_uri[] =
  -     "mqtt://localhost:1883/?qos=0,timeout=10000,max_inflight=20,keepalive=60";
  -    rc = rpmmqttInitURI(mqtt, _mqtt_default_uri);
  +    static const char _mqtt_uri[] =  /* default when %{_mqtt_uri} is undefined */
  +     "mqtt://localhost:1883/rpm/mqtt"
  +             "?qos=0;"
  +             "timeout=10000;"
  +             "max_inflight=20;"
  +             "keepalive=60;";
  +    uri = rpmExpand("%{?_mqtt_uri}%{!?_mqtt_uri:", _mqtt_uri, "}", NULL);
  +
  +    rc = rpmmqttInitURI(mqtt, uri);
       if (rc)
           goto exit;
   
  @@ -1413,11 +1561,8 @@
           goto exit;
   
       /* -- Set message input/output modes from flags/options. */
  -    if (MF_ISSET(STDIN))
  -     (void) argvAdd(&mqtt->ifn, "-");
  -
       if (mqtt->msgs)
  -     mqtt->msg_input = MQTT_INPUT_OPTION;
  +     mqtt->msg_input = MQTT_INPUT_OPTIONS;
       else if (mqtt->ifn)
        mqtt->msg_input = MF_ISSET(LINES)
                ? MQTT_INPUT_FILES_LINES : MQTT_INPUT_FILES;
  @@ -1433,8 +1578,8 @@
       }
   
       /* -- XXX Ensure topic/clientid are set. */
  -    if (mqtt->clientid == NULL) {
   assert(mqtt->_clientid);
  +    if (mqtt->clientid == NULL) {
        mqtt->clientid = rpmExpand(mqtt->_clientid, NULL);
       }
       if (mqtt->topic == NULL) {
  @@ -1442,26 +1587,118 @@
        mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL);
       }
        
  -#ifdef       DYING
  -    /* -- XXX Add default argv (if not specified) */
  -    if (mqtt->ac == 0) {
  -static const char *_av[] = {
  -    "mqtt://luser:jasnl@localhost:1883/rpm/mqtt",
  -    "rpm/#",
  -    NULL,
  -};
  -     (void) argvAppend((ARGV_t *)&mqtt->av, _av);
  -     mqtt->ac = argvCount((ARGV_t)mqtt->av);
  -    }
  -#endif
  -
       rc = RPMRC_OK;
   
   exit:
  +    uri = _free(uri);
   SPEW((stderr, "<-- %s(%p,%p[%d],0x%x) rc %d\n", __FUNCTION__, mqtt, av, ac, flags, rc));
       return rc;
   }
   
  +/* XXX add subtopics arg? */
  +static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt)
  +{   /* Subscribe to every topic in mqtt->subtopics (if any). */
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;                  /* XXX subscribe results are stored but never checked */
  +
  +    if (mqtt->subtopics) {
  +     int nsubs = argvCount((ARGV_t)mqtt->subtopics);
  +#ifdef       NOTYET  /* XXX MQTT segfault if done here. */
  +argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL);
  +     xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics);
  +#else
  +     for (int i = 0; i < nsubs; i++) {       /* one subscribe call per topic */
  +         xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0);
  +     }
  +#endif
  +    }
  +    rc = RPMRC_OK;           /* XXX unconditional: always reports success */
  +    return rc;
  +}
  +
  +/* XXX add topics arg? */
  +static rpmRC rpmmqttInitPublish(rpmmqtt mqtt)
  +{   /* Publish the initial messages selected by mqtt->msg_input to mqtt->topics. */
  +    int ntopics = argvCount((ARGV_t)mqtt->topics);
  +    int nmsgs = argvCount((ARGV_t)mqtt->msgs);
  +    int nifn = argvCount(mqtt->ifn);
  +    const char * ifn = NULL;
  +    rpmiob iob = NULL;
  +    const char * topic = NULL;       /* NULL: rpmmqttPub() falls back to mqtt->topic */
  +    const char * s = NULL;
  +    int ns = 0;
  +    ARGV_t lav = NULL;
  +    int lac = 0;
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +    int xx;                  /* XXX publish results are stored but never checked */
  +
  +    switch (mqtt->msg_input) {
  +    case MQTT_INPUT_UNKNOWN:
  +    default:
  +     break;
  +    case MQTT_INPUT_OPTIONS:        /* one message per mqtt->msgs item */
  +     for (int i = 0; i < nmsgs; i++) {
  +         s = mqtt->msgs[i];
  +         ns = strlen(s);
  +         if (ntopics <= 0)
  +             xx = rpmmqttPub(mqtt, topic, s, ns);
  +         else
  +         for (int j = 0; j < ntopics; j++) {
  +             topic = mqtt->topics[j];
  +             xx = rpmmqttPub(mqtt, topic, s, ns);
  +         }
  +     }
  +     break;
  +    case MQTT_INPUT_FILES:          /* one message per input file */
  +     for (int i = 0; i < nifn; i++) {
  +         ifn = mqtt->ifn[i];
  +         if (rpmiobSlurp(ifn, &iob) == 0) {
  +             s = rpmiobStr(iob);
  +             ns = rpmiobLen(iob);
  +             /* XXX trim */
  +             if (ns == 0)    /* XXX skip empty files? */
  +                 ;           /* no "continue": iob must still be freed below */
  +             else if (ntopics <= 0)
  +                 xx = rpmmqttPub(mqtt, topic, s, ns);
  +             else
  +             for (int j = 0; j < ntopics; j++) {
  +                 topic = mqtt->topics[j];
  +                 xx = rpmmqttPub(mqtt, topic, s, ns);
  +             }
  +         }
  +         iob = rpmiobFree(iob);
  +     }
  +     break;
  +    case MQTT_INPUT_FILES_LINES:    /* one message per non-empty line of each file */
  +     for (int i = 0; i < nifn; i++) {
  +         ifn = mqtt->ifn[i];
  +         if (rpmiobSlurp(ifn, &iob) == 0) {
  +             xx = argvSplit(&lav, rpmiobStr(iob), "\n\r");
  +             lac = argvCount(lav);
  +             for (int j = 0; j < lac; j++) {
  +                 s = lav[j];
  +                 ns = strlen(s);
  +                 /* XXX trim */
  +                 if (ns == 0)        /* XXX skip empty lines? */
  +                     continue;
  +                 if (ntopics <= 0)
  +                     xx = rpmmqttPub(mqtt, topic, s, ns);
  +                 else
  +                 for (int k = 0; k < ntopics; k++) { /* k, not the line index j */
  +                     topic = mqtt->topics[k];
  +                     xx = rpmmqttPub(mqtt, topic, s, ns);
  +                 }
  +             }
  +             lav = argvFree(lav);
  +         }
  +         iob = rpmiobFree(iob);
  +     }
  +     break;
  +    }
  +    rc = RPMRC_OK;           /* XXX unconditional: always reports success */
  +    return rc;
  +}
  +
   /*==============================================================*/
   static void rpmmqttFini(void * _mqtt)
   {
  @@ -1474,13 +1711,17 @@
   #endif       /* WITH_MQTT */
   
   /* ========== */
  +    mqtt->_progname = _free(mqtt->_progname);
  +    mqtt->_progmode = _free(mqtt->_progmode);
  +
       mqtt->_address = _free(mqtt->_address);
       mqtt->host = _free(mqtt->host);
       mqtt->_clientid = _free(mqtt->_clientid);
       mqtt->idprefix = _free(mqtt->idprefix);
       mqtt->msgs = argvFree((ARGV_t)mqtt->msgs);
       mqtt->password = _free(mqtt->password);
  -    mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
  +    mqtt->topics = argvFree((ARGV_t)mqtt->topics);
  +    mqtt->subtopics = argvFree((ARGV_t)mqtt->subtopics);
       mqtt->_filter_out = argvFree((ARGV_t)mqtt->_filter_out);
       mqtt->user = _free(mqtt->user);
       mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */
  @@ -1526,7 +1767,7 @@
   
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
  -    static const char * _av[] = { (char *) "rpmmqtt", NULL };
  +    static const char * _av[] = { (char *) "mqtt", NULL };
       rpmmqtt mqtt = (flags & 0x80000000)
        ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool);
   
  @@ -1535,6 +1776,9 @@
       /* XXX quick-n-dirty recursion avoidance. */
       if (av == NULL) av = (char **) _av;
   
  +#ifdef       DYING
  +argvPrint(__FUNCTION__, (ARGV_t)av, NULL);
  +#endif
       int ac = argvCount((ARGV_t)av);
       flags &= ~0x80000000;
       rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags);
  @@ -1545,6 +1789,7 @@
        int _lvl = RPMLOG_DEBUG;
        int xx;
   
  +mqtt->trace = 4;
        if (mqtt->trace && rpmIsDebug()) {
            xx = check(mqtt, "setTraceCallback",
                (MQTTAsync_setTraceCallback(onTrace), 0));
  @@ -1579,7 +1824,9 @@
        rpmlog(_lvl, "%19s: type(%u) %s\n",
                "persist", mqtt->persist_type,
                (mqtt->persist_type ? persist_path : ""));
  -     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +     if (mqtt->trace)
  +         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   
   mqtt->persist_path = _free(mqtt->persist_path);
  @@ -1631,83 +1878,39 @@
   mqtt->u = NULL;
   dumpMQTT(__FUNCTION__, mqtt);
   
  -      if (mqtt->I == NULL) {
  -     xx = check(mqtt, "createWithOptions",
  +     if (mqtt->I == NULL) {
  +         xx = check(mqtt, "createWithOptions",
                MQTTAsync_createWithOptions(&mqtt->I,
                        mqtt->uri, mqtt->clientid,
                        mqtt->persist_type, mqtt->persist_ctx, AOBJ(mqtt, 'O')));
   
  -     xx = check(mqtt, "setCallbacks",
  +         xx = check(mqtt, "setCallbacks",
                MQTTAsync_setCallbacks(mqtt->I, mqtt,
                        onConnectionLost,
                        onMessageArrived,
                        onDeliveryComplete));
  +     }
   
  -     /* Subscribe to channels (if any). */
  -     /* XXX rework using mqtt->_topics instead */
  -     if (mqtt->ac > 1) {
  -#ifdef       NOTYET  /* XXX segfault here. */
  -         xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1);
  +     /* Subscribe to initial topics (if any). */
  +#ifdef       DYING
  +argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL);
  +     if (mqtt->subtopics) {
  +         int nsubs = argvCount((ARGV_t)mqtt->subtopics);
  +#ifdef       NOTYET  /* XXX MQTT segfault if done here. */
  +         xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics);
   #else
  -         for (int i = 1; i < mqtt->ac; i++)
  -             xx = rpmmqttSub(mqtt, mqtt->av[i], 0);
  +         for (int i = 0; i < nsubs; i++) {
  +             xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0);
  +         }
   #endif
        }
  +#else
  +     xx = rpmmqttInitSubscribe(mqtt);
  +#endif
  +
  +     /* Publish any initial messages (if any). */
  +     xx = rpmmqttInitPublish(mqtt);
   
  -     /* Publish any initial messages. */
  -     switch (mqtt->msg_input) {
  -     case MQTT_INPUT_UNKNOWN:
  -     default:
  -         break;
  -     case MQTT_INPUT_OPTION:
  -         if (mqtt->msgs) {
  -             int nmsgs = argvCount((ARGV_t)mqtt->msgs);
  -             for (int i = 0; i < nmsgs; i++) {
  -                 xx = rpmmqttPub(mqtt, mqtt->msgs[i], 0);
  -             }
  -         }
  -         break;
  -     case MQTT_INPUT_STDIN:
  -     case MQTT_INPUT_FILES:
  -         if (mqtt->ifn) {
  -             int nifn = argvCount(mqtt->ifn);
  -             for (int i = 0; i < nifn; i++) {
  -                 const char * ifn = mqtt->ifn[i];
  -                 rpmiob iob = NULL;
  -                 if (rpmiobSlurp(ifn, &iob) == 0) {
  -                     xx = rpmmqttPub(mqtt,
  -                             rpmiobStr(iob), rpmiobLen(iob));
  -                 }
  -                 iob = rpmiobFree(iob);
  -             }
  -         }
  -         break;
  -     case MQTT_INPUT_STDIN_LINES:
  -     case MQTT_INPUT_FILES_LINES:
  -         if (mqtt->ifn) {
  -             int nifn = argvCount(mqtt->ifn);
  -             for (int i = 0; i < nifn; i++) {
  -                 const char * ifn = mqtt->ifn[i];
  -                 rpmiob iob = NULL;
  -                 if (rpmiobSlurp(ifn, &iob) == 0) {
  -                     ARGV_t lav = NULL;
  -                     int lac;
  -                     xx = argvSplit(&lav, rpmiobStr(iob), "\n\r");
  -                     lac = argvCount(lav);
  -                     for (int j = 0; j < lac; j++) {
  -                         const char * s = lav[j];
  -                         size_t ns = strlen(s);
  -                         if (ns > 0) /* XXX skip empty lines? */
  -                             xx = rpmmqttPub(mqtt, s, ns);
  -                     }
  -                     lav = argvFree(lav);
  -                 }
  -                 iob = rpmiobFree(iob);
  -             }
  -         }
  -         break;
  -     }
  -      }
       }
   #endif       /* WITH_MQTT */
   
  @@ -1727,7 +1930,7 @@
   #if defined(WITH_MQTT)
       if (rpmmqttConnect(mqtt))
        goto exit;
  -    if (msg != NULL && !rpmmqttSendMessage(mqtt, msg, strlen(msg))) {
  +    if (msg != NULL && !rpmmqttSendMessage(mqtt, NULL, msg, strlen(msg))) {
        rc = RPMRC_OK;
        if (resultp) {
            *resultp = (mqtt->iob ? rpmiobStr(mqtt->iob) : "");
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.12 -r1.1.2.13 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       4 Jul 2016 07:45:23 -0000       1.1.2.12
  +++ rpm/rpmio/rpmmqtt.h       5 Jul 2016 14:49:11 -0000       1.1.2.13
  @@ -19,10 +19,9 @@
       MQTT_FLAGS_NOSTALE               = _MFB( 2),     /*!< (sub) -R */
   
       MQTT_FLAGS_LINES         = _MFB( 3),     /*!< -l */
  -    MQTT_FLAGS_STDIN         = _MFB( 4),     /*!< -s */
  -    MQTT_FLAGS_EMPTY         = _MFB( 5),     /*!< -n */
  -    MQTT_FLAGS_DNSSRV                = _MFB( 6),     /*!< -S */
  -    MQTT_FLAGS_INSECURE              = _MFB( 7),     /*!< --insecure */
  +    MQTT_FLAGS_EMPTY         = _MFB( 4),     /*!< -n */
  +    MQTT_FLAGS_DNSSRV                = _MFB( 5),     /*!< -S */
  +    MQTT_FLAGS_INSECURE              = _MFB( 6),     /*!< --insecure */
   
       MQTT_FLAGS_RETAIN                = _MFB( 8),     /*!< -r */
       MQTT_FLAGS_WILL_RETAIN   = _MFB( 9),     /*!< --will-retain */
  @@ -35,11 +34,9 @@
   
   typedef enum mqttInput_e {
       MQTT_INPUT_UNKNOWN               = 0,
  -    MQTT_INPUT_OPTION                = 1,            /*!< -m str */
  +    MQTT_INPUT_OPTIONS               = 1,            /*!< -m str */
       MQTT_INPUT_FILES_LINES   = 2,            /*!< -l -f fn */
       MQTT_INPUT_FILES         = 3,            /*!< -f fn */
  -    MQTT_INPUT_STDIN_LINES   = 4,            /*!< -l -f '-' */
  -    MQTT_INPUT_STDIN         = 5,            /*!< -s */
   } mqttInput;
   
   typedef enum mqttOutput_e {
  @@ -59,28 +56,32 @@
       mqttInput msg_input;
       mqttOutput msg_output;
   /* ========== */
  +    const char *_progname;
  +    const char *_progmode;
  +
       const char *_address;                    /*!< -A */
  -    const char *host;                                /*!< -h */
  -    const char *idprefix;                    /*!< -I */
  +    const char * host;                               /*!< -h */
  +    const char * idprefix;                   /*!< -I */
       int keepalive;                           /*!< -k */
       const char **msgs;                               /*!< -m */
       int max_inflight;                                /*!< -M */
       int port;                                        /*!< -p */
  -    const char *password;                    /*!< -P */
  -    const char **_topics;                    /*!< -t */
  +    const char * password;                   /*!< -P */
  +    const char **topics;                     /*!< -t */
  +    const char **subtopics;                  /*!< -t */
       const char **_filter_out;                        /*!< -T */
  -    const char *user;                                /*!< -u */
  -    const char *protocol_version;            /*!< -V */
  +    const char * user;                               /*!< -u */
  +    const char * protocol_version;           /*!< -V */
   
       const char * will_message;                       /*!< --will-payload */
       int will_qos;                            /*!< --will-qos */
       const char * will_topic;                 /*!< --will-topic */
   
  -    const char *cafile;                              /*!< --cafile */
  +    const char * cafile;                             /*!< --cafile */
       const char *_capath;                     /*!< --capath */
  -    const char *cert;                                /*!< --cert */
  -    const char *privkey;                     /*!< --key */
  -    const char *ciphers;                     /*!< --ciphers */
  +    const char * cert;                               /*!< --cert */
  +    const char * privkey;                    /*!< --key */
  +    const char * ciphers;                    /*!< --ciphers */
   
       const char *_tls_version;                        /*!< --tls-version */
       const char *_psk_key;                    /*!< --psk */
  @@ -118,12 +119,12 @@
       int MQTTVersion;
       int sessionPresent;
   
  -    const char *cachedn;
  +    const char * cachedn;
   
       FD_t ifd;
       const char **ifn;                                /*!< -f */
       FD_t ofd;
  -    const char *ofn;                         /*!< -o */
  +    const char * ofn;                                /*!< -o */
       rpmiob iob;
   
       MQTTAsync_connectOptions C;
  @@ -180,9 +181,10 @@
   
   int rpmmqttDisconnect(rpmmqtt mqtt);
   
  -int rpmmqttSendMessage(rpmmqtt mqtt, const char *s, size_t ns);
  +int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +             const char *s, size_t ns);
   
  -int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns);
  +int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns);
   
   int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns);
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.9 -r1.1.2.10 tmqtt.c
  --- rpm/rpmio/tmqtt.c 4 Jul 2016 07:45:23 -0000       1.1.2.9
  +++ rpm/rpmio/tmqtt.c 5 Jul 2016 14:49:11 -0000       1.1.2.10
  @@ -33,13 +33,13 @@
       (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0);
   #endif
   
  -    nw = rpmmqttPub(mqtt, "bzzt ...", 0);
  -    nw = rpmmqttPub(mqtt, "bzzT ...", 0);
  -    nw = rpmmqttPub(mqtt, "bzZT ...", 0);
  -    nw = rpmmqttPub(mqtt, "bZZT ...", 0);
  -    nw = rpmmqttPub(mqtt, "BZZT ...", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "bzzt ...", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "bzzT ...", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "bzZT ...", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "bZZT ...", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "BZZT ...", 0);
       (void) rpmmqttDisconnect(mqtt);
  -    nw = rpmmqttPub(mqtt, "SWAT !!!", 0);
  +    nw = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0);
       (void) rpmmqttDisconnect(mqtt);
       (void) rpmmqttConnect(mqtt);
   
  @@ -74,10 +74,18 @@
   
       /* Initialize the _mqtt_ macro context */
   
  +static char _mqtt_uri[] =
  +    "_mqtt_uri mqtt://luser:jasnl@localhost:1883/rpm/mqtt"
  +     "?qos=1;"
  +     "timeout=10000;"
  +     "trace=4;"
  +     "#rpm/#";
  +(void) rpmDefineMacro(NULL, _mqtt_uri, 0);
   (void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0);
   
  -(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} %{nil}", 0);
  +(void) rpmDefineMacro(NULL, "nil %{!?nil}", 0);
  +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now}", 0);
   
   #ifdef       DYING
   (void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0);
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to