RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 28-Jun-2016 00:00:14 Branch: rpm-5_4 Handle: 2016062722001400 Added files: (Branch: rpm-5_4) rpm/rpmio tmqtt.c Modified files: (Branch: rpm-5_4) rpm/rpmio Makefile.am librpmio.vers rpmmqtt.c rpmmqtt.h Log: - mqtt: WIP. Summary: Revision Changes Path 1.293.2.75 +5 -2 rpm/rpmio/Makefile.am 2.199.2.58 +6 -0 rpm/rpmio/librpmio.vers 1.1.2.3 +189 -107 rpm/rpmio/rpmmqtt.c 1.1.2.3 +14 -2 rpm/rpmio/rpmmqtt.h 1.1.2.1 +68 -0 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/Makefile.am ============================================================================ $ cvs diff -u -r1.293.2.74 -r1.293.2.75 Makefile.am --- rpm/rpmio/Makefile.am 25 Jun 2016 22:36:53 -0000 1.293.2.74 +++ rpm/rpmio/Makefile.am 27 Jun 2016 22:00:14 -0000 1.293.2.75 @@ -15,12 +15,12 @@ rpmcpio.c rpmcpio.h rpmgenbasedir.c rpmgenpkglist.c rpmgensrclist.c \ rpmjsio.msg rpmtar.c rpmtar.h \ tdir.c tfts.c tget.c tgfs.c tgit.c tglob.c thkp.c thtml.c tinv.c tkey.c \ - tmire.c todbc.c tput.c tpython.c trpmio.c tsexp.c tsvn.c tsw.c \ + tmire.c tmqtt.c todbc.c tput.c tpython.c trpmio.c tsexp.c tsvn.c tsw.c \ lookup3.c duktape.c tjsmn.c tjson.c yajl.c testit.sh \ microjson.c mongoc-counters.defs EXTRA_PROGRAMS = rpmcpio rpmdpkg rpmtar rpmz -EXTRA_PROGRAMS += bdes thtml tinv tkey tmacro tpw +EXTRA_PROGRAMS += bdes thtml tinv tkey tmacro tpw turg noinst_PROGRAMS = tjsmn duk tmqtt EXTRA_PROGRAMS += bsdiff bspatch pcrsed rpmborg rpmcurl \ @@ -659,6 +659,9 @@ tmqtt_SOURCES = tmqtt.c tmqtt_LDADD = $(RPMIO_LDADD_COMMON) +turg_SOURCES = turg.c +turg_LDADD = $(RPMIO_LDADD_COMMON) + github_SOURCES = tjson.c github_CFLAGS = -Wall -Werror -std=gnu99 -O2 -DJSMN_GITHUB github_LDADD = -lcurl @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.57 -r2.199.2.58 librpmio.vers --- rpm/rpmio/librpmio.vers 27 Jun 2016 03:05:57 -0000 2.199.2.57 +++ rpm/rpmio/librpmio.vers 27 Jun 2016 22:00:14 -0000 2.199.2.58 @@ -593,6 +593,8 @@ rpmltcImplVecs; rpmLUA_PATH; rpmLUA_CPATH; + _rpmluaPool; + _rpmluavPool; rpmluaFiles; rpmluaCheckScript; rpmluaDelVar; @@ -633,7 +635,11 @@ rpmmgFile; rpmmgBuffer; _rpmmqtt_debug; + rpmmqttConnect; + rpmmqttDisconnect; rpmmqttNew; + rpmmqttWrite; + rpmmqttRead; _rpmmrb_debug; _rpmmrbI; _rpmmrbPool; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 27 Jun 2016 18:27:10 -0000 1.1.2.2 +++ rpm/rpmio/rpmmqtt.c 27 Jun 2016 22:00:14 -0000 1.1.2.3 @@ -8,6 +8,7 @@ #include <string.h> #include <rpmio.h> /* for *Pool methods */ +#include <rpmlog.h> #include <argv.h> #define _RPMMQTT_INTERNAL @@ -15,14 +16,18 @@ #include "debug.h" -int _rpmmqtt_debug = 0; +int _rpmmqtt_debug = -1; +static char _test_mqtt[] = "test/mqtt"; + +/*==============================================================*/ static int Xcheck(rpmmqtt mqtt, const char * msg, int rc, int printit, const char * func, const char * fn, unsigned ln) { - if (printit || rc) { - fprintf(stderr, "error: %s:%s:%u: MQTTClient_%s(%d)\n", + if (rc != 0) { /* MQTTCLIENT_SUCCESS */ + int _lvl = RPMLOG_WARNING; + rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s(%d)\n", func, fn, ln, msg, rc); } return rc; @@ -30,50 +35,21 @@ #define check(_o, _m, _rc) \ Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) -static void rpmmqttFini(void * _mqtt) -{ - rpmmqtt mqtt = (rpmmqtt) _mqtt; - -#ifdef WITH_MQTT - { MQTTClient C = (MQTTClient) mqtt->C; - int xx; - mqtt->msecs = 10000; - xx = check(mqtt, "disconnect", - MQTTClient_disconnect(C, mqtt->msecs)); - MQTTClient_destroy(&C); - } -#endif - mqtt->C = NULL; - - if (mqtt->av) - (void) argvFree((ARGV_t)mqtt->av); - mqtt->av = NULL; - mqtt->flags = 0; -} - -RPMIOPOOL_MODULE(mqtt) - +/*==============================================================*/ +struct MQTTClient_message; static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen, - void * _message) + MQTTClient_message * message) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - FILE * fp = stderr; int rc = 1; (void)mqtt; - fprintf(fp, "Message arrived\n"); - fprintf(fp, " topic: %s\n", topic); - #ifdef WITH_MQTT - MQTTClient_message *message = _message; const char * s = message->payload; - - fprintf(fp, " message: "); - for (int i = 0; i < message->payloadlen; i++) - fprintf(fp, "%c", s[i]); - fprintf(fp, "\n"); - + size_t ns = message->payloadlen; + if (_rpmmqtt_debug < 0) + rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%*s\"\n", topic, ns, s); MQTTClient_freeMessage(&message); MQTTClient_free(topic); #endif @@ -84,123 +60,229 @@ static void rpmmqttDeliveryComplete(void * _mqtt, int token) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - FILE * fp = stderr; - fprintf(fp, "Message with token value %d delivery confirmed\n", token); + if (_rpmmqtt_debug < 0) + rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token); mqtt->token = token; + mqtt->delivered = 1; } static void rpmmqttConnlost(void * _mqtt, char *cause) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - FILE * fp = stderr; - - (void)mqtt; - fprintf(fp, "\nConnection lost\n"); - fprintf(fp, " cause: %s\n", cause); + if (mqtt->debug || _rpmmqtt_debug < 0) { + rpmlog(RPMLOG_DEBUG, + "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n", + mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); + if (cause) + rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause); + mqtt->serverURI = _free(mqtt->serverURI); + mqtt->connected = 0; + } } -rpmmqtt rpmmqttNew(char ** av, uint32_t flags) +/*==============================================================*/ +int rpmmqttConnect(rpmmqtt mqtt) { - rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); - - mqtt->flags = flags; - mqtt->av = NULL; - if (av) { - int ac = argvCount((ARGV_t)av); - for (int i = 0; i < ac; i++) - (void) argvAdd((ARGV_t *)&mqtt->av, av[i]); - } - + int rc = -1; #ifdef WITH_MQTT - { MQTTClient C = NULL; - static char * _uri = "tcp://localhost:1883"; - static char * _clientid = "ExampleClientPub"; - static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE; - static void * _persist_ctx = NULL; - int xx; - - xx = check(mqtt, "create", - MQTTClient_create(&C, _uri, _clientid, - _persist_type, _persist_ctx)); - -#ifdef ASYNC - xx = check(mqtt, "setCallbacks", - MQTTClient_setCallbacks(C, mqtt, - rpmmqttConnlost, - rpmmqttMessageArrived, - rpmmqttDeliveryComplete)); + if (MQTTClient_isConnected(mqtt->C)) { + mqtt->connected = 1; + rc = 0; + } else { + MQTTClient_connectOptions Copts = MQTTClient_connectOptions_initializer; +#ifdef REF + memcpy(Copts.struct_id, "MQTC", 4); + Copts.struct_version = 4; /* 0-4 determines what follows */ #endif - - MQTTClient_connectOptions conn_opts = - MQTTClient_connectOptions_initializer; - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - xx = check(mqtt, "connect", - MQTTClient_connect(C, &conn_opts)); - - mqtt->C = C; + Copts.keepAliveInterval = 20; /* 60 */ + Copts.cleansession = 0; /* 1 discards session state */ + Copts.reliable = 0; /* 1 forces sync */ +#ifdef REF + Copts.will = NULL; /* last will */ + Copts.username = NULL; + Copts.password = NULL; + Copts.connectTimeout = 30; + Copts.retryInterval = 20; + Copts.ssl = NULL; + Copts.serverURIcount = 0; + Copts.serverURIs = NULL; + COpts.MQTTVersion = 0; +#endif + rc = check(mqtt, "connect", + MQTTClient_connect(mqtt->C, &Copts)); + if (rc == 0) { + mqtt->serverURI = xstrdup(Copts.returned.serverURI); + mqtt->MQTTVersion = Copts.returned.MQTTVersion; + mqtt->sessionPresent = Copts.returned.sessionPresent; + if (mqtt->debug || _rpmmqtt_debug < 0) + rpmlog(RPMLOG_DEBUG, + "+++ MQTT connect(mqtt://%s) version(%d) present(%d)\n", + mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); + mqtt->connected = 1; + } } #endif + return rc; +} - return rpmmqttLink(mqtt); +int rpmmqttDisconnect(rpmmqtt mqtt) +{ + int rc = -1; +#ifdef WITH_MQTT + if (MQTTClient_isConnected(mqtt->C)) { + rc = check(mqtt, "disconnect", + MQTTClient_disconnect(mqtt->C, mqtt->msecs)); + if (mqtt->debug || _rpmmqtt_debug < 0) + rpmlog(RPMLOG_DEBUG, + "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n", + mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); + mqtt->serverURI = _free(mqtt->serverURI); + mqtt->connected = 0; + } +#endif + return rc; } -int rpmmqttPublish(rpmmqtt mqtt, const char *topic, - const char *s, size_t ns) +/*==============================================================*/ +ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns) { - int rc = -1; /* assume failure */ + ssize_t ret = -1; /* assume failure */ if (ns == 0) ns = strlen(s); - mqtt->topic = (topic ? topic : "MQTT Examples"); - mqtt->qos = 1; - mqtt->msecs = 10000; - #ifdef WITH_MQTT - { MQTTClient C = (MQTTClient) mqtt->C; + if (rpmmqttConnect(mqtt) == 0) { MQTTClient_message pubmsg = MQTTClient_message_initializer; + int rc; pubmsg.payload = (char *) s; pubmsg.payloadlen = ns; pubmsg.qos = mqtt->qos; pubmsg.retained = 0; + mqtt->delivered = 0; rc = check(mqtt, "publishMessage", - MQTTClient_publishMessage(C, mqtt->topic, &pubmsg, + MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg, &mqtt->token)); - - rc = check(mqtt, "waitForCompletion", - MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs)); + if (_rpmmqtt_debug < 0) + rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%*s\"\n", + mqtt->token, mqtt->topic, ns, s); + sleep(1); + + if (!mqtt->delivered) + rc = check(mqtt, "waitForCompletion", + MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs)); + if (rc == 0) + ret = ns; } #endif - return rc; + return ret; } -int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, - const char *s, size_t ns) -{ - int rc = -1; /* assume failure */ - mqtt->topic = (topic ? topic : "MQTT Examples"); - mqtt->qos = 1; - mqtt->msecs = 10000; +ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns) +{ + ssize_t ret = -1; /* assume failure */ #ifdef WITH_MQTT - { MQTTClient C = (MQTTClient) mqtt->C; + if (rpmmqttConnect(mqtt) == 0) { +#ifdef NOTYET MQTTClient_message pubmsg = MQTTClient_message_initializer; + int rc; + pubmsg.payload = (char *) s; pubmsg.payloadlen = ns; pubmsg.qos = mqtt->qos; pubmsg.retained = 0; rc = check(mqtt, "publishMessage", - MQTTClient_publishMessage(C, mqtt->topic, &pubmsg, + MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg, &mqtt->token)); rc = check(mqtt, "waitForCompletion", - MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs)); + MQTTClient_waitForCompletion(mqtt->C, mqtt->token, mqtt->msecs)); + if (rc == 0) + ret = ns; +#endif } #endif - return rc; + return ret; +} + +/*==============================================================*/ +static void rpmmqttFini(void * _mqtt) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + +#ifdef WITH_MQTT + { MQTTClient C = (MQTTClient) mqtt->C; + int xx; + xx = rpmmqttDisconnect(mqtt); + xx = check(mqtt, "destroy", + (MQTTClient_destroy(&C), 0)); + } +#endif + mqtt->C = NULL; + mqtt->serverURI = _free(mqtt->serverURI); + + if (mqtt->av) + (void) argvFree((ARGV_t)mqtt->av); + mqtt->av = NULL; + mqtt->flags = 0; +} + +RPMIOPOOL_MODULE(mqtt) + +rpmmqtt rpmmqttNew(char ** av, uint32_t flags) +{ + rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); + + mqtt->flags = flags; + mqtt->av = NULL; + if (av) { + int ac = argvCount((ARGV_t)av); + for (int i = 0; i < ac; i++) + (void) argvAdd((ARGV_t *)&mqtt->av, av[i]); + } + + mqtt->topic = _test_mqtt; + mqtt->qos = 1; + mqtt->msecs = 10000; + +#ifdef WITH_MQTT + { static int oneshot; + static char * _uri = "tcp://localhost:1883"; + static char * _clientid = "ExampleClientPub"; + static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE; + static void * _persist_ctx = NULL; + int xx; + + if (!oneshot) { + MQTTClient_nameValue *I = MQTTClient_getVersionInfo(); + int _lvl = RPMLOG_DEBUG; + rpmlog(_lvl, "==================== MQTT\n"); + while (I->name) { + rpmlog(_lvl, "%19s: %s\n", I->name, I->value); + I++; + } + oneshot++; + } + + xx = check(mqtt, "create", + MQTTClient_create(&mqtt->C, _uri, _clientid, + _persist_type, _persist_ctx)); + + xx = check(mqtt, "setCallbacks", + MQTTClient_setCallbacks(mqtt->C, mqtt, + rpmmqttConnlost, + rpmmqttMessageArrived, + rpmmqttDeliveryComplete)); + + xx = rpmmqttConnect(mqtt); + + } +#endif + + return rpmmqttLink(mqtt); } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.2 -r1.1.2.3 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 27 Jun 2016 18:27:10 -0000 1.1.2.2 +++ rpm/rpmio/rpmmqtt.h 27 Jun 2016 22:00:14 -0000 1.1.2.3 @@ -23,6 +23,14 @@ int token; int msecs; + int debug; + int connected; + int delivered; + + char * serverURI; + int MQTTVersion; + int sessionPresent; + }; #endif /* _RPMMQTT_INTERNAL */ @@ -65,9 +73,13 @@ */ rpmmqtt rpmmqttNew(char ** av, uint32_t flags); -int rpmmqttPublish(rpmmqtt mqtt, const char *topic, const char *s, size_t ns); +int rpmmqttConnect(rpmmqtt mqtt); + +int rpmmqttDisconnect(rpmmqtt mqtt); + +ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns); -int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, const char *s, size_t ns); +ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns); #ifdef __cplusplus } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r0 -r1.1.2.1 tmqtt.c --- /dev/null 2016-06-28 00:00:14.000000000 +0200 +++ tmqtt.c 2016-06-28 00:00:14.953122940 +0200 @@ -0,0 +1,68 @@ + +#include "microjson.c" + +#include "system.h" +#include <stdarg.h> +#include <stdbool.h> +#include <stddef.h> +#include <getopt.h> +#include <math.h> +#include <termio.h> + +#include <poptIO.h> +#include <rpmdefs.h> + +#define _RPMMQTT_INTERNAL +#include "rpmmqtt.h" + +#include "debug.h" + +/*==============================================================*/ + +static int _DoMQTT(rpmmqtt mqtt) +{ + ssize_t nw; + int rc = 0; + + nw = rpmmqttWrite(mqtt, "bzzt ...", 0); + nw = rpmmqttWrite(mqtt, "bzzT ...", 0); + nw = rpmmqttWrite(mqtt, "bzZT ...", 0); + nw = rpmmqttWrite(mqtt, "bZZT ...", 0); + nw = rpmmqttWrite(mqtt, "BZZT ...", 0); + (void) rpmmqttDisconnect(mqtt); + nw = rpmmqttWrite(mqtt, "SWAT !!!", 0); + (void) rpmmqttDisconnect(mqtt); + (void) rpmmqttConnect(mqtt); + + return rc; +} + +/*==============================================================*/ + +static struct poptOption optionsTable[] = { + + { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0, + N_("Common options for all rpmio executables:"), + NULL }, + + POPT_AUTOHELP + POPT_TABLEEND +}; + +int +main(int argc, char *argv[]) +{ + poptContext optCon = rpmioInit(argc, argv, optionsTable); + ARGV_t av = poptGetArgs(optCon); +#ifdef UNUSED + int ac = argvCount(av); +#endif + rpmmqtt mqtt = rpmmqttNew((char **)av, 0); + int rc = -1; + + rc = _DoMQTT(mqtt); + + mqtt = rpmmqttFree(mqtt); + optCon = rpmioFini(optCon); + return rc; +} @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org