RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   14-Jul-2016 20:52:27
  Branch: rpm-5_4                          Handle: 2016071418522700

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               macro.c rpmmqtt.c rpmmqtt.h

  Log:
    - mqtt: lily gilding.

  Summary:
    Revision    Changes     Path
    2.249.2.42  +17 -1      rpm/rpmio/macro.c
    1.1.2.25    +316 -315   rpm/rpmio/rpmmqtt.c
    1.1.2.22    +4  -0      rpm/rpmio/rpmmqtt.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/macro.c
  ============================================================================
  $ cvs diff -u -r2.249.2.41 -r2.249.2.42 macro.c
  --- rpm/rpmio/macro.c 14 Jul 2016 11:09:49 -0000      2.249.2.41
  +++ rpm/rpmio/macro.c 14 Jul 2016 18:52:27 -0000      2.249.2.42
  @@ -14,6 +14,7 @@
   #define      iseol(_c)       ((char)(_c) == '\n' || (char)(_c) == '\r')
   
   #define      STREQ(_t, _f, _fn)      ((_fn) == (sizeof(_t)-1) && !strncmp((_t), (_f), (_fn)))
  +#define      MEMEM(_t, _f, _fn)      memem((_f), (_fn), (_t), sizeof(_t)-1)
   
   #ifdef DEBUG_MACROS
   #undef       WITH_LUA        /* XXX fixme */
  @@ -2301,7 +2302,22 @@
   #if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_PROTON) || defined(WITH_ZEROMQ)
        if (STREQ("mqtt", f, fn)
         || STREQ("pub", f, fn)
  -      || STREQ("sub", f, fn))
  +      || STREQ("sub", f, fn)
  +      || STREQ("paho", f, fn)
  +      || STREQ("mosquitto", f, fn)
  +      || STREQ("amqp", f, fn)
  +      || STREQ("qpid", f, fn)
  +      || STREQ("zmq", f, fn)
  +      || STREQ("paho_pub", f, fn)
  +      || STREQ("amqp_pub", f, fn)
  +      || STREQ("mosquitto_pub", f, fn)
  +      || STREQ("qpid_pub", f, fn)
  +      || STREQ("zmq_pub", f, fn)
  +      || STREQ("paho_sub", f, fn)
  +      || STREQ("amqp_sub", f, fn)
  +      || STREQ("mosquitto_sub", f, fn)
  +      || STREQ("qpid_sub", f, fn)
  +      || STREQ("zmq_sub", f, fn))
        {
            RPMIOPOOL_INTERP_EXPAND(mqtt)
            continue;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.24 -r1.1.2.25 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       14 Jul 2016 12:59:21 -0000      1.1.2.24
  +++ rpm/rpmio/rpmmqtt.c       14 Jul 2016 18:52:27 -0000      1.1.2.25
  @@ -150,6 +150,14 @@
       _ENTRY(RETAIN),
       _ENTRY(WILL_RETAIN),
       _ENTRY(BUFFER),
  +    _ENTRY(DURABLE),
  +    _ENTRY(EXCLUSIVE),
  +    _ENTRY(NOACK),
  +    _ENTRY(IFEMPTY),
  +    _ENTRY(IFUNUSED),
  +    _ENTRY(PASSIVE),
  +    _ENTRY(AUTODELETE),
  +    _ENTRY(NOLOCAL),
   #undef       _ENTRY
   };
   static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]);
  @@ -1172,36 +1180,34 @@
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
       int xx;
   
  -#ifdef       DYING
  -mqtt->trace = 4;     /* XXX */
  -#endif
  -    if (mqtt->trace && rpmIsDebug()) {
  -     xx = mqchk(mqtt, "setTraceCallback",
  +    if (mqtt->I == NULL) {
  +     if (mqtt->trace && rpmIsDebug()) {
  +         xx = mqchk(mqtt, "setTraceCallback",
                (MQTTAsync_setTraceCallback(onTrace), 0));
  -     xx = mqchk(mqtt, "setTraceLevel",
  +         xx = mqchk(mqtt, "setTraceLevel",
                (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0));
  -    }
  +     }
   
  -    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +     rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
  -    if (!oneshot) {
  -     if (mqtt->trace == 0) {
  -         MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  -         while (NV->name) {
  -             rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  -             NV++;
  +     if (!oneshot) {
  +         if (mqtt->trace == 0) {
  +             MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo();
  +             while (NV->name) {
  +                 rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value);
  +                 NV++;
  +             }
            }
  +         oneshot++;
        }
  -     oneshot++;
  -    }
   
  -    {
  -     char portstr[32] = "";
  -     (void) snprintf(portstr, sizeof(portstr), "%d",
  -             (mqtt->port ? mqtt->port : 1883));
  -     (void) rpmmqttExpand(mqtt, &mqtt->uri,
  -             "tcp://", mqtt->host, ":", portstr, NULL);
  -    }
  +     {
  +         char portstr[32] = "";
  +         (void) snprintf(portstr, sizeof(portstr), "%d",
  +                     (mqtt->port ? mqtt->port : 1883));
  +         (void) rpmmqttExpand(mqtt, &mqtt->uri,
  +                     "tcp://", mqtt->host, ":", portstr, NULL);
  +     }
   
       /* XXX set through uri?query */
   static const char _mqtt_persist[] =
  @@ -1211,45 +1217,50 @@
        "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
   char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
   
  -    rpmlog(_lvl, "%19s: %s\n", "uri", mqtt->uri);
  -    rpmlog(_lvl, "%19s: %s\n", "host", mqtt->host);
  -    rpmlog(_lvl, "%19s: %d\n", "port", mqtt->port);
  -    rpmlog(_lvl, "%19s: %s\n", "user", mqtt->user);
  -    rpmlog(_lvl, "%19s: %s\n", "pass", mqtt->password);
  +     if (strcmp(mqtt->uri, "tcp://localhost:1883"))
  +         rpmlog(_lvl, "%19s: %s\n", "uri", mqtt->uri);
  +     if (strcmp(mqtt->host, "localhost"))
  +         rpmlog(_lvl, "%19s: %s\n", "host", mqtt->host);
  +     if (mqtt->port != 1883)
  +         rpmlog(_lvl, "%19s: %d\n", "port", mqtt->port);
  +     if (mqtt->user)
  +         rpmlog(_lvl, "%19s: %s\n", "user", mqtt->user);
  +     if (mqtt->password)
  +         rpmlog(_lvl, "%19s: %s\n", "pass", mqtt->password);
   
  -    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  -    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +     rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
                "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  -    rpmlog(_lvl, "%19s: type(%u) %s\n",
  +     rpmlog(_lvl, "%19s: type(%u) %s\n",
                "persist", mqtt->persist_type,
                (mqtt->persist_type ? persist_path : ""));
  -    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  -    if (mqtt->trace)
  -     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +     if (mqtt->trace)
  +         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   mqtt->persist_path = _free(mqtt->persist_path);
   mqtt->persist_ctx = _free(mqtt->persist_ctx);
  -    switch (mqtt->persist_type) {
  -    default:
  -    case MQTTCLIENT_PERSISTENCE_NONE:
  -     mqtt->persist_ctx = NULL;
  -     break;
  -    case MQTTCLIENT_PERSISTENCE_DEFAULT:
  -      {
  -     mqtt->persist_path = xstrdup(persist_path);
  -     /* XXX rpmmqttFini double free */
  -     mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  -      } break;
  -    case MQTTCLIENT_PERSISTENCE_USER:
  -      {
  -     mqtt->persist_path = xstrdup(persist_path);
  -     MQTTClient_persistence * ctx =
  +     switch (mqtt->persist_type) {
  +     default:
  +     case MQTTCLIENT_PERSISTENCE_NONE:
  +         mqtt->persist_ctx = NULL;
  +         break;
  +     case MQTTCLIENT_PERSISTENCE_DEFAULT:
  +       {
  +         mqtt->persist_path = xstrdup(persist_path);
  +         /* XXX rpmmqttFini double free */
  +         mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
  +       } break;
  +     case MQTTCLIENT_PERSISTENCE_USER:
  +       {
  +         mqtt->persist_path = xstrdup(persist_path);
  +         MQTTClient_persistence * ctx =
                (MQTTClient_persistence *) xmalloc(sizeof(*ctx));
  -     *ctx = _mqtt_persistence;       /* structure assignment */
  -     ctx->context = mqtt;
  -     mqtt->persist_ctx = ctx;
  -      } break;
  -    }
  +         *ctx = _mqtt_persistence;   /* structure assignment */
  +         ctx->context = mqtt;
  +         mqtt->persist_ctx = ctx;
  +       } break;
  +     }
   persist_path = _free(persist_path);
   
   #ifdef       DYING
  @@ -1257,7 +1268,6 @@
   dumpMQTT(__FUNCTION__, mqtt);
   #endif
   
  -    if (mqtt->I == NULL) {
        xx = mqchk(mqtt, "createWithOptions",
                MQTTAsync_createWithOptions(&mqtt->I,
                        mqtt->uri, mqtt->clientid,
  @@ -1716,38 +1726,36 @@
       int _lvl = RPMLOG_DEBUG;
       int xx;
   
  -#ifdef       DYING
  -mqtt->trace = 4;     /* XXX */
  -#endif
  +    if (mqtt->I == NULL) {
   
  -    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +     rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
  -    if (!oneshot) {
  -     int major = 0;
  -     int minor = 0;
  -     int revision = 0;
  -     int version = mosquitto_lib_version(&major, &minor, &revision);
  -     rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
  -             major, minor, revision, version);
  -     oneshot++;
  -    }
  +     if (!oneshot) {
  +         int major = 0;
  +         int minor = 0;
  +         int revision = 0;
  +         int version = 0;
   
  -    rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  -    rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
  +         xx = mqchk(mqtt, "lib_init",
  +             mosquitto_lib_init());
  +         version = mosquitto_lib_version(&major, &minor, &revision);
  +         rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
  +                     major, minor, revision, version);
  +         oneshot++;
  +     }
  +
  +     rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  +     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
                "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
  -    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  -    if (mqtt->trace)
  -     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +     if (mqtt->trace)
  +         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   #ifdef       DYING
   mqtt->u = NULL;
   dumpMQTT(__FUNCTION__, mqtt);
   #endif
   
  -    xx = mqchk(mqtt, "lib_init",
  -             mosquitto_lib_init());
  -
  -    if (mqtt->I == NULL) {
        xx = mqchk(mqtt, "new",
                (mqtt->I = mosquitto_new(mqtt->clientid,
                        (MF_ISSET(CLEAN) ? true : false),
  @@ -1759,14 +1767,13 @@
        mosquitto_message_callback_set(mqtt->I, mosqOnMessage);
        mosquitto_subscribe_callback_set(mqtt->I, mosqOnSubscribe);
        mosquitto_unsubscribe_callback_set(mqtt->I, mosqOnUnsubscribe);
  -    }
   
   #ifdef       NOTYET
  -    if (mqtt->trace && rpmIsDebug())
  +     if (mqtt->trace && rpmIsDebug())
   #else
  -    if (mqtt->trace || mqtt->debug)
  +     if (mqtt->trace || mqtt->debug)
   #endif
  -     mosquitto_log_callback_set(mqtt->I, mosqOnLog);
  +         mosquitto_log_callback_set(mqtt->I, mosqOnLog);
   
   /*
    * Example 1:
  @@ -1777,15 +1784,17 @@
    *      delay=3, delay_max=30, exponential_backoff=True
    *      Delays would be: 3, 6, 12, 24, 30, 30, ...
    */
  -    xx = mqchk(mqtt, "reconnect_delay_set",
  +     xx = mqchk(mqtt, "reconnect_delay_set",
                mosquitto_reconnect_delay_set(mqtt->I, 2, 10, false));
   
  -    xx = mqchk(mqtt, "user_data_set",
  +     xx = mqchk(mqtt, "user_data_set",
                (mosquitto_user_data_set(mqtt->I, mqtt), 0));
   
  -    xx = mqchk(mqtt, "loop_start",
  +     xx = mqchk(mqtt, "loop_start",
                mosquitto_loop_start(mqtt->I));
   
  +    }
  +
       rc = RPMRC_OK;
   
       return rc;
  @@ -2188,7 +2197,6 @@
   #define mqchkrpc(_o, _m, _rc)  \
       Xmqchkrpc(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
   
  -#ifdef       REFERENCE
   static void dump_row(long count, int numinrow, int *chs)
   {
       int i;
  @@ -2277,7 +2285,6 @@
        printf("%08lX:\n", count);
       }
   }
  -#endif       /* REFERENCE */
   
   static
   rpmRC amqpDestroy(rpmmqtt mqtt)
  @@ -2297,68 +2304,74 @@
       static int oneshot;
       int _lvl = RPMLOG_DEBUG;
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -    int xx;
  -
  -#ifdef       DYING
  -mqtt->trace = 4;     /* XXX */
  -#endif
  -
  -    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
  -    if (!oneshot) {
  -     int major = AMQP_VERSION_MAJOR;
  -     int minor = AMQP_VERSION_MINOR;
  -     int patch = AMQP_VERSION_PATCH;
  -     int version = AMQP_VERSION;
  -     rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
  -             major, minor, patch, version);
  -     oneshot++;
  -    }
  +    if (mqtt->I == NULL) {
  +     rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
  -    rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost);
  -    rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue);
  -    rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange);
  -    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  -    if (mqtt->trace)
  -     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +     if (!oneshot) {
  +         int major = AMQP_VERSION_MAJOR;
  +         int minor = AMQP_VERSION_MINOR;
  +         int patch = AMQP_VERSION_PATCH;
  +         int version = AMQP_VERSION;
  +         rpmlog(_lvl, "%19s: %d.%d.%d (0x%x)\n", "version",
  +                     major, minor, patch, version);
  +         oneshot++;
  +     }
  +
  +     rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost);
  +     rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue);
  +     rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange);
  +     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +     if (mqtt->trace)
  +         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   #ifdef       DYING
   mqtt->u = NULL;
   dumpMQTT(__FUNCTION__, mqtt);
   #endif
   
  -    if (mqtt->I == NULL) {
        amqp_socket_t *socket = NULL;
        int port;
   
  -     xx = mqchk(mqtt, "new_connection",
  +     rc = mqchk(mqtt, "new_connection",
                ((mqtt->I = amqp_new_connection()) == NULL));
   
        if (mqtt->cacertfile || mqtt->_capath) {
            port = mqtt->vec->sport;    /* XXX elsewhere */
  -         xx = mqchk(mqtt, "ssl_socket_new",
  +         rc = mqchk(mqtt, "ssl_socket_new",
                    ((socket = amqp_ssl_socket_new(mqtt->I)) == NULL));
  -         if (socket == NULL)
  +         if (rc || socket == NULL)
                goto exit;
  +#ifdef       NOTYET
  +  amqp_ssl_socket_set_verify_peer(socket, 1);
  +  amqp_ssl_socket_set_verify_hostname(socket, 1);
  +#endif
            if (mqtt->cacertfile)
  -             xx = mqchk(mqtt, "ssl_socket_set_cacert",
  +             rc = mqchk(mqtt, "ssl_socket_set_cacert",
                        amqp_ssl_socket_set_cacert(mqtt->I, mqtt->cacertfile));
            if (mqtt->keyfile)
  -             xx = mqchk(mqtt, "ssl_socket_set_key",
  +             rc = mqchk(mqtt, "ssl_socket_set_key",
                        amqp_ssl_socket_set_key(mqtt->I,
                                mqtt->certfile, mqtt->keyfile));
        } else {
            port = mqtt->vec->port;     /* XXX elsewhere */
  -         xx = mqchk(mqtt, "tcp_socket_new",
  +         rc = mqchk(mqtt, "tcp_socket_new",
                ((socket = amqp_tcp_socket_new(mqtt->I)) == NULL));
  -         if (socket == NULL)
  +         if (rc || socket == NULL)
                goto exit;
        }
   
  -if (_rpmmqtt_debug)
  -fprintf(stderr, "*** socket_open(%p,\"%s:%d\")\n", socket, mqtt->host, port);
  -     xx = mqchk(mqtt, "socket_open",
  +#ifdef       NOTYET
  +     {   strict timeval tv = { mqtt->connect_timeout, 0 };
  +         rc = mqchk(mqtt, "socket_open_noblock",
  +             amqp_socket_open(socket, mqtt->host, port, &tv));
  +
  +     }
  +#endif
  +     rc = mqchk(mqtt, "socket_open",
                amqp_socket_open(socket, mqtt->host, port));
  +     if (rc)
  +         goto exit;
   
       }
   
  @@ -2374,12 +2387,14 @@
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
  +     amqp_channel_t _channel = 1;
  +     int _code = AMQP_REPLY_SUCCESS;
        rc = mqchkrpc(mqtt, "channel_close",
  -             amqp_channel_close(mqtt->I, 1, AMQP_REPLY_SUCCESS));
  +             amqp_channel_close(mqtt->I, _channel, _code));
        if (rc)
            goto exit;
        rc = mqchkrpc(mqtt, "connection_close",
  -             amqp_connection_close(mqtt->I, AMQP_REPLY_SUCCESS));
  +             amqp_connection_close(mqtt->I, _code));
        if (rc)
            goto exit;
   mqtt->connected = 0;
  @@ -2395,26 +2410,35 @@
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  -     amqp_channel_open_ok_t * open_ok = NULL;
  -     const char * username = "guest";        /* XXX elsewhere */
  -     const char * password = "guest";        /* XXX elsewhere */
  +     int _channel_max = AMQP_DEFAULT_MAX_CHANNELS;
  +     int _frame_max = AMQP_DEFAULT_FRAME_SIZE;
  +     const amqp_table_t *_properties = NULL;
  +     /* XXX AMQP_SASL_METHOD_EXTERNAL uses stdargs */
  +     amqp_sasl_method_enum _sasl_method =
  +                     (mqtt->_sasl_mechanism == NULL
  +                     || !strcmp(mqtt->_sasl_mechanism, "plain")
  +                             ? AMQP_SASL_METHOD_PLAIN
  +                             : AMQP_SASL_METHOD_PLAIN); /* XXX EXTERNAL */
   
  -if (_rpmmqtt_debug)
  -fprintf(stderr, "*** login(%p, \"%s\", %d,%d,%d, \"%s:%s\")\n", mqtt->I, mqtt->vhost, 0, 131072, mqtt->keepalive, username, password);
  -     rc = mqchkrpc(mqtt, "login",
  -             amqp_login(mqtt->I,
  +     rc = mqchkrpc(mqtt, "login_with_properties",
  +             amqp_login_with_properties(mqtt->I,
                        (mqtt->vhost ? mqtt->vhost : "/"),
  -                     0, 131072, mqtt->keepalive,
  -                     AMQP_SASL_METHOD_PLAIN,
  -                     username, password));
  +                     _channel_max,
  +                     _frame_max,
  +                     mqtt->keepalive,
  +                     _properties,
  +                     _sasl_method,
  +                     mqtt->user,
  +                     mqtt->password));
        if (rc)
            goto exit;
   
  +     amqp_channel_open_ok_t * open_ok = NULL;
  +     amqp_channel_t _channel = 1;
        rc = mqchk(mqtt, "channel_open",
  -             ((open_ok = amqp_channel_open(mqtt->I, 1)),
  +             ((open_ok = amqp_channel_open(mqtt->I, _channel)),
                0));
  -
  -         rc = mqchkrpc(mqtt, "get_rpc_reply",
  +     rc = mqchkrpc(mqtt, "get_rpc_reply",
                amqp_get_rpc_reply(mqtt->I));
   
        if (rc)
  @@ -2448,18 +2472,31 @@
        ns = strlen(s);
   
       if (!rpmmqttConnect(mqtt)) {
  +     amqp_channel_t _channel = 1;
  +     amqp_boolean_t _mandatory = false;
  +     amqp_boolean_t _immediate = false;
  +     struct amqp_basic_properties_t_ const *_properties = NULL;
  +
        amqp_bytes_t message_bytes;
        message_bytes.len = ns;
        message_bytes.bytes = (char *) s;
   
  +#ifdef       NOTYET
  +     amqp_basic_properties_t props;
  +     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
  +     props.content_type = amqp_cstring_bytes("text/plain");
  +     props.delivery_mode = 2; /* persistent delivery mode */
  +     _properties = &props;
  +#endif
  +
        rc = mqchk(mqtt, "basic_publish",
                amqp_basic_publish(mqtt->I,
  -                             1,
  +                             _channel,
                                amqp_cstring_bytes(mqtt->exchange),
                                amqp_cstring_bytes(mqtt->queue),
  -                             0,
  -                             0,
  -                             NULL,
  +                             _mandatory,
  +                             _immediate,
  +                             _properties,
                                message_bytes));
   
        rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n",
  @@ -2482,41 +2519,110 @@
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
   
  -     amqp_bytes_t queuename;
  +     amqp_bytes_t _queuename;
  +
  +     amqp_channel_t _channel = 1;
  +     amqp_bytes_t _queue = amqp_empty_bytes;
  +     amqp_boolean_t _passive = (MF_ISSET(PASSIVE) ? true : false);
  +     amqp_boolean_t _durable = (MF_ISSET(DURABLE) ? true : false);
  +     amqp_boolean_t _exclusive = (MF_ISSET(EXCLUSIVE) ? true : false);
  +#ifdef       NOTYET
  +     amqp_boolean_t _auto_delete = (MF_ISSET(AUTODELETE) ? true : false);
  +#else
  +     amqp_boolean_t _auto_delete = true;
  +#endif
  +     amqp_table_t _arguments = amqp_empty_table;
        {   amqp_queue_declare_ok_t *r =
  -             amqp_queue_declare(mqtt->I, 1, amqp_empty_bytes,
  -                     0, 0, 0, 1, amqp_empty_table);
  +             amqp_queue_declare(mqtt->I,
  +                     _channel,
  +                     _queue,
  +                     _passive,
  +                     _durable,
  +                     _exclusive,
  +                     _auto_delete,
  +                     _arguments);
            rc = mqchkrpc(mqtt, "queue_declare",
  -             amqp_get_rpc_reply(mqtt->I));
  +                     amqp_get_rpc_reply(mqtt->I));
            if (rc)
                goto exit;
  -         queuename = amqp_bytes_malloc_dup(r->queue);
  -assert(queuename.bytes);
  +         _queuename = amqp_bytes_malloc_dup(r->queue);
  +assert(_queuename.bytes);
        }
   
  -     const char * _bindingkey = "test queue";        /* XXX W2DO? */
  +     const char * _routing_key = "test queue";       /* XXX W2DO? */
   
        amqp_queue_bind(mqtt->I,
  -                     1,
  -                     queuename,
  +                     _channel,
  +                     _queuename,
                        amqp_cstring_bytes(mqtt->exchange),
  -                     amqp_cstring_bytes(_bindingkey),
  -                     amqp_empty_table);
  +                     amqp_cstring_bytes(_routing_key),
  +                     _arguments);
        rc = mqchkrpc(mqtt, "queue_bind",
  -             amqp_get_rpc_reply(mqtt->I));
  +                     amqp_get_rpc_reply(mqtt->I));
        if (rc)
            goto exit;
   
  -     amqp_basic_consume(mqtt->I, 1, queuename, amqp_empty_bytes,
  -             0, 1, 0, amqp_empty_table);
  +     amqp_bytes_t _consumer_tag = amqp_empty_bytes;
  +     amqp_boolean_t _no_local = (MF_ISSET(NOLOCAL) ? true : false);
  +#ifdef       NOTYET
  +     amqp_boolean_t _no_ack = (MF_ISSET(NOACK) ? true : false);
  +#else
  +     amqp_boolean_t _no_ack = true;
  +#endif
  +     amqp_basic_consume(mqtt->I,
  +                     _channel,
  +                     _queuename,
  +                     _consumer_tag,
  +                     _no_local,
  +                     _no_ack,
  +                     _exclusive,
  +                     _arguments);
        rc = mqchkrpc(mqtt, "basic_consume",
  -             amqp_get_rpc_reply(mqtt->I));
  +                     amqp_get_rpc_reply(mqtt->I));
        if (rc)
            goto exit;
   
  -     /* XXX todo++ */
  +#ifdef       NOTYET  /* XXX make asynchronous */
  +     for (;;) {
  +         amqp_rpc_reply_t res;
  +         amqp_envelope_t envelope;
  +         struct timeval *_tvp = NULL;
  +         int _flags = 0;
  +
  +         rc = mqchk(mqtt, "maybe_release_buffers",
  +                     (amqp_maybe_release_buffers(mqtt->I),0));
  +
  +         rc = mqchkrpc(mqtt, "consume_message",
  +                     (res = amqp_consume_message(mqtt->I,
  +                                     &envelope, _tvp, _flags)));
  +         if (rc)
  +             break;
  +
  +         fprintf(stderr, "Delivery %u, exchange %.*s routingkey %.*s\n",
  +                     (unsigned)envelope.delivery_tag,
  +                     (int)envelope.exchange.len,
  +                     (char *)envelope.exchange.bytes,
  +                     (int)envelope.routing_key.len,
  +                     (char *)envelope.routing_key.bytes);
  +
  +         if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
  +             fprintf(stderr, "Content-type: %.*s\n",
  +                     (int) envelope.message.properties.content_type.len,
  +                     (char *)envelope.message.properties.content_type.bytes);
  +         }
  +         fprintf(stderr, "----\n");
  +
  +         amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
  +
  +         amqp_destroy_envelope(&envelope);
  +
  +     }
  +     if (rc)
  +         goto exit;
  +#endif
   
        rc = RPMRC_OK;
       }
  @@ -2530,7 +2636,6 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -SPEW((stderr, "-->    %s\n", __FUNCTION__));
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  @@ -2582,7 +2687,7 @@
   static
   struct mqttVec_s amqpVec = {
       .name            = "amqp",
  -    .uri             = "amqp://localhost:5672/"      /* XXX vhost? */
  +    .uri             = "amqp://guest:guest@localhost:5672/"  /* XXX vhost? */
                                "?vhost=/"              /* XXX vhost? */
                                "&exchange=amq.direct"
                                "&queue=test queue"
  @@ -2635,45 +2740,38 @@
       static int oneshot;
       int _lvl = RPMLOG_DEBUG;
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -    int xx;
   
  -#ifdef       DYING
  -mqtt->trace = 4;     /* XXX */
  -#endif
  -
  -    rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
  +    if (mqtt->I == NULL) {
  +     rpmlog(_lvl, "==================== %s\n", mqtt->vec->name);
   
  -    if (!oneshot) {
  +     if (!oneshot) {
   #ifdef       NOTYET
  -     int major = AMQP_VERSION_MAJOR;
  -     int minor = AMQP_VERSION_MINOR;
  -     int patch = AMQP_VERSION_PATCH;
  -     int version = AMQP_VERSION;
  -     rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
  +         int major = AMQP_VERSION_MAJOR;
  +         int minor = AMQP_VERSION_MINOR;
  +         int patch = AMQP_VERSION_PATCH;
  +         int version = AMQP_VERSION;
  +         rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version",
                major, minor, patch, version);
   #endif
  -     oneshot++;
  -    }
  +         oneshot++;
  +     }
   
  -    rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost);
  -    rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue);
  -    rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange);
  -    rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  -    if (mqtt->trace)
  -     rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
  +     rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost);
  +     rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue);
  +     rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange);
  +     rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags));
  +     if (mqtt->trace)
  +         rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   #ifdef       DYING
   mqtt->u = NULL;
  -#endif
   dumpMQTT(__FUNCTION__, mqtt);
  +#endif
   
  -    if (mqtt->I == NULL) {
  -
  -     xx = mqchk(mqtt, "messenger",
  +         rc = mqchk(mqtt, "messenger",
                ((mqtt->I = pn_messenger(NULL)) == NULL));
  -     xx = mqchk(mqtt, "messenger_start",
  +         rc = mqchk(mqtt, "messenger_start",
                pn_messenger_start(mqtt->I));
  -
       }
   
       rc = RPMRC_OK;
  @@ -2686,21 +2784,11 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -#ifdef       NOTYET
       if (rpmmqttIsConnected(mqtt) == RPMRC_OK) {
  -     rc = mqchkrpc(mqtt, "channel_close",
  -             qpid_channel_close(mqtt->I, 1, AMQP_REPLY_SUCCESS));
  -     if (rc)
  -         goto exit;
  -     rc = mqchkrpc(mqtt, "connection_close",
  -             qpid_connection_close(mqtt->I, AMQP_REPLY_SUCCESS));
  -     if (rc)
  -         goto exit;
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
   mqtt->connected = 0;
       }
  -#else
       rc = RPMRC_OK;
  -#endif
   
       return rc;
   }
  @@ -2710,38 +2798,11 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -#ifdef       DYING
       if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) {
  -     qpid_channel_open_ok_t * open_ok = NULL;
  -     const char * username = "guest";        /* XXX elsewhere */
  -     const char * password = "guest";        /* XXX elsewhere */
  -
  -if (_rpmmqtt_debug)
  -fprintf(stderr, "*** login(%p, \"%s\", %d,%d,%d, \"%s:%s\")\n", mqtt->I, mqtt->vhost, 0, 131072, mqtt->keepalive, username, password);
  -     rc = mqchkrpc(mqtt, "login",
  -             qpid_login(mqtt->I,
  -                     (mqtt->vhost ? mqtt->vhost : "/"),
  -                     0, 131072, mqtt->keepalive,
  -                     AMQP_SASL_METHOD_PLAIN,
  -                     username, password));
  -     if (rc)
  -         goto exit;
  -
  -     rc = mqchk(mqtt, "channel_open",
  -             ((open_ok = qpid_channel_open(mqtt->I, 1)),
  -             0));
  -
  -         rc = mqchkrpc(mqtt, "get_rpc_reply",
  -             qpid_get_rpc_reply(mqtt->I));
  -
  -     if (rc)
  -         goto exit;
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
   mqtt->connected = 1;
  -    } else
  -     rc = RPMRC_OK;
  -#else
  +    }
       rc = RPMRC_OK;
  -#endif
   
       return rc;
   }
  @@ -2749,12 +2810,8 @@
   static
   rpmRC qpidIsConnected(rpmmqtt mqtt)
   {
  -#ifdef       DYING
       rpmRC rc = (mqtt->connected ? RPMRC_OK : RPMRC_NOTFOUND);
       return rc;
  -#else
  -    return RPMRC_OK;
  -#endif
   }
   
   static
  @@ -2762,7 +2819,6 @@
                const char * s, size_t ns)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
  -    int xx;
   
       if (topic == NULL)
        topic = mqtt->topic;
  @@ -2771,54 +2827,45 @@
       if (ns == 0)
        ns = strlen(s);
   
  -#ifdef       DYING
       if (!rpmmqttConnect(mqtt)) {
  -     qpid_bytes_t message_bytes;
  -     message_bytes.len = ns;
  -     message_bytes.bytes = (char *) s;
  -
  -     rc = mqchk(mqtt, "basic_publish",
  -             qpid_basic_publish(mqtt->I,
  -                             1,
  -                             qpid_cstring_bytes(mqtt->exchange),
  -                             qpid_cstring_bytes(mqtt->queue),
  -                             0,
  -                             0,
  -                             NULL,
  -                             message_bytes));
  -
  -     rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n",
  -                     mqtt->vec->name, mqtt->exchange, mqtt->queue, (int)ns, s);
  -
  -     if (rc)
  -         goto exit;
  -     rc = RPMRC_OK;
  -    }
  -#else
  -     if (mqtt->M == NULL)
  -         xx = mqchk(mqtt, "message",
  +     if (mqtt->M == NULL) {
  +         rc = mqchk(mqtt, "message",
                ((mqtt->M = pn_message()) == NULL));
  +         if (rc)
  +             goto exit;
  +     }
  +
   
  -     const char * _address = "amqp://localhost/test";
  -     xx = mqchk(mqtt, "message_set_address",
  +fprintf(stderr, "*** %s: host %s topic %s\n", __FUNCTION__, mqtt->host, mqtt->topic);
  +     const char * _address = "amqp://localhost/test";        /* XXX */
  +     rc = mqchk(mqtt, "message_set_address",
                pn_message_set_address(mqtt->M, _address));
  +
        {   pn_data_t * body = pn_message_body(mqtt->M);
  -         xx = mqchk(mqtt, "data_put_string",
  +         rc = mqchk(mqtt, "data_put_string",
                        pn_data_put_string(body, pn_bytes(ns, s)));
  -         xx = mqchk(mqtt, "messenger_put",
  +         rc = mqchk(mqtt, "messenger_put",
                        pn_messenger_put(mqtt->I, mqtt->M));
   #ifdef       NOTYET
            check(messenger);
   #endif
  -         xx = mqchk(mqtt, "messenger_send",
  +         rc = mqchk(mqtt, "messenger_send",
                        pn_messenger_send(mqtt->I, -1));
  +
   #ifdef       NOTYET
            check(messenger);
   #endif
        }
  +
  +     rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n",
  +                     mqtt->vec->name, mqtt->exchange, mqtt->queue, (int)ns, s);
  +
  +     if (rc)
  +         goto exit;
        rc = RPMRC_OK;
  -#endif
  +    }
   
  +exit:
       return rc;
   }
   
  @@ -2827,52 +2874,18 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -#ifdef       DYING
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
   
  -     qpid_bytes_t queuename;
  -     {   qpid_queue_declare_ok_t *r =
  -             qpid_queue_declare(mqtt->I, 1, qpid_empty_bytes,
  -                     0, 0, 0, 1, qpid_empty_table);
  -         rc = mqchkrpc(mqtt, "queue_declare",
  -             qpid_get_rpc_reply(mqtt->I));
  -         if (rc)
  -             goto exit;
  -         queuename = qpid_bytes_malloc_dup(r->queue);
  -assert(queuename.bytes);
  -     }
  -
  -     const char * _bindingkey = "test queue";        /* XXX W2DO? */
  -
  -     qpid_queue_bind(mqtt->I,
  -                     1,
  -                     queuename,
  -                     qpid_cstring_bytes(mqtt->exchange),
  -                     qpid_cstring_bytes(_bindingkey),
  -                     qpid_empty_table);
  -     rc = mqchkrpc(mqtt, "queue_bind",
  -             qpid_get_rpc_reply(mqtt->I));
        if (rc)
            goto exit;
  -
  -     qpid_basic_consume(mqtt->I, 1, queuename, qpid_empty_bytes,
  -             0, 1, 0, qpid_empty_table);
  -     rc = mqchkrpc(mqtt, "basic_consume",
  -             qpid_get_rpc_reply(mqtt->I));
  -     if (rc)
  -         goto exit;
  -
  -     /* XXX todo++ */
  -
        rc = RPMRC_OK;
       }
  -#else
  -    rc = RPMRC_OK;
  -#endif
   
  +exit:
       return rc;
   }
   
  @@ -2881,34 +2894,18 @@
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
  -#ifdef       DYING
  -SPEW((stderr, "-->    %s\n", __FUNCTION__));
       if (!rpmmqttConnect(mqtt)) {
        int _lvl = RPMLOG_DEBUG;
   
  +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__);
        rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
   
  -#ifdef       NOTYET
  -     MQTTAsync_responseOptions *R =
  -             (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R');
  -     R->onSuccess = qpidOnUnsubscribeMany;
  -     R->onFailure = qpidOnUnsubscribeManyFailure;
  -
  -     mqtt->finished = 0;
  -     rc = mqchk(mqtt, "unsubscribe",
  -             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  -     while (!mqtt->finished)
  -         usleep(100);
  -#endif
        if (rc)
            goto exit;
        rc = RPMRC_OK;
       }
   
   exit:
  -#else
  -    rc = RPMRC_OK;
  -#endif
       return rc;
   }
   
  @@ -3985,12 +3982,16 @@
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
       static const char * _av[] = { (char *) "mqtt", NULL };
  -    rpmmqtt mqtt = (flags & 0x80000000)
  -     ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool);
  +    static char ** _avp;
  +    rpmmqtt mqtt = NULL;
   
  -SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags));
  +    /* XXX quick-n-dirty means to instantiate _rpmmqttI with parent argv. */
  +    if ((flags & 0x80000000) && _rpmmqttI == NULL)
  +     _avp = av;
  +    mqtt = (flags & 0x80000000)
  +     ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool);
   
  -    /* XXX quick-n-dirty recursion avoidance. */
  +    if (av == NULL) av = _avp;
       if (av == NULL) av = (char **) _av;
   
   #ifdef       DYING
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.21 -r1.1.2.22 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       14 Jul 2016 12:59:21 -0000      1.1.2.21
  +++ rpm/rpmio/rpmmqtt.h       14 Jul 2016 18:52:27 -0000      1.1.2.22
  @@ -35,6 +35,10 @@
       MQTT_FLAGS_IFEMPTY               = _MFB(19),     /*!< -e,--if-empty */
       MQTT_FLAGS_IFUNUSED              = _MFB(20),     /*!< -u,--if-unused */
   
  +    MQTT_FLAGS_PASSIVE               = _MFB(21),     /*!< XXX W2DO? */
  +    MQTT_FLAGS_AUTODELETE    = _MFB(22),     /*!< XXX W2DO? */
  +    MQTT_FLAGS_NOLOCAL               = _MFB(23),     /*!< XXX W2DO? */
  +
   } mqttFlags;
   #define      MQTT_FLAGS_DEFAULT      ((mqttFlags)(MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL))
   #undef       _MFB
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to