RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   28-Jun-2016 00:00:14
  Branch: rpm-5_4                          Handle: 2016062722001400

  Added files:              (Branch: rpm-5_4)
    rpm/rpmio               tmqtt.c
  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               Makefile.am librpmio.vers rpmmqtt.c rpmmqtt.h

  Log:
    - mqtt: WIP.

  Summary:
    Revision    Changes     Path
    1.293.2.75  +5  -2      rpm/rpmio/Makefile.am
    2.199.2.58  +6  -0      rpm/rpmio/librpmio.vers
    1.1.2.3     +189 -107   rpm/rpmio/rpmmqtt.c
    1.1.2.3     +14 -2      rpm/rpmio/rpmmqtt.h
    1.1.2.1     +68 -0      rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/Makefile.am
  ============================================================================
  $ cvs diff -u -r1.293.2.74 -r1.293.2.75 Makefile.am
  --- rpm/rpmio/Makefile.am     25 Jun 2016 22:36:53 -0000      1.293.2.74
  +++ rpm/rpmio/Makefile.am     27 Jun 2016 22:00:14 -0000      1.293.2.75
  @@ -15,12 +15,12 @@
        rpmcpio.c rpmcpio.h rpmgenbasedir.c rpmgenpkglist.c rpmgensrclist.c \
        rpmjsio.msg rpmtar.c rpmtar.h \
        tdir.c tfts.c tget.c tgfs.c tgit.c tglob.c thkp.c thtml.c tinv.c tkey.c \
  -     tmire.c todbc.c tput.c tpython.c trpmio.c tsexp.c tsvn.c tsw.c \
  +     tmire.c tmqtt.c todbc.c tput.c tpython.c trpmio.c tsexp.c tsvn.c tsw.c \
        lookup3.c duktape.c tjsmn.c tjson.c yajl.c testit.sh \
        microjson.c mongoc-counters.defs
   
   EXTRA_PROGRAMS = rpmcpio rpmdpkg rpmtar rpmz
  -EXTRA_PROGRAMS += bdes thtml tinv tkey tmacro tpw
  +EXTRA_PROGRAMS += bdes thtml tinv tkey tmacro tpw turg
   noinst_PROGRAMS = tjsmn duk tmqtt
   
   EXTRA_PROGRAMS += bsdiff bspatch pcrsed rpmborg rpmcurl \
  @@ -659,6 +659,9 @@
   tmqtt_SOURCES = tmqtt.c
   tmqtt_LDADD = $(RPMIO_LDADD_COMMON)
   
  +turg_SOURCES = turg.c
  +turg_LDADD = $(RPMIO_LDADD_COMMON)
  +
   github_SOURCES = tjson.c
   github_CFLAGS = -Wall -Werror -std=gnu99 -O2 -DJSMN_GITHUB
   github_LDADD = -lcurl
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/librpmio.vers
  ============================================================================
  $ cvs diff -u -r2.199.2.57 -r2.199.2.58 librpmio.vers
  --- rpm/rpmio/librpmio.vers   27 Jun 2016 03:05:57 -0000      2.199.2.57
  +++ rpm/rpmio/librpmio.vers   27 Jun 2016 22:00:14 -0000      2.199.2.58
  @@ -593,6 +593,8 @@
       rpmltcImplVecs;
       rpmLUA_PATH;
       rpmLUA_CPATH;
  +    _rpmluaPool;
  +    _rpmluavPool;
       rpmluaFiles;
       rpmluaCheckScript;
       rpmluaDelVar;
  @@ -633,7 +635,11 @@
       rpmmgFile;
       rpmmgBuffer;
       _rpmmqtt_debug;
  +    rpmmqttConnect;
  +    rpmmqttDisconnect;
       rpmmqttNew;
  +    rpmmqttWrite;
  +    rpmmqttRead;
       _rpmmrb_debug;
       _rpmmrbI;
       _rpmmrbPool;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       27 Jun 2016 18:27:10 -0000      1.1.2.2
  +++ rpm/rpmio/rpmmqtt.c       27 Jun 2016 22:00:14 -0000      1.1.2.3
  @@ -8,6 +8,7 @@
   #include <string.h>
   
   #include <rpmio.h>   /* for *Pool methods */
  +#include <rpmlog.h>
   #include <argv.h>
   
   #define      _RPMMQTT_INTERNAL
  @@ -15,14 +16,18 @@
   
   #include "debug.h"
   
  -int _rpmmqtt_debug = 0;
  +int _rpmmqtt_debug = -1;
   
  +static char _test_mqtt[] = "test/mqtt";
  +
  +/*==============================================================*/
   static int Xcheck(rpmmqtt mqtt, const char * msg, int rc,
                int printit, const char * func, const char * fn, unsigned ln)
   {
   
  -    if (printit || rc) {
  -     fprintf(stderr, "error: %s:%s:%u: MQTTClient_%s(%d)\n",
  +    if (rc != 0) {   /* MQTTCLIENT_SUCCESS */
  +     int _lvl = RPMLOG_WARNING;
  +     rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s(%d)\n",
                func, fn, ln, msg, rc);
       }
       return rc;
  @@ -30,50 +35,21 @@
   #define check(_o, _m, _rc)  \
       Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__)
   
  -static void rpmmqttFini(void * _mqtt)
  -{
  -    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -
  -#ifdef       WITH_MQTT
  -    {        MQTTClient C = (MQTTClient) mqtt->C;
  -     int xx;
  -     mqtt->msecs = 10000;
  -     xx = check(mqtt, "disconnect",
  -             MQTTClient_disconnect(C, mqtt->msecs));
  -             MQTTClient_destroy(&C);
  -    }
  -#endif
  -    mqtt->C = NULL;
  -
  -    if (mqtt->av)
  -     (void) argvFree((ARGV_t)mqtt->av);
  -    mqtt->av = NULL;
  -    mqtt->flags = 0;
  -}
  -
  -RPMIOPOOL_MODULE(mqtt)
  -
  +/*==============================================================*/
  +struct MQTTClient_message;
   static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen,
  -             void * _message)
  +             MQTTClient_message *  message)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    FILE * fp = stderr;
       int rc = 1;
   
       (void)mqtt;
   
  -    fprintf(fp, "Message arrived\n");
  -    fprintf(fp, "     topic: %s\n", topic);
  -
   #ifdef       WITH_MQTT
  -    MQTTClient_message *message = _message;
       const char * s = message->payload;
  -
  -    fprintf(fp, "   message: ");
  -    for (int i = 0; i < message->payloadlen; i++)
  -        fprintf(fp, "%c", s[i]);
  -    fprintf(fp, "\n");
  -
  +    size_t ns = message->payloadlen;
  +    if (_rpmmqtt_debug < 0)
  +     rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%*s\"\n", topic, ns, s);
       MQTTClient_freeMessage(&message);
       MQTTClient_free(topic);
   #endif
  @@ -84,123 +60,229 @@
   static void rpmmqttDeliveryComplete(void * _mqtt, int token)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    FILE * fp = stderr;
  -    fprintf(fp, "Message with token value %d delivery confirmed\n", token);
  +    if (_rpmmqtt_debug < 0)
  +     rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token);
       mqtt->token = token;
  +    mqtt->delivered = 1;
   }
   
   static void rpmmqttConnlost(void * _mqtt, char *cause)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  -    FILE * fp = stderr;
  -
  -    (void)mqtt;
   
  -    fprintf(fp, "\nConnection lost\n");
  -    fprintf(fp, "     cause: %s\n", cause);
  +    if (mqtt->debug || _rpmmqtt_debug < 0) {
  +     rpmlog(RPMLOG_DEBUG,
  +             "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n",
  +             mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  +     if (cause)
  +         rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause);
  +     mqtt->serverURI = _free(mqtt->serverURI);
  +     mqtt->connected = 0;
  +    }
   }
   
  -rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
  +/*==============================================================*/
  +int rpmmqttConnect(rpmmqtt mqtt)
   {
  -    rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  -
  -    mqtt->flags = flags;
  -    mqtt->av = NULL;
  -    if (av) {
  -     int ac = argvCount((ARGV_t)av);
  -     for (int i = 0; i < ac; i++)
  -         (void) argvAdd((ARGV_t *)&mqtt->av, av[i]);
  -    }
  -
  +    int rc = -1;
   #ifdef       WITH_MQTT
  -    {        MQTTClient C = NULL;
  -     static char * _uri = "tcp://localhost:1883";
  -     static char * _clientid = "ExampleClientPub";
  -     static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE;
  -     static void * _persist_ctx = NULL;
  -     int xx;
  -    
  -     xx = check(mqtt, "create",
  -             MQTTClient_create(&C, _uri, _clientid,
  -             _persist_type, _persist_ctx));
  -
  -#ifdef       ASYNC
  -     xx = check(mqtt, "setCallbacks",
  -             MQTTClient_setCallbacks(C, mqtt,
  -                     rpmmqttConnlost,
  -                     rpmmqttMessageArrived,
  -                     rpmmqttDeliveryComplete));
  +    if (MQTTClient_isConnected(mqtt->C)) {
  +     mqtt->connected = 1;
  +     rc = 0;
  +    } else {
  +     MQTTClient_connectOptions Copts = MQTTClient_connectOptions_initializer;
  +#ifdef       REF
  +     memcpy(Copts.struct_id, "MQTC", 4);
  +     Copts.struct_version = 4;       /* 0-4 determines what follows */
   #endif
  -
  -     MQTTClient_connectOptions conn_opts =
  -             MQTTClient_connectOptions_initializer;
  -     conn_opts.keepAliveInterval = 20;
  -     conn_opts.cleansession = 1;
  -     xx = check(mqtt, "connect",
  -             MQTTClient_connect(C, &conn_opts));
  -
  -     mqtt->C = C;
  +     Copts.keepAliveInterval = 20;   /* 60 */
  +     Copts.cleansession = 0;         /* 1 discards session state */
  +     Copts.reliable = 0;             /* 1 forces sync */
  +#ifdef       REF
  +     Copts.will = NULL;              /* last will */
  +     Copts.username = NULL;
  +     Copts.password = NULL;
  +     Copts.connectTimeout = 30;
  +     Copts.retryInterval = 20;
  +     Copts.ssl = NULL;
  +     Copts.serverURIcount = 0;
  +     Copts.serverURIs = NULL;
  +     COpts.MQTTVersion = 0;
  +#endif
  +     rc = check(mqtt, "connect",
  +             MQTTClient_connect(mqtt->C, &Copts));
  +     if (rc == 0) {
  +         mqtt->serverURI = xstrdup(Copts.returned.serverURI);
  +         mqtt->MQTTVersion = Copts.returned.MQTTVersion;
  +         mqtt->sessionPresent = Copts.returned.sessionPresent;
  +         if (mqtt->debug || _rpmmqtt_debug < 0)
  +             rpmlog(RPMLOG_DEBUG,
  +                     "+++ MQTT    connect(mqtt://%s) version(%d) present(%d)\n",
  +                     mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  +         mqtt->connected = 1;
  +     }
       }
   #endif
  +    return rc;
  +}
   
  -    return rpmmqttLink(mqtt);
  +int rpmmqttDisconnect(rpmmqtt mqtt)
  +{
  +    int rc = -1;
  +#ifdef       WITH_MQTT
  +    if (MQTTClient_isConnected(mqtt->C)) {
  +     rc = check(mqtt, "disconnect",
  +             MQTTClient_disconnect(mqtt->C, mqtt->msecs));
  +     if (mqtt->debug || _rpmmqtt_debug < 0)
  +         rpmlog(RPMLOG_DEBUG,
  +             "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n",
  +             mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent);
  +     mqtt->serverURI = _free(mqtt->serverURI);
  +     mqtt->connected = 0;
  +    }
  +#endif
  +    return rc;
   }
   
  -int rpmmqttPublish(rpmmqtt mqtt, const char *topic,
  -             const char *s, size_t ns)
  +/*==============================================================*/
  +ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns)
   {
  -    int rc = -1;     /* assume failure */
  +    ssize_t ret = -1;        /* assume failure */
   
       if (ns == 0) ns = strlen(s);
   
  -    mqtt->topic = (topic ? topic : "MQTT Examples");
  -    mqtt->qos = 1;
  -    mqtt->msecs = 10000;
  -
   #ifdef       WITH_MQTT
  -    {        MQTTClient C = (MQTTClient) mqtt->C;
  +    if (rpmmqttConnect(mqtt) == 0) {
        MQTTClient_message pubmsg = MQTTClient_message_initializer;
  +     int rc;
        pubmsg.payload = (char *) s;
        pubmsg.payloadlen = ns;
        pubmsg.qos = mqtt->qos;
        pubmsg.retained = 0;
   
  +     mqtt->delivered = 0;
        rc = check(mqtt, "publishMessage",
  -             MQTTClient_publishMessage(C, mqtt->topic, &pubmsg,
  +             MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg,
                        &mqtt->token));
  -
  -     rc = check(mqtt, "waitForCompletion",
  -             MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs));
  +     if (_rpmmqtt_debug < 0)
  +         rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%*s\"\n",
  +                     mqtt->token, mqtt->topic, ns, s);
  +     sleep(1);
  +
  +     if (!mqtt->delivered)
  +         rc = check(mqtt, "waitForCompletion",
  +             MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs));
  +     if (rc == 0)
  +         ret = ns;
       }
   #endif
   
  -    return rc;
  +    return ret;
   }
  -int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic,
  -             const char *s, size_t ns)
  -{
  -    int rc = -1;     /* assume failure */
   
  -    mqtt->topic = (topic ? topic : "MQTT Examples");
  -    mqtt->qos = 1;
  -    mqtt->msecs = 10000;
  +ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns)
  +{
  +    ssize_t ret = -1;        /* assume failure */
   
   #ifdef       WITH_MQTT
  -    {        MQTTClient C = (MQTTClient) mqtt->C;
  +    if (rpmmqttConnect(mqtt) == 0) {
  +#ifdef       NOTYET
        MQTTClient_message pubmsg = MQTTClient_message_initializer;
  +     int rc;
  +
        pubmsg.payload = (char *) s;
        pubmsg.payloadlen = ns;
        pubmsg.qos = mqtt->qos;
        pubmsg.retained = 0;
   
        rc = check(mqtt, "publishMessage",
  -             MQTTClient_publishMessage(C, mqtt->topic, &pubmsg,
  +             MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg,
                        &mqtt->token));
   
        rc = check(mqtt, "waitForCompletion",
  -             MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs));
  +             MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs));
  +     if (rc == 0)
  +         ret = ns;
  +#endif
       }
   #endif
   
  -    return rc;
  +    return ret;
  +}
  +
  +/*==============================================================*/
  +static void rpmmqttFini(void * _mqtt)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +#ifdef       WITH_MQTT
  +    {        MQTTClient C = (MQTTClient) mqtt->C;
  +     int xx;
  +     xx = rpmmqttDisconnect(mqtt);
  +     xx = check(mqtt, "destroy",
  +             (MQTTClient_destroy(&C), 0));
  +    }
  +#endif
  +    mqtt->C = NULL;
  +    mqtt->serverURI = _free(mqtt->serverURI);
  +
  +    if (mqtt->av)
  +     (void) argvFree((ARGV_t)mqtt->av);
  +    mqtt->av = NULL;
  +    mqtt->flags = 0;
  +}
  +
  +RPMIOPOOL_MODULE(mqtt)
  +
  +rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
  +{
  +    rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  +
  +    mqtt->flags = flags;
  +    mqtt->av = NULL;
  +    if (av) {
  +     int ac = argvCount((ARGV_t)av);
  +     for (int i = 0; i < ac; i++)
  +         (void) argvAdd((ARGV_t *)&mqtt->av, av[i]);
  +    }
  +
  +    mqtt->topic = _test_mqtt;
  +    mqtt->qos = 1;
  +    mqtt->msecs = 10000;
  +
  +#ifdef       WITH_MQTT
  +    {        static int oneshot;
  +     static char * _uri = "tcp://localhost:1883";
  +     static char * _clientid = "ExampleClientPub";
  +     static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE;
  +     static void * _persist_ctx = NULL;
  +     int xx;
  +
  +     if (!oneshot) {
  +         MQTTClient_nameValue *I = MQTTClient_getVersionInfo();
  +         int _lvl = RPMLOG_DEBUG;
  +         rpmlog(_lvl, "==================== MQTT\n");
  +         while (I->name) {
  +             rpmlog(_lvl, "%19s: %s\n", I->name, I->value);
  +             I++;
  +         }
  +         oneshot++;
  +     }
  +    
  +     xx = check(mqtt, "create",
  +             MQTTClient_create(&mqtt->C, _uri, _clientid,
  +             _persist_type, _persist_ctx));
  +
  +     xx = check(mqtt, "setCallbacks",
  +             MQTTClient_setCallbacks(mqtt->C, mqtt,
  +                     rpmmqttConnlost,
  +                     rpmmqttMessageArrived,
  +                     rpmmqttDeliveryComplete));
  +
  +     xx = rpmmqttConnect(mqtt);
  +
  +    }
  +#endif
  +
  +    return rpmmqttLink(mqtt);
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       27 Jun 2016 18:27:10 -0000      1.1.2.2
  +++ rpm/rpmio/rpmmqtt.h       27 Jun 2016 22:00:14 -0000      1.1.2.3
  @@ -23,6 +23,14 @@
       int token;
       int msecs;
   
  +    int debug;
  +    int connected;
  +    int delivered;
  +
  +    char * serverURI;
  +    int MQTTVersion;
  +    int sessionPresent;
  +
   };
   #endif       /* _RPMMQTT_INTERNAL */
   
  @@ -65,9 +73,13 @@
    */
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags);
   
  -int rpmmqttPublish(rpmmqtt mqtt, const char *topic, const char *s, size_t ns);
  +int rpmmqttConnect(rpmmqtt mqtt);
  +
  +int rpmmqttDisconnect(rpmmqtt mqtt);
  +
  +ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns);
   
  -int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, const char *s, size_t ns);
  +ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns);
   
   #ifdef __cplusplus
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r0 -r1.1.2.1 tmqtt.c
  --- /dev/null 2016-06-28 00:00:14.000000000 +0200
  +++ tmqtt.c   2016-06-28 00:00:14.953122940 +0200
  @@ -0,0 +1,68 @@
  +
  +#include "microjson.c"
  +
  +#include "system.h"
  +#include <stdarg.h>
  +#include <stdbool.h>
  +#include <stddef.h>
  +#include <getopt.h>
  +#include <math.h>
  +#include <termio.h>
  +
  +#include <poptIO.h>
  +#include <rpmdefs.h>
  +
  +#define      _RPMMQTT_INTERNAL
  +#include "rpmmqtt.h"
  +
  +#include "debug.h"
  +
  +/*==============================================================*/
  +
  +static int _DoMQTT(rpmmqtt mqtt)
  +{
  +    ssize_t nw;
  +    int rc = 0;
  +
  +    nw = rpmmqttWrite(mqtt, "bzzt ...", 0);
  +    nw = rpmmqttWrite(mqtt, "bzzT ...", 0);
  +    nw = rpmmqttWrite(mqtt, "bzZT ...", 0);
  +    nw = rpmmqttWrite(mqtt, "bZZT ...", 0);
  +    nw = rpmmqttWrite(mqtt, "BZZT ...", 0);
  +    (void) rpmmqttDisconnect(mqtt);
  +    nw = rpmmqttWrite(mqtt, "SWAT !!!", 0);
  +    (void) rpmmqttDisconnect(mqtt);
  +    (void) rpmmqttConnect(mqtt);
  +
  +    return rc;
  +}
  +
  +/*==============================================================*/
  +
  +static struct poptOption optionsTable[] = {
  +
  + { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0,
  +     N_("Common options for all rpmio executables:"),
  +     NULL },
  +
  +  POPT_AUTOHELP
  +  POPT_TABLEEND
  +};
  +
  +int
  +main(int argc, char *argv[])
  +{
  +    poptContext optCon = rpmioInit(argc, argv, optionsTable);
  +    ARGV_t av = poptGetArgs(optCon);
  +#ifdef       UNUSED
  +    int ac = argvCount(av);
  +#endif
  +    rpmmqtt mqtt = rpmmqttNew((char **)av, 0);
  +    int rc = -1;
  +
  +    rc = _DoMQTT(mqtt);
  +
  +    mqtt = rpmmqttFree(mqtt);
  +    optCon = rpmioFini(optCon);
  +    return rc;
  +}
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to