RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 14-Jul-2016 14:59:21 Branch: rpm-5_4 Handle: 2016071412592100 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: add per-module default uri. - mqtt: fix: select module from argv[0]. Summary: Revision Changes Path 1.1.2.24 +194 -539 rpm/rpmio/rpmmqtt.c 1.1.2.21 +1 -0 rpm/rpmio/rpmmqtt.h 1.1.2.16 +18 -67 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.23 -r1.1.2.24 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 14 Jul 2016 11:32:57 -0000 1.1.2.23 +++ rpm/rpmio/rpmmqtt.c 14 Jul 2016 12:59:21 -0000 1.1.2.24 @@ -4,15 +4,6 @@ #include "system.h" -#undef WITH_MOSQUITTO -#undef WITH_RABBITMQ -#undef WITH_PROTON -#undef WITH_ZEROMQ - -#if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_ZEROMQ) -#define WITH_MQTT -#endif - #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -427,6 +418,64 @@ } /*==============================================================*/ +static +rpmRC _rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); + for (int i = 0; i < ac; i++) { + char * t = av[i]; +#ifdef NOTYET /* XXX qos as subtopic ?qos=N? */ + char * te = strchr(t, '?'); + if (te) { + *te++ = '\0'; + if ((te = strchr(te, '='))) { + if (!strncmp(t, "qos", (te - t))) + subqos[i] = strtoul(te+1, NULL, 0); + } + } else +#endif + subqos[i] = mqtt->qos; /* XXX */ + rc = rpmmqttSubscribe(mqtt, t, subqos[i]); + if (rc) + break; + } + + if (subqos) + free(subqos); + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC _rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + for (int i = 0; i < ac; i++) { + rc = rpmmqttUnsubscribe(mqtt, av[i]); + if (rc) + goto exit; + } + } + +exit: + return rc; +} + +/*==============================================================*/ #ifdef WITH_PAHO static int pahoOnMessageArrived(void * _mqtt, char * topic, int topicLen, MQTTAsync_message * message) @@ -1154,7 +1203,7 @@ "tcp://", mqtt->host, ":", portstr, NULL); } - /* XXX improve integration */ + /* XXX set through uri?query */ static const char _mqtt_persist[] = "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); @@ -1476,6 +1525,12 @@ static struct mqttVec_s pahoVec = { .name = "paho", + .uri = "mqtt://localhost:1883/" + "rpm/mqtt" /* XXX topic? */ + "?qos=1" /* XXX */ + "&timeout=10000" + "&max_inflight=20" + "&keepalive=60", .port = 1883, .sport = 8883, .prefix = "MQTTAsync_", @@ -1941,44 +1996,6 @@ } static -rpmRC mosqSubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); - for (int i = 0; i < ac; i++) { - char * t = av[i]; -#ifdef NOTYET /* XXX qos as subtopic ?qos=N? */ - char * te = strchr(t, '?'); - if (te) { - *te++ = '\0'; - if ((te = strchr(te, '='))) { - if (!strncmp(t, "qos", (te - t))) - subqos[i] = strtoul(te+1, NULL, 0); - } - } else -#endif - subqos[i] = mqtt->qos; /* XXX */ - rc = rpmmqttSubscribe(mqtt, t, subqos[i]); - if (rc) - break; - } - - if (subqos) - free(subqos); - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: - return rc; -} - -static rpmRC mosqSubscribe(rpmmqtt mqtt, const char * topic, int qos) { rpmRC rc = RPMRC_FAIL; /* assume failure */ @@ -2035,25 +2052,6 @@ return rc; } -static -rpmRC mosqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - for (int i = 0; i < ac; i++) { - rc = rpmmqttUnsubscribe(mqtt, av[i]); - if (rc) - goto exit; - } - } - -exit: - return rc; -} - static KEY mosqErrs[] = { #define _ENTRY(_v) { MOSQ_ERR_##_v, #_v, } _ENTRY(CONN_PENDING), @@ -2081,6 +2079,12 @@ static struct mqttVec_s mosqVec = { .name = "mosquitto", + .uri = "mqtt://localhost:1883/" + "rpm/mqtt" /* XXX topic? */ + "?qos=1" /* XXX */ + "&timeout=10000" + "&max_inflight=20" + "&keepalive=60", .port = 1883, .sport = 8883, .prefix = "mosquitto_", @@ -2094,8 +2098,8 @@ .isconnected = mosqIsConnected, .unsubscribe = mosqUnsubscribe, .subscribe = mosqSubscribe, - .unsubscribeMany = mosqUnsubscribeMany, - .subscribeMany = mosqSubscribeMany, + .unsubscribeMany = _rpmmqttUnsubscribeMany, + .subscribeMany = _rpmmqttSubscribeMany, .sendMessage = mosqSendMessage, }; #endif /* WITH_MOSQUITTO */ @@ -2471,44 +2475,6 @@ } static -rpmRC amqpSubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); - for (int i = 0; i < ac; i++) { - char * t = av[i]; -#ifdef NOTYET - char * te = strchr(t, '?'); - if (te) { - *te++ = '\0'; - if ((te = strchr(te, '='))) { - if (!strncmp(t, "qos", (te - t))) - subqos[i] = strtoul(te+1, NULL, 0); - } - } else -#endif - subqos[i] = mqtt->qos; /* XXX */ - rc = rpmmqttSubscribe(mqtt, t, subqos[i]); - if (rc) - break; - } - - if (subqos) - free(subqos); - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: - return rc; -} - -static rpmRC amqpSubscribe(rpmmqtt mqtt, const char * topic, int qos) { rpmRC rc = RPMRC_FAIL; /* assume failure */ @@ -2568,57 +2534,9 @@ if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = amqpOnUnsubscribeMany; - R->onFailure = amqpOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribe", - MQTTAsync_unsubscribe(mqtt->I, topic, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: - return rc; -} - -static -rpmRC amqpUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - -SPEW((stderr, "--> %s\n", __FUNCTION__)); - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int _lvl = RPMLOG_DEBUG; - for (int i = 0; i < ac; i++) { - char * t = av[i]; - rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); - } - -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = amqpOnUnsubscribeMany; - R->onFailure = amqpOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribeMany", - MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (rc) goto exit; rc = RPMRC_OK; @@ -2664,6 +2582,16 @@ static struct mqttVec_s amqpVec = { .name = "amqp", + .uri = "amqp://localhost:5672/" /* XXX vhost? */ + "?vhost=/" /* XXX vhost? */ + "&exchange=amq.direct" + "&queue=test queue" + "&verify=1" + "&fail_if_no_peer_cert=1" + "&sasl_mechanism=plain" + "&keepalive=60" + "&connection_timeout=60" + "&channel_max=60", .port = 5672, .sport = 5671, .prefix = "amqp_", @@ -2677,8 +2605,8 @@ .isconnected = amqpIsConnected, /* XXX */ .unsubscribe = amqpUnsubscribe, .subscribe = amqpSubscribe, - .unsubscribeMany = amqpUnsubscribeMany, - .subscribeMany = amqpSubscribeMany, + .unsubscribeMany = _rpmmqttUnsubscribeMany, + .subscribeMany = _rpmmqttSubscribeMany, .sendMessage = amqpSendMessage, }; #endif /* WITH_RABBITMQ */ @@ -2895,48 +2823,6 @@ } static -rpmRC qpidSubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; -#ifdef DYING - if (!rpmmqttConnect(mqtt)) { - int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); - for (int i = 0; i < ac; i++) { - char * t = av[i]; -#ifdef NOTYET - char * te = strchr(t, '?'); - if (te) { - *te++ = '\0'; - if ((te = strchr(te, '='))) { - if (!strncmp(t, "qos", (te - t))) - subqos[i] = strtoul(te+1, NULL, 0); - } - } else -#endif - subqos[i] = mqtt->qos; /* XXX */ - rc = rpmmqttSubscribe(mqtt, t, subqos[i]); - if (rc) - break; - } - - if (subqos) - free(subqos); - if (rc) - goto exit; - rc = RPMRC_OK; - } -#else - rc = RPMRC_OK; -#endif - -exit: - return rc; -} - -static rpmRC qpidSubscribe(rpmmqtt mqtt, const char * topic, int qos) { rpmRC rc = RPMRC_FAIL; /* assume failure */ @@ -3026,67 +2912,36 @@ return rc; } -static -rpmRC qpidUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - -#ifdef DYING -SPEW((stderr, "--> %s\n", __FUNCTION__)); - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int _lvl = RPMLOG_DEBUG; - for (int i = 0; i < ac; i++) { - char * t = av[i]; - rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); - } - -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = qpidOnUnsubscribeMany; - R->onFailure = qpidOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribeMany", - MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); - while (!mqtt->finished) - usleep(100); -#endif - - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: -#else - rc = RPMRC_OK; -#endif - return rc; -} - -static KEY qpidErrs[] = { -#define _ENTRY(_v) { PN_##_v, #_v, } - _ENTRY(OK), - _ENTRY(EOS), - _ENTRY(ERR), - _ENTRY(OVERFLOW), - _ENTRY(UNDERFLOW), - _ENTRY(STATE_ERR), - _ENTRY(ARG_ERR), - _ENTRY(TIMEOUT), - _ENTRY(INTR), - _ENTRY(INPROGRESS), - _ENTRY(OUT_OF_MEMORY), -#undef _ENTRY - { 0, NULL }, -}; +static KEY qpidErrs[] = { +#define _ENTRY(_v) { PN_##_v, #_v, } + _ENTRY(OK), + _ENTRY(EOS), + _ENTRY(ERR), + _ENTRY(OVERFLOW), + _ENTRY(UNDERFLOW), + _ENTRY(STATE_ERR), + _ENTRY(ARG_ERR), + _ENTRY(TIMEOUT), + _ENTRY(INTR), + _ENTRY(INPROGRESS), + _ENTRY(OUT_OF_MEMORY), +#undef _ENTRY + { 0, NULL }, +}; static struct mqttVec_s qpidVec = { .name = "qpid", + .uri = "amqp://localhost:5672/" /* XXX vhost? */ + "?vhost=/" /* XXX vhost? */ + "&exchange=amq.direct" /* XXX */ + "&queue=test queue" /* XXX */ + "&verify=1" /* XXX */ + "&fail_if_no_peer_cert=1"/* XXX */ + "&sasl_mechanism=plain" /* XXX */ + "&keepalive=60" + "&connection_timeout=60" + "&channel_max=60", .port = 5672, .sport = 5671, .prefix = "pn_", @@ -3100,8 +2955,8 @@ .isconnected = qpidIsConnected, /* XXX */ .unsubscribe = qpidUnsubscribe, .subscribe = qpidSubscribe, - .unsubscribeMany = qpidUnsubscribeMany, - .subscribeMany = qpidSubscribeMany, + .unsubscribeMany = _rpmmqttUnsubscribeMany, + .subscribeMany = _rpmmqttSubscribeMany, .sendMessage = qpidSendMessage, }; #endif /* WITH_PROTON */ @@ -3111,12 +2966,8 @@ rpmRC zmqDestroy(rpmmqtt mqtt) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef NOTYET - rc = mqchk(mqtt, "destroy", - (MQTTAsync_destroy(&mqtt->I), 0)); - mqtt->state = _free(mqtt->state); +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); mqtt->I = NULL; -#endif return rc; } @@ -3124,42 +2975,17 @@ rpmRC zmqCreate(rpmmqtt mqtt) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef NOTYET static int oneshot; int _lvl = RPMLOG_DEBUG; - int xx; - -#ifdef DYING -mqtt->trace = 4; /* XXX */ -#endif - if (mqtt->trace && rpmIsDebug()) { - xx = mqchk(mqtt, "setTraceCallback", - (MQTTAsync_setTraceCallback(onTrace), 0)); - xx = mqchk(mqtt, "setTraceLevel", - (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); - } rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); if (!oneshot) { - if (mqtt->trace == 0) { - MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); - while (NV->name) { - rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); - NV++; - } - } +fprintf(stderr, "XXX %s: FIXME: version oneshot\n", __FUNCTION__); oneshot++; } - /* XXX improve integration */ -static const char _mqtt_persist[] = - "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; - mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); -static const char _mqtt_cachedir[] = - "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; -char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); - +#ifdef NOTYET rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", "topic", mqtt->topic, mqtt->qos, mqtt->timeout); @@ -3169,51 +2995,18 @@ rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); if (mqtt->trace) rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); - -mqtt->persist_path = _free(mqtt->persist_path); -mqtt->persist_ctx = _free(mqtt->persist_ctx); - switch (mqtt->persist_type) { - default: - case MQTTCLIENT_PERSISTENCE_NONE: - mqtt->persist_ctx = NULL; - break; - case MQTTCLIENT_PERSISTENCE_DEFAULT: - { - mqtt->persist_path = xstrdup(persist_path); - /* XXX rpmmqttFini double free */ - mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); - } break; - case MQTTCLIENT_PERSISTENCE_USER: - { - mqtt->persist_path = xstrdup(persist_path); - MQTTClient_persistence * ctx = - (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); - *ctx = _mqtt_persistence; /* structure assignment */ - ctx->context = mqtt; - mqtt->persist_ctx = ctx; - } break; - } -persist_path = _free(persist_path); +#else +fprintf(stderr, "XXX %s: FIXME: values\n", __FUNCTION__); +#endif mqtt->u = NULL; dumpMQTT(__FUNCTION__, mqtt); if (mqtt->I == NULL) { - xx = mqchk(mqtt, "createWithOptions", - MQTTAsync_createWithOptions(&mqtt->I, - mqtt->uri, mqtt->clientid, - mqtt->persist_type, mqtt->persist_ctx, - (MQTTAsync_createOptions *)AOBJ(mqtt, 'O'))); - - xx = mqchk(mqtt, "setCallbacks", - MQTTAsync_setCallbacks(mqtt->I, mqtt, - zmqOnConnectionLost, - zmqOnMessageArrived, - zmqOnDeliveryComplete)); +fprintf(stderr, "XXX %s: FIXME: handle\n", __FUNCTION__); } rc = RPMRC_OK; -#endif /* NOTYET */ return rc; } @@ -3223,16 +3016,10 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { -#ifdef NOTYET - mqtt->finished = 0; - rc = mqchk(mqtt, "disconnect", - MQTTAsync_disconnect(mqtt->I, - (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D'))); - while (!mqtt->finished) - usleep(100); -#endif +fprintf(stderr, "XXX %s: FIXME: connected = 0\n", __FUNCTION__); if (rc) goto exit; +mqtt->connected = 0; } rc = RPMRC_OK; @@ -3246,16 +3033,10 @@ rpmRC rc = RPMRC_FAIL; /* assume failure */ if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { -#ifdef NOTYET - mqtt->finished = 0; - rc = mqchk(mqtt, "connect", - MQTTAsync_connect(mqtt->I, - (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C'))); - while (!mqtt->finished) - usleep(1000); -#endif +fprintf(stderr, "XXX %s: FIXME: connected = 1\n", __FUNCTION__); if (rc) goto exit; +mqtt->connected = 1; } rc = RPMRC_OK; @@ -3266,13 +3047,7 @@ static rpmRC zmqIsConnected(rpmmqtt mqtt) { - rpmRC rc = RPMRC_NOTFOUND; - -#ifdef NOTYET - if (mqchk(mqtt, "isconnected", - MQTTAsync_isConnected(mqtt->I))) - rc = RPMRC_OK; -#endif + rpmRC rc = (mqtt->connected ? RPMRC_OK : RPMRC_NOTFOUND); return rc; } @@ -3290,72 +3065,7 @@ ns = strlen(s); if (!rpmmqttConnect(mqtt)) { -#ifdef NOTYET - MQTTAsync_message *M = - (MQTTAsync_message *) AOBJ(mqtt, 'M'); - M->payloadlen = ns; - M->payload = (char *) s; - - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = zmqOnSend; - R->onFailure = zmqOnSendFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "sendMessage", - MQTTAsync_sendMessage(mqtt->I, topic, M, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: - return rc; -} - -static -rpmRC zmqSubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int _lvl = RPMLOG_DEBUG; - int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); - for (int i = 0; i < ac; i++) { - char * t = av[i]; -#ifdef NOTYET - char * te = strchr(t, '?'); - if (te) { - *te++ = '\0'; - if ((te = strchr(te, '='))) { - if (!strncmp(t, "qos", (te - t))) - subqos[i] = strtoul(te+1, NULL, 0); - } - } else -#endif - subqos[i] = mqtt->qos; /* XXX */ - rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]); - } - -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = zmqOnSubscribeMany; - R->onFailure = zmqOnSubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "subscribeMany", - MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (subqos) - free(subqos); +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); if (rc) goto exit; rc = RPMRC_OK; @@ -3373,20 +3083,9 @@ if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = zmqOnSubscribe; - R->onFailure = zmqOnSubscribeFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "subscribe", - MQTTAsync_subscribe(mqtt->I, topic, qos, R)); - while (!mqtt->finished) - usleep(100); -#endif if (rc) goto exit; rc = RPMRC_OK; @@ -3404,56 +3103,9 @@ if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = zmqOnUnsubscribeMany; - R->onFailure = zmqOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribe", - MQTTAsync_unsubscribe(mqtt->I, topic, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (rc) - goto exit; - rc = RPMRC_OK; - } - -exit: - return rc; -} - -static -rpmRC zmqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) -{ - rpmRC rc = RPMRC_FAIL; /* assume failure */ - - if (ac <= 0) - goto exit; - if (!rpmmqttConnect(mqtt)) { - int _lvl = RPMLOG_DEBUG; - for (int i = 0; i < ac; i++) { - char * t = av[i]; - rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); - } - -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = zmqOnUnsubscribeMany; - R->onFailure = zmqOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribeMany", - MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); - while (!mqtt->finished) - usleep(100); -#endif - if (rc) goto exit; rc = RPMRC_OK; @@ -3470,10 +3122,11 @@ static struct mqttVec_s zmqVec = { .name = "zmq", - .port = 0xfffe, /* XXX W2DO? */ - .sport = 0xfffe, /* XXX W2DO? */ + .uri = "zmq://localhost/", /* XXX W2DO? */ + .port = 0xfffe, /* XXX W2DO? */ + .sport = 0xfffe, /* XXX W2DO? */ .prefix = "zmq_", - .errs = zmqErrs, /* XXX W2DO? */ + .errs = zmqErrs, /* XXX W2DO? */ .nerrs = sizeof(zmqErrs) / sizeof(zmqErrs[0]), .destroy = zmqDestroy, .create = zmqCreate, @@ -3482,8 +3135,8 @@ .isconnected = zmqIsConnected, /* XXX */ .unsubscribe = zmqUnsubscribe, .subscribe = zmqSubscribe, - .unsubscribeMany = zmqUnsubscribeMany, - .subscribeMany = zmqSubscribeMany, + .unsubscribeMany = _rpmmqttUnsubscribeMany, + .subscribeMany = _rpmmqttSubscribeMany, .sendMessage = zmqSendMessage, }; #endif /* WITH_ZEROMQ */ @@ -3741,52 +3394,6 @@ con = poptGetContext(av[0], ac, (const char **)av, rpmmqttOptionsTable, _popt_context_flags); -mqtt->_progname = _free(mqtt->_progname); -mqtt->_progmode = _free(mqtt->_progmode); - { const char *arg0 = poptGetInvocationName(con); - const char * _progname; - const char * _progmode; - - if ((_progname = strrchr(arg0, '/')) != NULL) _progname++; - else _progname = arg0; - if (!strncmp(_progname, "lt-", sizeof("lt-")-1)) - _progname += sizeof("lt-")-1; - - /* XXX initialized based on %__mqtt_module */ - /* XXX initialized based on av[1] */ - /* XXX initialized based on URI scheme */ -#ifdef WITH_PAHO - if (mqtt->vec == NULL - && (strstr(_progname, "paho") || strstr(_progname, "mqtt"))) - mqtt->vec = &pahoVec; -#endif -#ifdef WITH_MOSQUITTO - if (mqtt->vec == NULL - && (strstr(_progname, "mosquitto") || strstr(_progname, "mosq") || strstr(_progname, "mqtt"))) - mqtt->vec = &mosqVec; -#endif -#ifdef WITH_RABBITMQ - if (mqtt->vec == NULL - && (strstr(_progname, "rabbitmq") || strstr(_progname, "amqp"))) - mqtt->vec = &amqpVec; -#endif -#ifdef WITH_PROTON - if (mqtt->vec == NULL - && (strstr(_progname, "proton") || strstr(_progname, "qpid") strstr(_progname, "amqp"))) - mqtt->vec = &qpidVec; -#endif -#ifdef WITH_ZEROMQ - if (mqtt->vec == NULL - && (strstr(_progname, "zeromq") || strstr(_progname, "zmq"))) - mqtt->vec = &zmqVec; -#endif - /* XXX mqtt->vec default? */ - - _progmode = (strstr(_progname, "sub") ? "sub" : "pub"); - mqtt->_progname = xstrdup(_progname); - mqtt->_progmode = xstrdup(_progmode); - } - while ((xx = poptGetNextOpt(con)) > 0) switch (xx) { default: @@ -4060,9 +3667,59 @@ static mqttFlags _flags = MQTT_FLAGS_DEFAULT; /* CLEAN|EOL */ mqtt->flags = (flags ? flags : _flags); + /* -- Set module vector and program name/mode from av[0] */ +assert(av && av[0]); +mqtt->_progname = _free(mqtt->_progname); +mqtt->_progmode = _free(mqtt->_progmode); + { const char *arg0 = av[0]; + const char * _progname; + const char * _progmode; + + if ((_progname = strrchr(arg0, '/')) != NULL) _progname++; + else _progname = arg0; + if (!strncmp(_progname, "lt-", sizeof("lt-")-1)) + _progname += sizeof("lt-")-1; + + /* XXX initialized based on %__mqtt_module */ + /* XXX initialized based on av[1] */ + /* XXX initialized based on URI scheme */ +#ifdef WITH_PAHO + if (mqtt->vec == NULL + && (strstr(_progname, "paho") || strstr(_progname, "mqtt"))) + mqtt->vec = &pahoVec; +#endif +#ifdef WITH_MOSQUITTO + if (mqtt->vec == NULL + && (strstr(_progname, "mosquitto") || strstr(_progname, "mosq") || strstr(_progname, "mqtt"))) + mqtt->vec = &mosqVec; +#endif +#ifdef WITH_RABBITMQ + if (mqtt->vec == NULL + && (strstr(_progname, "rabbitmq") || strstr(_progname, "amqp"))) + mqtt->vec = &amqpVec; +#endif +#ifdef WITH_PROTON + if (mqtt->vec == NULL + && (strstr(_progname, "proton") || strstr(_progname, "qpid") || strstr(_progname, "amqp"))) + mqtt->vec = &qpidVec; +#endif +#ifdef WITH_ZEROMQ + if (mqtt->vec == NULL + && (strstr(_progname, "zeromq") || strstr(_progname, "zmq"))) + mqtt->vec = &zmqVec; +#endif + /* XXX mqtt->vec default? */ + + _progmode = (strstr(_progname, "sub") ? "sub" : "pub"); + mqtt->_progname = xstrdup(_progname); + mqtt->_progmode = xstrdup(_progmode); + } +assert(mqtt->vec); + /* -- Initialize oddball values. */ if (_rpmmqtt_debug < 0) fprintf(stderr, "======= Initialize oddball values.\n"); + /* XXX FIXME: needed for --help POPT_ARGFLAG_DEFAULT spewage only */ (void) rpmmqttExpand(mqtt, &mqtt->protocol_version, "auto", NULL); @@ -4077,20 +3734,18 @@ mqtt->idprefix, "-%%{pid}", NULL); /* -- Initialize values from default URI. */ - static const char _mqtt_uri[] = - "mqtt://localhost:1883/rpm/mqtt" - "?qos=0" - "&timeout=10000" - "&max_inflight=20"; - "&keepalive=60"; +assert(mqtt && mqtt->vec); uri = rpmmqttExpand(mqtt, NULL, - "%{?_mqtt_uri}%{!?_mqtt_uri:", _mqtt_uri, "}", NULL); - + "%{?_mqtt_uri}%{!?_mqtt_uri:", + (mqtt->vec->uri ? mqtt->vec->uri : ""), + "}", NULL); + if (uri && *uri) { if (_rpmmqtt_debug < 0) fprintf(stderr, "======= Initialize values from default uri\n\t%s\n", uri); - rc = rpmmqttInitURI(mqtt, uri); - if (rc) - goto exit; + rc = rpmmqttInitURI(mqtt, uri); + if (rc) + goto exit; + } /* -- Override with values from macros (if any). */ if (_rpmmqtt_debug < 0) @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.20 -r1.1.2.21 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 14 Jul 2016 11:09:49 -0000 1.1.2.20 +++ rpm/rpmio/rpmmqtt.h 14 Jul 2016 12:59:21 -0000 1.1.2.21 @@ -160,6 +160,7 @@ struct mqttVec_s { const char * name; + const char * uri; uint16_t port; uint16_t sport; const char * prefix; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.15 -r1.1.2.16 tmqtt.c --- rpm/rpmio/tmqtt.c 14 Jul 2016 11:09:49 -0000 1.1.2.15 +++ rpm/rpmio/tmqtt.c 14 Jul 2016 12:59:21 -0000 1.1.2.16 @@ -12,10 +12,6 @@ #include <poptIO.h> #include <rpmdefs.h> -#ifdef WITH_MQTT -#include <MQTTAsync.h> -#endif - #define _RPMMQTT_INTERNAL #include "rpmmqtt.h" @@ -53,73 +49,28 @@ int main(int argc, char *argv[]) { -#ifdef UNUSED -static char _mqtt_argv[] = "\ - mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\ - rpm/#?qos=1 \n\ -"; -#ifdef REF - $SYS/broker/version?qos=0 \n\ - $SYS/broker/timestamp?qos=0 \n\ - $SYS/broker/uptime?qos=0 \n\ - $SYS/broker/clients/#?qos=0 \n\ - $SYS/broker/messages/#?qos=0 \n\ - $SYS/broker/subscriptions/#?qos=0 \n\ - $SYS/broker/heap/#?qos=0 \n\ - $SYS/broker/publish/#?qos=0 \n\ - $SYS/broker/bytes/#?qos=0 \n -#endif - ARGV_t av = poptGetArgs(optCon); - int ac = argvCount(av); - static char *_av[] = { _mqtt_argv, NULL, }; -#endif rpmmqtt mqtt = NULL; int rc = -1; - /* Initialize the _mqtt_ macro context */ + if (argc >= 2) { + if (!strcmp(argv[1], "paho")) + argv[0] = "paho_pub"; + if (!strcmp(argv[1], "mosquitto")) + argv[0] = "mosquitto_pub"; + if (!strcmp(argv[1], "amqp")) + argv[0] = "amqp_pub"; + if (!strcmp(argv[1], "qpid")) + argv[0] = "qpid_pub"; + if (!strcmp(argv[1], "zmq")) + argv[0] = "zmq_pub"; + } + +/* XXX global macros to initialize correctly. */ +(void) rpmmqttDefineMacro(NULL, "_mqtt_persist 2", 0); +(void) rpmmqttDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0); -#undef WITH_RABBITMQ -#ifdef WITH_RABBITMQ -static char _mqtt_uri[] = - "_mqtt_uri amqp://guest:guest@localhost:5672/" - "?vhost=/" - "&exchange=amq.direct" - "&queue=test queue" - "&verify=1" - "&fail_if_no_peer_cert=1" - "&sasl_mechanism=plain" - "&sasl_mechanism=amqplain" - "&keepalive=60" - "&connection_timeout=60" - "&channel_max=60" - ""; -#else -static char _mqtt_uri[] = - "_mqtt_uri mqtt://luser:jasnl@localhost:1883/rpm/mqtt" - "?qos=1;" - "timeout=10000;" - "trace=4;" - "#rpm/#"; -#endif -(void) rpmmqttDefineMacro(mqtt, _mqtt_uri, 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_persist 2", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_cachedir /var/cache/mqtt", 0); - -(void) rpmmqttDefineMacro(mqtt, "nil %{!?nil}", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_prefix %{now}", 0); - -#ifdef DYING -(void) rpmmqttDefineMacro(mqtt, "_mqtt_trace 4", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_user luser", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_pass jasnl", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_host localhost", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_port 1883", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_clientid rpm-%{pid}", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_topic rpm/#", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_qos 1", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_timeout 10000", 0); -(void) rpmmqttDefineMacro(mqtt, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0); -#endif +(void) rpmmqttDefineMacro(NULL, "nil %{!?nil}", 0); +(void) rpmmqttDefineMacro(NULL, "_mqtt_prefix %{now}", 0); mqtt = rpmmqttNew(argv, 0); rc = _DoMQTT(mqtt); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org