RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 27-Jun-2016 20:27:10 Branch: rpm-5_4 Handle: 2016062718271000 Modified files: (Branch: rpm-5_4) rpm CHANGES configure.ac rpm/rpmio rpmmqtt.c rpmmqtt.h Log: - mqtt: stub-in a paho-mqtt client. Summary: Revision Changes Path 1.3501.2.507 +2 -0 rpm/CHANGES 2.472.2.151 +8 -0 rpm/configure.ac 1.1.2.2 +166 -2 rpm/rpmio/rpmmqtt.c 1.1.2.2 +11 -1 rpm/rpmio/rpmmqtt.h ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.506 -r1.3501.2.507 CHANGES --- rpm/CHANGES 27 Jun 2016 03:10:03 -0000 1.3501.2.506 +++ rpm/CHANGES 27 Jun 2016 18:27:10 -0000 1.3501.2.507 @@ -1,4 +1,6 @@ 5.4.17 -> 5.4.18: + - jbj: mqtt: stub-in a paho-mqtt client. + - jbj: build: add/use RPMIOPOOL_ macros where possible. - jbj: lib: add/use RPMIOPOOL_ macros where possible. - jbj: rpmdb: add/use RPMIOPOOL_ macros where possible. - jbj: rpmdb: rename dbiIndex -> rpmdbi type, dbiFoo() -> rpmdbiFoo(). @@ . patch -p0 <<'@@ .' Index: rpm/configure.ac ============================================================================ $ cvs diff -u -r2.472.2.150 -r2.472.2.151 configure.ac --- rpm/configure.ac 24 Jun 2016 19:08:22 -0000 2.472.2.150 +++ rpm/configure.ac 27 Jun 2016 18:27:10 -0000 2.472.2.151 @@ -2244,6 +2244,14 @@ fi ], [ AC_MSG_WARN([No Neon library found, using unsupported configuration]) ]) +# MQTT +RPM_CHECK_LIB( + [MQTT], [mqtt], + [paho-mqtt3c], [MQTTClient_create], [MQTTClient.h], + [no,external:none], [], + [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT]) + ], []) + # Libgit2 # XXX internal needs *.a/*.la files to merge into -lrpmmisc RPM_CHECK_LIB( @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 27 Jun 2016 03:05:58 -0000 1.1.2.1 +++ rpm/rpmio/rpmmqtt.c 27 Jun 2016 18:27:10 -0000 1.1.2.2 @@ -3,6 +3,9 @@ */ #include "system.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> #include <rpmio.h> /* for *Pool methods */ #include <argv.h> @@ -12,20 +15,91 @@ #include "debug.h" +int _rpmmqtt_debug = 0; + +static int Xcheck(rpmmqtt mqtt, const char * msg, int rc, + int printit, const char * func, const char * fn, unsigned ln) +{ + + if (printit || rc) { + fprintf(stderr, "error: %s:%s:%u: MQTTClient_%s(%d)\n", + func, fn, ln, msg, rc); + } + return rc; +} +#define check(_o, _m, _rc) \ + Xcheck(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) + static void rpmmqttFini(void * _mqtt) { rpmmqtt mqtt = (rpmmqtt) _mqtt; +#ifdef WITH_MQTT + { MQTTClient C = (MQTTClient) mqtt->C; + int xx; + mqtt->msecs = 10000; + xx = check(mqtt, "disconnect", + MQTTClient_disconnect(C, mqtt->msecs)); + MQTTClient_destroy(&C); + } +#endif + mqtt->C = NULL; + if (mqtt->av) (void) argvFree((ARGV_t)mqtt->av); mqtt->av = NULL; mqtt->flags = 0; } -int _rpmmqtt_debug = 0; - RPMIOPOOL_MODULE(mqtt) +static int rpmmqttMessageArrived(void * _mqtt, char * topic, int topicLen, + void * _message) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + FILE * fp = stderr; + int rc = 1; + + (void)mqtt; + + fprintf(fp, "Message arrived\n"); + fprintf(fp, " topic: %s\n", topic); + +#ifdef WITH_MQTT + MQTTClient_message *message = _message; + const char * s = message->payload; + + fprintf(fp, " message: "); + for (int i = 0; i < message->payloadlen; i++) + fprintf(fp, "%c", s[i]); + fprintf(fp, "\n"); + + MQTTClient_freeMessage(&message); + MQTTClient_free(topic); +#endif + + return rc; +} + +static void rpmmqttDeliveryComplete(void * _mqtt, int token) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + FILE * fp = stderr; + fprintf(fp, 
"Message with token value %d delivery confirmed\n", token); + mqtt->token = token; +} + +static void rpmmqttConnlost(void * _mqtt, char *cause) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + FILE * fp = stderr; + + (void)mqtt; + + fprintf(fp, "\nConnection lost\n"); + fprintf(fp, " cause: %s\n", cause); +} + rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); @@ -38,5 +112,95 @@ (void) argvAdd((ARGV_t *)&mqtt->av, av[i]); } +#ifdef WITH_MQTT + { MQTTClient C = NULL; + static char * _uri = "tcp://localhost:1883"; + static char * _clientid = "ExampleClientPub"; + static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE; + static void * _persist_ctx = NULL; + int xx; + + xx = check(mqtt, "create", + MQTTClient_create(&C, _uri, _clientid, + _persist_type, _persist_ctx)); + +#ifdef ASYNC + xx = check(mqtt, "setCallbacks", + MQTTClient_setCallbacks(C, mqtt, + rpmmqttConnlost, + rpmmqttMessageArrived, + rpmmqttDeliveryComplete)); +#endif + + MQTTClient_connectOptions conn_opts = + MQTTClient_connectOptions_initializer; + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + xx = check(mqtt, "connect", + MQTTClient_connect(C, &conn_opts)); + + mqtt->C = C; + } +#endif + return rpmmqttLink(mqtt); } + +int rpmmqttPublish(rpmmqtt mqtt, const char *topic, + const char *s, size_t ns) +{ + int rc = -1; /* assume failure */ + + if (ns == 0) ns = strlen(s); + + mqtt->topic = (topic ? 
topic : "MQTT Examples"); + mqtt->qos = 1; + mqtt->msecs = 10000; + +#ifdef WITH_MQTT + { MQTTClient C = (MQTTClient) mqtt->C; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = (char *) s; + pubmsg.payloadlen = ns; + pubmsg.qos = mqtt->qos; + pubmsg.retained = 0; + + rc = check(mqtt, "publishMessage", + MQTTClient_publishMessage(C, mqtt->topic, &pubmsg, + &mqtt->token)); + + rc = check(mqtt, "waitForCompletion", + MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs)); + } +#endif + + return rc; +} +int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, + const char *s, size_t ns) +{ + int rc = -1; /* assume failure */ + + mqtt->topic = (topic ? topic : "MQTT Examples"); + mqtt->qos = 1; + mqtt->msecs = 10000; + +#ifdef WITH_MQTT + { MQTTClient C = (MQTTClient) mqtt->C; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = (char *) s; + pubmsg.payloadlen = ns; + pubmsg.qos = mqtt->qos; + pubmsg.retained = 0; + + rc = check(mqtt, "publishMessage", + MQTTClient_publishMessage(C, mqtt->topic, &pubmsg, + &mqtt->token)); + + rc = check(mqtt, "waitForCompletion", + MQTTClient_waitForCompletion(C, mqtt->token, mqtt->msecs)); + } +#endif + + return rc; +} @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 27 Jun 2016 03:05:58 -0000 1.1.2.1 +++ rpm/rpmio/rpmmqtt.h 27 Jun 2016 18:27:10 -0000 1.1.2.2 @@ -10,12 +10,18 @@ typedef struct rpmmqtt_s * rpmmqtt; #if defined(_RPMMQTT_INTERNAL) +#include <MQTTClient.h> struct rpmmqtt_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. 
*/ char ** av; uint32_t flags; - int sv[2]; + void * C; /* MQTTClient */ + + const char * topic; + int qos; + int token; + int msecs; }; #endif /* _RPMMQTT_INTERNAL */ @@ -59,6 +65,10 @@ */ rpmmqtt rpmmqttNew(char ** av, uint32_t flags); +int rpmmqttPublish(rpmmqtt mqtt, const char *topic, const char *s, size_t ns); + +int rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, const char *s, size_t ns); + #ifdef __cplusplus } #endif @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org