RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   27-Jun-2016 20:27:10
  Branch: rpm-5_4                          Handle: 2016062718271000

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES configure.ac
    rpm/rpmio               rpmmqtt.c rpmmqtt.h

  Log:
    - mqtt: stub-in a paho-mqtt client.

  Summary:
    Revision    Changes     Path
    1.3501.2.507+2  -0      rpm/CHANGES
    2.472.2.151 +8  -0      rpm/configure.ac
    1.1.2.2     +166 -2     rpm/rpmio/rpmmqtt.c
    1.1.2.2     +11 -1      rpm/rpmio/rpmmqtt.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.506 -r1.3501.2.507 CHANGES
  --- rpm/CHANGES       27 Jun 2016 03:10:03 -0000      1.3501.2.506
  +++ rpm/CHANGES       27 Jun 2016 18:27:10 -0000      1.3501.2.507
  @@ -1,4 +1,6 @@
   5.4.17 -> 5.4.18:
  +    - jbj: mqtt: stub-in a paho-mqtt client.
  +    - jbj: build: add/use RPMIOPOOL_ macros where possible.
       - jbj: lib: add/use RPMIOPOOL_ macros where possible.
       - jbj: rpmdb: add/use RPMIOPOOL_ macros where possible.
       - jbj: rpmdb: rename dbiIndex -> rpmdbi type, dbiFoo() -> rpmdbiFoo().
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/configure.ac
  ============================================================================
  $ cvs diff -u -r2.472.2.150 -r2.472.2.151 configure.ac
  --- rpm/configure.ac  24 Jun 2016 19:08:22 -0000      2.472.2.150
  +++ rpm/configure.ac  27 Jun 2016 18:27:10 -0000      2.472.2.151
  @@ -2244,6 +2244,14 @@
         fi
       ], [ AC_MSG_WARN([No Neon library found, using unsupported 
configuration]) ])
   
  +# MQTT
  +RPM_CHECK_LIB(
  +    [MQTT], [mqtt],
  +    [paho-mqtt3c], [MQTTClient_create], [MQTTClient.h],
  +    [no,external:none], [],
  +    [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT])
  +    ], [])
  +
   # Libgit2
   # XXX internal needs *.a/*.la files to merge into -lrpmmisc
   RPM_CHECK_LIB(
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       27 Jun 2016 03:05:58 -0000      1.1.2.1
  +++ rpm/rpmio/rpmmqtt.c       27 Jun 2016 18:27:10 -0000      1.1.2.2
  @@ -3,6 +3,9 @@
    */
   
   #include "system.h"
  +#include <stdio.h>
  +#include <stdlib.h>
  +#include <string.h>
   
   #include <rpmio.h>   /* for *Pool methods */
   #include <argv.h>
  @@ -12,20 +15,91 @@
   
   #include "debug.h"
   
  +int _rpmmqtt_debug = 0;
  +
  +static int Xcheck(rpmmqtt mqtt, const char * msg, int rc,
  +             int printit, const char * func, const char * fn, unsigned ln)
  +{
  +
  +    if (printit || rc) {
  +     fprintf(stderr, "error: %s:%s:%u: MQTTClient_%s(%d)\n",
  +             func, fn, ln, msg, rc);
  +    }
  +    return rc;
  +}
  +#define check(_o, _m, _rc)  \
  +    Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
  +
   static void rpmmqttFini(void * _mqtt)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
  +#ifdef       WITH_MQTT
  +    {        MQTTClient C = (MQTTClient) mqtt->C;
  +     int xx;
  +     mqtt->msecs = 10000;
  +     xx = check(mqtt, "disconnect",
  +             MQTTClient_disconnect(C, mqtt->msecs));
  +             MQTTClient_destroy(&C);
  +    }
  +#endif
  +    mqtt->C = NULL;
  +
       if (mqtt->av)
        (void) argvFree((ARGV_t)mqtt->av);
       mqtt->av = NULL;
       mqtt->flags = 0;
   }
   
  -int _rpmmqtt_debug = 0;
  -
   RPMIOPOOL_MODULE(mqtt)
   
  +static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen,
  +             void * _message)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    FILE * fp = stderr;
  +    int rc = 1;
  +
  +    (void)mqtt;
  +
  +    fprintf(fp, "Message arrived\n");
  +    fprintf(fp, "     topic: %s\n", topic);
  +
  +#ifdef       WITH_MQTT
  +    MQTTClient_message *message = _message;
  +    const char * s = message->payload;
  +
  +    fprintf(fp, "   message: ");
  +    for (int i = 0; i < message->payloadlen; i++)
  +        fprintf(fp, "%c", s[i]);
  +    fprintf(fp, "\n");
  +
  +    MQTTClient_freeMessage(&message);
  +    MQTTClient_free(topic);
  +#endif
  +
  +    return rc;
  +}
  +
  +static void rpmmqttDeliveryComplete(void * _mqtt, int token)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    FILE * fp = stderr;
  +    fprintf(fp, "Message with token value %d delivery confirmed\n", token);
  +    mqtt->token = token;
  +}
  +
  +static void rpmmqttConnlost(void * _mqtt, char *cause)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    FILE * fp = stderr;
  +
  +    (void)mqtt;
  +
  +    fprintf(fp, "\nConnection lost\n");
  +    fprintf(fp, "     cause: %s\n", cause);
  +}
  +
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
       rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  @@ -38,5 +112,95 @@
            (void) argvAdd((ARGV_t *)&mqtt->av, av[i]);
       }
   
  +#ifdef       WITH_MQTT
  +    {        MQTTClient C = NULL;
  +     static char * _uri = "tcp://localhost:1883";
  +     static char * _clientid = "ExampleClientPub";
  +     static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE;
  +     static void * _persist_ctx = NULL;
  +     int xx;
  +    
  +     xx = check(mqtt, "create",
  +             MQTTClient_create(&C, _uri, _clientid,
  +             _persist_type, _persist_ctx));
  +
  +#ifdef       ASYNC
  +     xx = check(mqtt, "setCallbacks",
  +             MQTTClient_setCallbacks(C, mqtt,
  +                     rpmmqttConnlost,
  +                     rpmmqttMessageArrived,
  +                     rpmmqttDeliveryComplete));
  +#endif
  +
  +     MQTTClient_connectOptions conn_opts =
  +             MQTTClient_connectOptions_initializer;
  +     conn_opts.keepAliveInterval = 20;
  +     conn_opts.cleansession = 1;
  +     xx = check(mqtt, "connect",
  +             MQTTClient_connect(C, &conn_opts));
  +
  +     mqtt->C = C;
  +    }
  +#endif
  +
       return rpmmqttLink(mqtt);
   }
  +
  +int rpmmqttPublish(rpmmqtt mqtt, const char *topic,
  +             const char *s, size_t ns)
  +{
  +    int rc = -1;     /* assume failure */
  +
  +    if (ns == 0) ns = strlen(s);
  +
  +    mqtt->topic = (topic ? topic : "MQTT Examples");
  +    mqtt->qos = 1;
  +    mqtt->msecs = 10000;
  +
  +#ifdef       WITH_MQTT
  +    {        MQTTClient C = (MQTTClient) mqtt->C;
  +     MQTTClient_message pubmsg = MQTTClient_message_initializer;
  +     pubmsg.payload = (char *) s;
  +     pubmsg.payloadlen = ns;
  +     pubmsg.qos = mqtt->qos;
  +     pubmsg.retained = 0;
  +
  +     rc = check(mqtt, "publishMessage",
  +             MQTTClient_publishMessage(C, mqtt->topic, &pubmsg,
  +                     &mqtt->token));
  +
  +     rc = check(mqtt, "waitForCompletion",
  +             MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs));
  +    }
  +#endif
  +
  +    return rc;
  +}
  +int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic,
  +             const char *s, size_t ns)
  +{
  +    int rc = -1;     /* assume failure */
  +
  +    mqtt->topic = (topic ? topic : "MQTT Examples");
  +    mqtt->qos = 1;
  +    mqtt->msecs = 10000;
  +
  +#ifdef       WITH_MQTT
  +    {        MQTTClient C = (MQTTClient) mqtt->C;
  +     MQTTClient_message pubmsg = MQTTClient_message_initializer;
  +     pubmsg.payload = (char *) s;
  +     pubmsg.payloadlen = ns;
  +     pubmsg.qos = mqtt->qos;
  +     pubmsg.retained = 0;
  +
  +     rc = check(mqtt, "publishMessage",
  +             MQTTClient_publishMessage(C, mqtt->topic, &pubmsg,
  +                     &mqtt->token));
  +
  +     rc = check(mqtt, "waitForCompletion",
  +             MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs));
  +    }
  +#endif
  +
  +    return rc;
  +}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       27 Jun 2016 03:05:58 -0000      1.1.2.1
  +++ rpm/rpmio/rpmmqtt.h       27 Jun 2016 18:27:10 -0000      1.1.2.2
  @@ -10,12 +10,18 @@
   typedef struct rpmmqtt_s * rpmmqtt;
   
   #if defined(_RPMMQTT_INTERNAL)
  +#include <MQTTClient.h>
   struct rpmmqtt_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
       char ** av;
       uint32_t flags;
   
  -    int sv[2];
  +    void * C;                        /* MQTTClient */
  +
  +    const char * topic;
  +    int qos;
  +    int token;
  +    int msecs;
   
   };
   #endif       /* _RPMMQTT_INTERNAL */
  @@ -59,6 +65,10 @@
    */
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags);
   
  +int rpmmqttPublish(rpmmqtt mqtt, const char *topic, const char *s, size_t 
ns);
  +
  +int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, const char *s, size_t 
ns);
  +
   #ifdef __cplusplus
   }
   #endif
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to