RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   14-Jul-2016 14:59:21
  Branch: rpm-5_4                          Handle: 2016071412592100

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: add per-module default uri.
    - mqtt: fix: select module from argv[0].

  Summary:
    Revision    Changes     Path
    1.1.2.24    +194 -539   rpm/rpmio/rpmmqtt.c
    1.1.2.21    +1  -0      rpm/rpmio/rpmmqtt.h
    1.1.2.16    +18 -67     rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.23 -r1.1.2.24 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       14 Jul 2016 11:32:57 -0000      1.1.2.23
  +++ rpm/rpmio/rpmmqtt.c       14 Jul 2016 12:59:21 -0000      1.1.2.24
  @@ -4,15 +4,6 @@
   
   #include "system.h"
   
  -#undef       WITH_MOSQUITTO
  -#undef       WITH_RABBITMQ
  -#undef       WITH_PROTON
  -#undef       WITH_ZEROMQ
  -
  -#if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_ZEROMQ)
  -#define      WITH_MQTT
  -#endif
  -
   #include <stdio.h>
   #include <stdlib.h>
   #include <string.h>
  @@ -427,6 +418,64 @@
   }
   
   /*==============================================================*/
  +static
  +rpmRC _rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  +     for (int i = 0; i < ac; i++) {
  +         char * t = av[i];
  +#ifdef       NOTYET          /* XXX qos as subtopic ?qos=N? */
  +         char * te = strchr(t, '?');
  +         if (te) {
  +             *te++ = '\0';
  +             if ((te = strchr(te, '='))) {
  +                 if (!strncmp(t, "qos", (te - t)))
  +                     subqos[i] = strtoul(te+1, NULL, 0);
  +             }
  +         } else
  +#endif
  +             subqos[i] = mqtt->qos;  /* XXX */
  +         rc = rpmmqttSubscribe(mqtt, t, subqos[i]);
  +         if (rc)
  +             break;
  +     }
  +
  +     if (subqos)
  +         free(subqos);
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +static
  +rpmRC _rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +    if (ac <= 0)
  +     goto exit;
  +    if (!rpmmqttConnect(mqtt)) {
  +     for (int i = 0; i < ac; i++) {
  +         rc = rpmmqttUnsubscribe(mqtt, av[i]);
  +         if (rc)
  +             goto exit;
  +     }
  +    }
  +
  +exit:
  +    return rc;
  +}
  +
  +/*==============================================================*/
   #ifdef       WITH_PAHO
   static int pahoOnMessageArrived(void * _mqtt, char * topic, int topicLen,
                MQTTAsync_message *  message)
  @@ -1154,7 +1203,7 @@
                "tcp://", mqtt->host, ":", portstr, NULL);
       }
   
  -    /* XXX improve integration */
  +    /* XXX set through uri?query */
   static const char _mqtt_persist[] =
        "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
       mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  @@ -1476,6 +1525,12 @@
   static
   struct mqttVec_s pahoVec = {
       .name            = "paho",
  +    .uri             = "mqtt://localhost:1883/"
  +                             "rpm/mqtt"      /* XXX topic? */
  +                             "?qos=1"        /* XXX */
  +                             "&timeout=10000"
  +                             "&max_inflight=20"
  +                             "&keepalive=60",
       .port            = 1883,
       .sport           = 8883,
       .prefix          = "MQTTAsync_",
  @@ -1941,44 +1996,6 @@
   }
   
   static
  -rpmRC mosqSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -#ifdef       NOTYET          /* XXX qos as subtopic ?qos=N? */
  -         char * te = strchr(t, '?');
  -         if (te) {
  -             *te++ = '\0';
  -             if ((te = strchr(te, '='))) {
  -                 if (!strncmp(t, "qos", (te - t)))
  -                     subqos[i] = strtoul(te+1, NULL, 0);
  -             }
  -         } else
  -#endif
  -             subqos[i] = mqtt->qos;  /* XXX */
  -         rc = rpmmqttSubscribe(mqtt, t, subqos[i]);
  -         if (rc)
  -             break;
  -     }
  -
  -     if (subqos)
  -         free(subqos);
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
  -static
   rpmRC mosqSubscribe(rpmmqtt mqtt, const char * topic, int qos)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  @@ -2035,25 +2052,6 @@
       return rc;
   }
   
  -static
  -rpmRC mosqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     for (int i = 0; i < ac; i++) {
  -         rc = rpmmqttUnsubscribe(mqtt, av[i]);
  -         if (rc)
  -             goto exit;
  -     }
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
   static KEY mosqErrs[] = {
   #define _ENTRY(_v)      { MOSQ_ERR_##_v, #_v, }
       _ENTRY(CONN_PENDING),
  @@ -2081,6 +2079,12 @@
   static
   struct mqttVec_s mosqVec = {
       .name            = "mosquitto",
  +    .uri             = "mqtt://localhost:1883/"
  +                             "rpm/mqtt"      /* XXX topic? */
  +                             "?qos=1"        /* XXX */
  +                             "&timeout=10000"
  +                             "&max_inflight=20"
  +                             "&keepalive=60",
       .port            = 1883,
       .sport           = 8883,
       .prefix          = "mosquitto_",
  @@ -2094,8 +2098,8 @@
       .isconnected     = mosqIsConnected,
       .unsubscribe     = mosqUnsubscribe,
       .subscribe               = mosqSubscribe,
  -    .unsubscribeMany = mosqUnsubscribeMany,
  -    .subscribeMany   = mosqSubscribeMany,
  +    .unsubscribeMany = _rpmmqttUnsubscribeMany,
  +    .subscribeMany   = _rpmmqttSubscribeMany,
       .sendMessage     = mosqSendMessage,
   };
   #endif       /* WITH_MOSQUITTO */
  @@ -2471,44 +2475,6 @@
   }
   
   static
  -rpmRC amqpSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -#ifdef       NOTYET
  -         char * te = strchr(t, '?');
  -         if (te) {
  -             *te++ = '\0';
  -             if ((te = strchr(te, '='))) {
  -                 if (!strncmp(t, "qos", (te - t)))
  -                     subqos[i] = strtoul(te+1, NULL, 0);
  -             }
  -         } else
  -#endif
  -             subqos[i] = mqtt->qos;  /* XXX */
  -         rc = rpmmqttSubscribe(mqtt, t, subqos[i]);
  -         if (rc)
  -             break;
  -     }
  -
  -     if (subqos)
  -         free(subqos);
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
  -static
   rpmRC amqpSubscribe(rpmmqtt mqtt, const char * topic, int qos)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  @@ -2568,57 +2534,9 @@
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
   
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = amqpOnUnsubscribeMany;
  -     R->onFailure = amqpOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribe",
  -             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
  -static
  -rpmRC amqpUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -SPEW((stderr, "-->    %s\n", __FUNCTION__));
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int _lvl = RPMLOG_DEBUG;
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -         rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  -     }
  -
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = amqpOnUnsubscribeMany;
  -     R->onFailure = amqpOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribeMany",
  -             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -
        if (rc)
            goto exit;
        rc = RPMRC_OK;
  @@ -2664,6 +2582,16 @@
   static
   struct mqttVec_s amqpVec = {
       .name            = "amqp",
  +    .uri             = "amqp://localhost:5672/"      /* XXX vhost? */
  +                             "?vhost=/"              /* XXX vhost? */
  +                             "&exchange=amq.direct"
  +                             "&queue=test queue"
  +                             "&verify=1"
  +                             "&fail_if_no_peer_cert=1"
  +                             "&sasl_mechanism=plain"
  +                             "&keepalive=60"
  +                             "&connection_timeout=60"
  +                             "&channel_max=60",
       .port            = 5672,
       .sport           = 5671,
       .prefix          = "amqp_",
  @@ -2677,8 +2605,8 @@
       .isconnected     = amqpIsConnected,      /* XXX */
       .unsubscribe     = amqpUnsubscribe,
       .subscribe               = amqpSubscribe,
  -    .unsubscribeMany = amqpUnsubscribeMany,
  -    .subscribeMany   = amqpSubscribeMany,
  +    .unsubscribeMany = _rpmmqttUnsubscribeMany,
  +    .subscribeMany   = _rpmmqttSubscribeMany,
       .sendMessage     = amqpSendMessage,
   };
   #endif       /* WITH_RABBITMQ */
  @@ -2895,48 +2823,6 @@
   }
   
   static
  -rpmRC qpidSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -#ifdef       DYING
  -    if (!rpmmqttConnect(mqtt)) {
  -     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -#ifdef       NOTYET
  -         char * te = strchr(t, '?');
  -         if (te) {
  -             *te++ = '\0';
  -             if ((te = strchr(te, '='))) {
  -                 if (!strncmp(t, "qos", (te - t)))
  -                     subqos[i] = strtoul(te+1, NULL, 0);
  -             }
  -         } else
  -#endif
  -             subqos[i] = mqtt->qos;  /* XXX */
  -         rc = rpmmqttSubscribe(mqtt, t, subqos[i]);
  -         if (rc)
  -             break;
  -     }
  -
  -     if (subqos)
  -         free(subqos);
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -#else
  -    rc = RPMRC_OK;
  -#endif
  -
  -exit:
  -    return rc;
  -}
  -
  -static
   rpmRC qpidSubscribe(rpmmqtt mqtt, const char * topic, int qos)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  @@ -3026,67 +2912,36 @@
       return rc;
   }
   
  -static
  -rpmRC qpidUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -#ifdef       DYING
  -SPEW((stderr, "-->    %s\n", __FUNCTION__));
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int _lvl = RPMLOG_DEBUG;
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -         rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  -     }
  -
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = qpidOnUnsubscribeMany;
  -     R->onFailure = qpidOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribeMany",
  -             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -#else
  -    rc = RPMRC_OK;
  -#endif
  -    return rc;
  -}
  -
  -static KEY qpidErrs[] = {
  -#define _ENTRY(_v)      { PN_##_v, #_v, }
  -    _ENTRY(OK),
  -    _ENTRY(EOS),
  -    _ENTRY(ERR),
  -    _ENTRY(OVERFLOW),
  -    _ENTRY(UNDERFLOW),
  -    _ENTRY(STATE_ERR),
  -    _ENTRY(ARG_ERR),
  -    _ENTRY(TIMEOUT),
  -    _ENTRY(INTR),
  -    _ENTRY(INPROGRESS),
  -    _ENTRY(OUT_OF_MEMORY),
  -#undef       _ENTRY
  -    { 0, NULL },
  -};
  +static KEY qpidErrs[] = {
  +#define _ENTRY(_v)      { PN_##_v, #_v, }
  +    _ENTRY(OK),
  +    _ENTRY(EOS),
  +    _ENTRY(ERR),
  +    _ENTRY(OVERFLOW),
  +    _ENTRY(UNDERFLOW),
  +    _ENTRY(STATE_ERR),
  +    _ENTRY(ARG_ERR),
  +    _ENTRY(TIMEOUT),
  +    _ENTRY(INTR),
  +    _ENTRY(INPROGRESS),
  +    _ENTRY(OUT_OF_MEMORY),
  +#undef       _ENTRY
  +    { 0, NULL },
  +};
   
   static
   struct mqttVec_s qpidVec = {
       .name            = "qpid",
  +    .uri             = "amqp://localhost:5672/"      /* XXX vhost? */
  +                             "?vhost=/"              /* XXX vhost? */
  +                             "&exchange=amq.direct"  /* XXX */
  +                             "&queue=test queue"     /* XXX */
  +                             "&verify=1"             /* XXX */
  +                             "&fail_if_no_peer_cert=1"/* XXX */
  +                             "&sasl_mechanism=plain" /* XXX */
  +                             "&keepalive=60"
  +                             "&connection_timeout=60"
  +                             "&channel_max=60",
       .port            = 5672,
       .sport           = 5671,
       .prefix          = "pn_",
  @@ -3100,8 +2955,8 @@
       .isconnected     = qpidIsConnected,      /* XXX */
       .unsubscribe     = qpidUnsubscribe,
       .subscribe               = qpidSubscribe,
  -    .unsubscribeMany = qpidUnsubscribeMany,
  -    .subscribeMany   = qpidSubscribeMany,
  +    .unsubscribeMany = _rpmmqttUnsubscribeMany,
  +    .subscribeMany   = _rpmmqttSubscribeMany,
       .sendMessage     = qpidSendMessage,
   };
   #endif       /* WITH_PROTON */
  @@ -3111,12 +2966,8 @@
   rpmRC zmqDestroy(rpmmqtt mqtt)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -#ifdef       NOTYET
  -    rc = mqchk(mqtt, "destroy",
  -             (MQTTAsync_destroy(&mqtt->I), 0));
  -    mqtt->state = _free(mqtt->state);
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
       mqtt->I = NULL;
  -#endif
       return rc;
   }
   
  @@ -3124,42 +2975,17 @@
   rpmRC zmqCreate(rpmmqtt mqtt)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -#ifdef       NOTYET
       static int oneshot;
       int _lvl = RPMLOG_DEBUG;
  -    int xx;
  -
  -#ifdef       DYING
  -mqtt->trace = 4;     /* XXX */
  -#endif
  -    if (mqtt->trace && rpmIsDebug()) {
  -     xx = mqchk(mqtt, "setTraceCallback",
  -             (MQTTAsync_setTraceCallback(onTrace), 0));
  -     xx = mqchk(mqtt, "setTraceLevel",
  -             (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  -    }
   
       rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
       if (!oneshot) {
  -     if (mqtt->trace == 0) {
  -         MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  -         while (NV->name) {
  -             rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  -             NV++;
  -         }
  -     }
  +fprintf(stderr, "XXX %s: FIXME: version oneshot\n", __FUNCTION__);
        oneshot++;
       }
   
  -    /* XXX improve integration */
  -static const char _mqtt_persist[] =
  -     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
  -    mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3);
  -static const char _mqtt_cachedir[] =
  -     "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
  -char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
  -
  +#ifdef       NOTYET
       rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
       rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
                "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  @@ -3169,51 +2995,18 @@
       rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
       if (mqtt->trace)
        rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  -
  -mqtt->persist_path = _free(mqtt->persist_path);
  -mqtt->persist_ctx = _free(mqtt->persist_ctx);
  -    switch (mqtt->persist_type) {
  -    default:
  -    case MQTTCLIENT_PERSISTENCE_NONE:
  -     mqtt->persist_ctx = NULL;
  -     break;
  -    case MQTTCLIENT_PERSISTENCE_DEFAULT:
  -      {
  -     mqtt->persist_path = xstrdup(persist_path);
  -     /* XXX rpmmqttFini double free */
  -     mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  -      } break;
  -    case MQTTCLIENT_PERSISTENCE_USER:
  -      {
  -     mqtt->persist_path = xstrdup(persist_path);
  -     MQTTClient_persistence * ctx =
  -             (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  -     *ctx = _mqtt_persistence;       /* structure assignment */
  -     ctx->context = mqtt;
  -     mqtt->persist_ctx = ctx;
  -      } break;
  -    }
  -persist_path = _free(persist_path);
  +#else
  +fprintf(stderr, "XXX %s: FIXME: values\n", __FUNCTION__);
  +#endif
   
   mqtt->u = NULL;
   dumpMQTT(__FUNCTION__, mqtt);
   
       if (mqtt->I == NULL) {
  -     xx = mqchk(mqtt, "createWithOptions",
  -             MQTTAsync_createWithOptions(&mqtt->I,
  -                     mqtt->uri, mqtt->clientid,
  -                     mqtt->persist_type, mqtt->persist_ctx,
  -                     (MQTTAsync_createOptions *)AOBJ(mqtt, 'O')));
  -
  -     xx = mqchk(mqtt, "setCallbacks",
  -             MQTTAsync_setCallbacks(mqtt->I, mqtt,
  -                     zmqOnConnectionLost,
  -                     zmqOnMessageArrived,
  -                     zmqOnDeliveryComplete));
  +fprintf(stderr, "XXX %s: FIXME: handle\n", __FUNCTION__);
       }
   
       rc = RPMRC_OK;
  -#endif       /* NOTYET */
   
       return rc;
   }
  @@ -3223,16 +3016,10 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
       if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
  -#ifdef       NOTYET
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "disconnect",
  -             MQTTAsync_disconnect(mqtt->I,
  -                     (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D')));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  +fprintf(stderr, "XXX %s: FIXME: connected = 0\n", __FUNCTION__);
        if (rc)
            goto exit;
  +mqtt->connected = 0;
       }
       rc = RPMRC_OK;
   
  @@ -3246,16 +3033,10 @@
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  -#ifdef       NOTYET
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "connect",
  -             MQTTAsync_connect(mqtt->I,
  -                     (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C')));
  -     while (!mqtt->finished)
  -         usleep(1000);
  -#endif
  +fprintf(stderr, "XXX %s: FIXME: connected = 1\n", __FUNCTION__);
        if (rc)
            goto exit;
  +mqtt->connected = 1;
       }
       rc = RPMRC_OK;
   
  @@ -3266,13 +3047,7 @@
   static
   rpmRC zmqIsConnected(rpmmqtt mqtt)
   {
  -    rpmRC rc = RPMRC_NOTFOUND;
  -
  -#ifdef       NOTYET
  -    if (mqchk(mqtt, "isconnected",
  -             MQTTAsync_isConnected(mqtt->I)))
  -     rc = RPMRC_OK;
  -#endif
  +    rpmRC rc = (mqtt->connected ? RPMRC_OK : RPMRC_NOTFOUND);
       return rc;
   }
   
  @@ -3290,72 +3065,7 @@
        ns = strlen(s);
   
       if (!rpmmqttConnect(mqtt)) {
  -#ifdef       NOTYET
  -     MQTTAsync_message *M =
  -         (MQTTAsync_message *) AOBJ(mqtt, 'M');
  -     M->payloadlen = ns;
  -     M->payload = (char *) s;
  -
  -     MQTTAsync_responseOptions *R =
  -         (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = zmqOnSend;
  -     R->onFailure = zmqOnSendFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "sendMessage",
  -             MQTTAsync_sendMessage(mqtt->I, topic, M, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
  -static
  -rpmRC zmqSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int _lvl = RPMLOG_DEBUG;
  -     int *subqos = (int *) xcalloc(ac, sizeof(*subqos));
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -#ifdef       NOTYET
  -         char * te = strchr(t, '?');
  -         if (te) {
  -             *te++ = '\0';
  -             if ((te = strchr(te, '='))) {
  -                 if (!strncmp(t, "qos", (te - t)))
  -                     subqos[i] = strtoul(te+1, NULL, 0);
  -             }
  -         } else
  -#endif
  -             subqos[i] = mqtt->qos;  /* XXX */
  -         rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]);
  -     }
  -
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = zmqOnSubscribeMany;
  -     R->onFailure = zmqOnSubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "subscribeMany",
  -             MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -     if (subqos)
  -         free(subqos);
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        if (rc)
            goto exit;
        rc = RPMRC_OK;
  @@ -3373,20 +3083,9 @@
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
   
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = zmqOnSubscribe;
  -     R->onFailure = zmqOnSubscribeFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "subscribe",
  -             MQTTAsync_subscribe(mqtt->I, topic, qos, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
        if (rc)
            goto exit;
        rc = RPMRC_OK;
  @@ -3404,56 +3103,9 @@
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
   
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = zmqOnUnsubscribeMany;
  -     R->onFailure = zmqOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribe",
  -             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -
  -exit:
  -    return rc;
  -}
  -
  -static
  -rpmRC zmqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  -{
  -    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -
  -    if (ac <= 0)
  -     goto exit;
  -    if (!rpmmqttConnect(mqtt)) {
  -     int _lvl = RPMLOG_DEBUG;
  -     for (int i = 0; i < ac; i++) {
  -         char * t = av[i];
  -         rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  -     }
  -
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = zmqOnUnsubscribeMany;
  -     R->onFailure = zmqOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribeMany",
  -             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
  -
        if (rc)
            goto exit;
        rc = RPMRC_OK;
  @@ -3470,10 +3122,11 @@
   static
   struct mqttVec_s zmqVec = {
       .name            = "zmq",
  -    .port            = 0xfffe,       /* XXX W2DO? */
  -    .sport           = 0xfffe,       /* XXX W2DO? */
  +    .uri             = "zmq://localhost/",   /* XXX W2DO? */
  +    .port            = 0xfffe,               /* XXX W2DO? */
  +    .sport           = 0xfffe,               /* XXX W2DO? */
       .prefix          = "zmq_",
  -    .errs            = zmqErrs,      /* XXX W2DO? */
  +    .errs            = zmqErrs,              /* XXX W2DO? */
       .nerrs           = sizeof(zmqErrs) / sizeof(zmqErrs[0]),
       .destroy         = zmqDestroy,
       .create          = zmqCreate,
  @@ -3482,8 +3135,8 @@
       .isconnected     = zmqIsConnected,       /* XXX */
       .unsubscribe     = zmqUnsubscribe,
       .subscribe               = zmqSubscribe,
  -    .unsubscribeMany = zmqUnsubscribeMany,
  -    .subscribeMany   = zmqSubscribeMany,
  +    .unsubscribeMany = _rpmmqttUnsubscribeMany,
  +    .subscribeMany   = _rpmmqttSubscribeMany,
       .sendMessage     = zmqSendMessage,
   };
   #endif       /* WITH_ZEROMQ */
  @@ -3741,52 +3394,6 @@
       con = poptGetContext(av[0], ac, (const char **)av,
                rpmmqttOptionsTable, _popt_context_flags);
   
  -mqtt->_progname = _free(mqtt->_progname);
  -mqtt->_progmode = _free(mqtt->_progmode);
  -    {        const char *arg0 = poptGetInvocationName(con);
  -     const char * _progname;
  -     const char * _progmode;
  -
  -     if ((_progname = strrchr(arg0, '/')) != NULL) _progname++;
  -     else _progname = arg0;
  -     if (!strncmp(_progname, "lt-", sizeof("lt-")-1))
  -          _progname += sizeof("lt-")-1;
  -
  -     /* XXX initialized based on %__mqtt_module */
  -     /* XXX initialized based on av[1] */
  -     /* XXX initialized based on URI scheme */
  -#ifdef       WITH_PAHO
  -     if (mqtt->vec == NULL
  -      && (strstr(_progname, "paho") || strstr(_progname, "mqtt")))
  -         mqtt->vec = &pahoVec;
  -#endif
  -#ifdef       WITH_MOSQUITTO
  -     if (mqtt->vec == NULL
  -      && (strstr(_progname, "mosquitto") || strstr(_progname, "mosq") || strstr(_progname, "mqtt")))
  -         mqtt->vec = &mosqVec;
  -#endif
  -#ifdef       WITH_RABBITMQ
  -     if (mqtt->vec == NULL
  -      && (strstr(_progname, "rabbitmq") || strstr(_progname, "amqp")))
  -         mqtt->vec = &amqpVec;
  -#endif
  -#ifdef       WITH_PROTON
  -     if (mqtt->vec == NULL
  -      && (strstr(_progname, "proton") || strstr(_progname, "qpid") strstr(_progname, "amqp")))
  -         mqtt->vec = &qpidVec;
  -#endif
  -#ifdef       WITH_ZEROMQ
  -     if (mqtt->vec == NULL
  -      && (strstr(_progname, "zeromq") || strstr(_progname, "zmq")))
  -         mqtt->vec = &zmqVec;
  -#endif
  -     /* XXX mqtt->vec default? */
  -
  -     _progmode = (strstr(_progname, "sub") ? "sub" : "pub");
  -     mqtt->_progname = xstrdup(_progname);
  -     mqtt->_progmode = xstrdup(_progmode);
  -    }
  -
       while ((xx = poptGetNextOpt(con)) > 0)
       switch (xx) {
       default:
  @@ -4060,9 +3667,59 @@
       static mqttFlags _flags = MQTT_FLAGS_DEFAULT;    /* CLEAN|EOL */
       mqtt->flags = (flags ? flags : _flags);
   
  +    /* -- Set module vector and program name/mode from av[0] */
  +assert(av && av[0]);
  +mqtt->_progname = _free(mqtt->_progname);
  +mqtt->_progmode = _free(mqtt->_progmode);
  +    {        const char *arg0 = av[0];
  +     const char * _progname;
  +     const char * _progmode;
  +
  +     if ((_progname = strrchr(arg0, '/')) != NULL) _progname++;
  +     else _progname = arg0;
  +     if (!strncmp(_progname, "lt-", sizeof("lt-")-1))
  +          _progname += sizeof("lt-")-1;
  +
  +     /* XXX initialized based on %__mqtt_module */
  +     /* XXX initialized based on av[1] */
  +     /* XXX initialized based on URI scheme */
  +#ifdef       WITH_PAHO
  +     if (mqtt->vec == NULL
  +      && (strstr(_progname, "paho") || strstr(_progname, "mqtt")))
  +         mqtt->vec = &pahoVec;
  +#endif
  +#ifdef       WITH_MOSQUITTO
  +     if (mqtt->vec == NULL
  +      && (strstr(_progname, "mosquitto") || strstr(_progname, "mosq") || strstr(_progname, "mqtt")))
  +         mqtt->vec = &mosqVec;
  +#endif
  +#ifdef       WITH_RABBITMQ
  +     if (mqtt->vec == NULL
  +      && (strstr(_progname, "rabbitmq") || strstr(_progname, "amqp")))
  +         mqtt->vec = &amqpVec;
  +#endif
  +#ifdef       WITH_PROTON
  +     if (mqtt->vec == NULL
  +      && (strstr(_progname, "proton") || strstr(_progname, "qpid") || strstr(_progname, "amqp")))
  +         mqtt->vec = &qpidVec;
  +#endif
  +#ifdef       WITH_ZEROMQ
  +     if (mqtt->vec == NULL
  +      && (strstr(_progname, "zeromq") || strstr(_progname, "zmq")))
  +         mqtt->vec = &zmqVec;
  +#endif
  +     /* XXX mqtt->vec default? */
  +
  +     _progmode = (strstr(_progname, "sub") ? "sub" : "pub");
  +     mqtt->_progname = xstrdup(_progname);
  +     mqtt->_progmode = xstrdup(_progmode);
  +    }
  +assert(mqtt->vec);
  +
       /* -- Initialize oddball values. */
   if (_rpmmqtt_debug < 0)
   fprintf(stderr, "======= Initialize oddball values.\n");
  +
        /* XXX FIXME: needed for --help POPT_ARGFLAG_DEFAULT spewage only */
       (void) rpmmqttExpand(mqtt, &mqtt->protocol_version,
                "auto", NULL);
  @@ -4077,20 +3734,18 @@
                mqtt->idprefix, "-%%{pid}", NULL);
   
       /* -- Initialize values from default URI. */
  -    static const char _mqtt_uri[] =
  -     "mqtt://localhost:1883/rpm/mqtt"
  -             "?qos=0"
  -             "&timeout=10000"
  -             "&max_inflight=20";
  -             "&keepalive=60";
  +assert(mqtt && mqtt->vec);
       uri = rpmmqttExpand(mqtt, NULL,
  -             "%{?_mqtt_uri}%{!?_mqtt_uri:", _mqtt_uri, "}", NULL);
  -
  +             "%{?_mqtt_uri}%{!?_mqtt_uri:",
  +                     (mqtt->vec->uri ? mqtt->vec->uri : ""),
  +                     "}", NULL);
  +    if (uri && *uri) {
   if (_rpmmqtt_debug < 0)
   fprintf(stderr, "======= Initialize values from default uri\n\t%s\n", uri);
  -    rc = rpmmqttInitURI(mqtt, uri);
  -    if (rc)
  -        goto exit;
  +     rc = rpmmqttInitURI(mqtt, uri);
  +     if (rc)
  +         goto exit;
  +    }
   
       /* -- Override with values from macros (if any). */
   if (_rpmmqtt_debug < 0)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.20 -r1.1.2.21 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       14 Jul 2016 11:09:49 -0000      1.1.2.20
  +++ rpm/rpmio/rpmmqtt.h       14 Jul 2016 12:59:21 -0000      1.1.2.21
  @@ -160,6 +160,7 @@
   
   struct mqttVec_s {
       const char * name;
  +    const char * uri;
       uint16_t port;
       uint16_t sport;
       const char * prefix;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.15 -r1.1.2.16 tmqtt.c
  --- rpm/rpmio/tmqtt.c 14 Jul 2016 11:09:49 -0000      1.1.2.15
  +++ rpm/rpmio/tmqtt.c 14 Jul 2016 12:59:21 -0000      1.1.2.16
  @@ -12,10 +12,6 @@
   #include <poptIO.h>
   #include <rpmdefs.h>
    
  -#ifdef  WITH_MQTT
  -#include <MQTTAsync.h>
  -#endif
  -
   #define      _RPMMQTT_INTERNAL
   #include "rpmmqtt.h"
   
  @@ -53,73 +49,28 @@
   int
   main(int argc, char *argv[])
   {
  -#ifdef       UNUSED
  -static char _mqtt_argv[] = "\
  -  mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\
  -  rpm/#?qos=1 \n\
  -";
  -#ifdef       REF
  -  $SYS/broker/version?qos=0 \n\
  -  $SYS/broker/timestamp?qos=0 \n\
  -  $SYS/broker/uptime?qos=0 \n\
  -  $SYS/broker/clients/#?qos=0 \n\
  -  $SYS/broker/messages/#?qos=0 \n\
  -  $SYS/broker/subscriptions/#?qos=0 \n\
  -  $SYS/broker/heap/#?qos=0 \n\
  -  $SYS/broker/publish/#?qos=0 \n\
  -  $SYS/broker/bytes/#?qos=0 \n
  -#endif
  -    ARGV_t av = poptGetArgs(optCon);
  -    int ac = argvCount(av);
  -    static char *_av[] = { _mqtt_argv, NULL, };
  -#endif
       rpmmqtt mqtt = NULL;
       int rc = -1;
   
  -    /* Initialize the _mqtt_ macro context */
  +    if (argc >= 2) {
  +     if (!strcmp(argv[1], "paho"))
  +         argv[0] = "paho_pub";
  +     if (!strcmp(argv[1], "mosquitto"))
  +         argv[0] = "mosquitto_pub";
  +     if (!strcmp(argv[1], "amqp"))
  +         argv[0] = "amqp_pub";
  +     if (!strcmp(argv[1], "qpid"))
  +         argv[0] = "qpid_pub";
  +     if (!strcmp(argv[1], "zmq"))
  +         argv[0] = "zmq_pub";
  +    }
  +
  +/* XXX global macros to initialize correctly. */
  +(void) rpmmqttDefineMacro(NULL, "_mqtt_persist 2", 0);
  +(void) rpmmqttDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0);
   
  -#undef       WITH_RABBITMQ
  -#ifdef       WITH_RABBITMQ
  -static char _mqtt_uri[] =
  -    "_mqtt_uri amqp://guest:guest@localhost:5672/"
  -     "?vhost=/"
  -     "&exchange=amq.direct"
  -     "&queue=test queue"
  -     "&verify=1"
  -     "&fail_if_no_peer_cert=1"
  -     "&sasl_mechanism=plain"
  -     "&sasl_mechanism=amqplain"
  -     "&keepalive=60"
  -     "&connection_timeout=60"
  -     "&channel_max=60"
  -     "";
  -#else
  -static char _mqtt_uri[] =
  -    "_mqtt_uri mqtt://luser:jasnl@localhost:1883/rpm/mqtt"
  -     "?qos=1;"
  -     "timeout=10000;"
  -     "trace=4;"
  -     "#rpm/#";
  -#endif
  -(void) rpmmqttDefineMacro(mqtt, _mqtt_uri, 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_persist 2", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_cachedir /var/cache/mqtt", 0);
  -
  -(void) rpmmqttDefineMacro(mqtt, "nil %{!?nil}", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_prefix %{now}", 0);
  -
  -#ifdef       DYING
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_trace 4", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_user luser", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_pass jasnl", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_host localhost", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_port 1883", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_clientid rpm-%{pid}", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_topic rpm/#", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_qos 1", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_timeout 10000", 0);
  -(void) rpmmqttDefineMacro(mqtt, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0);
  -#endif
  +(void) rpmmqttDefineMacro(NULL, "nil %{!?nil}", 0);
  +(void) rpmmqttDefineMacro(NULL, "_mqtt_prefix %{now}", 0);
   
       mqtt = rpmmqttNew(argv, 0);
       rc = _DoMQTT(mqtt);
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to