Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 0da444e9a -> a8703b5c7
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_publish.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_publish.c b/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_publish.c new file mode 100644 index 0000000..a524940 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_publish.c @@ -0,0 +1,154 @@ +/******************************************************************************* + * Copyright (c) 2012, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "MQTTAsync.h" + +#if !defined(WIN32) +#include <unistd.h> +#else +#include <windows.h> +#endif + +#include <OsWrapper.h> + +#define ADDRESS "tcp://m2m.eclipse.org:1883" +#define CLIENTID "ExampleClientPub" +#define TOPIC "MQTT Examples" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTAsync_token deliveredtoken; + +int finished = 0; + +void connlost(void *context, char *cause) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + printf("\nConnection lost\n"); + printf(" cause: %s\n", cause); + + printf("Reconnecting\n"); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + finished = 1; + } +} + + +void onDisconnect(void* context, MQTTAsync_successData* response) +{ + printf("Successful disconnection\n"); + finished = 1; +} + + +void onSend(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; + int rc; + + printf("Message with token value %d delivery confirmed\n", response->token); + + opts.onSuccess = onDisconnect; + opts.context = client; + + if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start sendMessage, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +void onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response ? response->code : 0); + finished = 1; +} + + +void onConnect(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; + MQTTAsync_message pubmsg = MQTTAsync_message_initializer; + int rc; + + printf("Successful connection\n"); + + opts.onSuccess = onSend; + opts.context = client; + + pubmsg.payload = PAYLOAD; + pubmsg.payloadlen = strlen(PAYLOAD); + pubmsg.qos = QOS; + pubmsg.retained = 0; + deliveredtoken = 0; + + if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start sendMessage, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +int main(int argc, char* argv[]) +{ + MQTTAsync client; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); + + MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL); + + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + printf("Waiting for publication of %s\n" + "on topic %s for client with ClientID: %s\n", + PAYLOAD, TOPIC, CLIENTID); + while (!finished) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + MQTTAsync_destroy(&client); + return rc; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_subscribe.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_subscribe.c b/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_subscribe.c new file mode 100644 index 0000000..d0a01f8 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/MQTTAsync_subscribe.c @@ -0,0 +1,190 @@ +/******************************************************************************* + * Copyright (c) 2012, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "MQTTAsync.h" + +#if !defined(WIN32) +#include <unistd.h> +#else +#include <windows.h> +#endif + +#include <OsWrapper.h> + +#define ADDRESS "tcp://localhost:1883" +#define CLIENTID "ExampleClientSub" +#define TOPIC "MQTT Examples" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTAsync_token deliveredtoken; + +int disc_finished = 0; +int subscribed = 0; +int finished = 0; + +void connlost(void *context, char *cause) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + printf("\nConnection lost\n"); + if (cause) + printf(" cause: %s\n", cause); + + printf("Reconnecting\n"); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + finished = 1; + } +} + + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) +{ + int i; + char* payloadptr; + + printf("Message arrived\n"); + printf(" topic: %s\n", topicName); + printf(" message: "); + + payloadptr = message->payload; + for(i=0; i<message->payloadlen; i++) + { + putchar(*payloadptr++); + } + putchar('\n'); + MQTTAsync_freeMessage(&message); + MQTTAsync_free(topicName); + return 1; +} + + +void onDisconnect(void* context, MQTTAsync_successData* response) +{ + printf("Successful disconnection\n"); + disc_finished = 1; +} + + +void onSubscribe(void* context, MQTTAsync_successData* response) +{ + printf("Subscribe succeeded\n"); + subscribed = 1; +} + +void onSubscribeFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Subscribe failed, rc %d\n", response ? response->code : 0); + finished = 1; +} + + +void onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response ? response->code : 0); + finished = 1; +} + + +void onConnect(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; + int rc; + + printf("Successful connection\n"); + + printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" + "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); + opts.onSuccess = onSubscribe; + opts.onFailure = onSubscribeFailure; + opts.context = client; + + deliveredtoken = 0; + + if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start subscribe, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +int main(int argc, char* argv[]) +{ + MQTTAsync client; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; + int rc; + int ch; + + MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); + + MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL); + + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + while (!subscribed) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + if (finished) + goto exit; + + do + { + ch = getchar(); + } while (ch!='Q' && ch != 'q'); + + disc_opts.onSuccess = onDisconnect; + if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + while (!disc_finished) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + +exit: + MQTTAsync_destroy(&client); + return rc; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish.c b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish.c new file mode 100644 index 0000000..fc71417 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish.c @@ -0,0 +1,60 @@ +/******************************************************************************* + * Copyright (c) 2012, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "MQTTClient.h" + +#define ADDRESS "tcp://localhost:1883" +#define CLIENTID "ExampleClientPub" +#define TOPIC "MQTT Examples" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +int main(int argc, char* argv[]) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + MQTTClient_deliveryToken token; + int rc; + + MQTTClient_create(&client, ADDRESS, CLIENTID, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + printf("Failed to connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + pubmsg.payload = PAYLOAD; + pubmsg.payloadlen = strlen(PAYLOAD); + pubmsg.qos = QOS; + pubmsg.retained = 0; + MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); + printf("Waiting for up to %d seconds for publication of %s\n" + "on topic %s for client with ClientID: %s\n", + (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); + rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); + printf("Message with delivery token %d delivered\n", token); + MQTTClient_disconnect(client, 10000); + MQTTClient_destroy(&client); + return rc; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish_async.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish_async.c b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish_async.c new file mode 100644 index 0000000..7784349 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_publish_async.c @@ -0,0 +1,96 @@ +/******************************************************************************* + * Copyright (c) 2012, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "MQTTClient.h" + +#define ADDRESS "tcp://localhost:1883" +#define CLIENTID "ExampleClientPub" +#define TOPIC "MQTT Examples" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTClient_deliveryToken deliveredtoken; + +void delivered(void *context, MQTTClient_deliveryToken dt) +{ + printf("Message with token value %d delivery confirmed\n", dt); + deliveredtoken = dt; +} + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{ + int i; + char* payloadptr; + + printf("Message arrived\n"); + printf(" topic: %s\n", topicName); + printf(" message: "); + + payloadptr = message->payload; + for(i=0; i<message->payloadlen; i++) + { + putchar(*payloadptr++); + } + putchar('\n'); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +void connlost(void *context, char *cause) +{ + printf("\nConnection lost\n"); + printf(" cause: %s\n", cause); +} + +int main(int argc, char* argv[]) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + MQTTClient_deliveryToken token; + int rc; + + MQTTClient_create(&client, ADDRESS, CLIENTID, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); + + if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + printf("Failed to connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + pubmsg.payload = PAYLOAD; + pubmsg.payloadlen = (int)strlen(PAYLOAD); + pubmsg.qos = QOS; + pubmsg.retained = 0; + deliveredtoken = 0; + MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); + printf("Waiting for publication of %s\n" + "on topic %s for client with ClientID: %s\n", + PAYLOAD, TOPIC, CLIENTID); + while(deliveredtoken != token); + MQTTClient_disconnect(client, 10000); + MQTTClient_destroy(&client); + return rc; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/MQTTClient_subscribe.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/MQTTClient_subscribe.c b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_subscribe.c new file mode 100644 index 0000000..c675ecc --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/MQTTClient_subscribe.c @@ -0,0 +1,95 @@ +/******************************************************************************* + * Copyright (c) 2012, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "MQTTClient.h" + +#define ADDRESS "tcp://localhost:1883" +#define CLIENTID "ExampleClientSub" +#define TOPIC "MQTT Examples" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +volatile MQTTClient_deliveryToken deliveredtoken; + +void delivered(void *context, MQTTClient_deliveryToken dt) +{ + printf("Message with token value %d delivery confirmed\n", dt); + deliveredtoken = dt; +} + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{ + int i; + char* payloadptr; + + printf("Message arrived\n"); + printf(" topic: %s\n", topicName); + printf(" message: "); + + payloadptr = message->payload; + for(i=0; i<message->payloadlen; i++) + { + putchar(*payloadptr++); + } + putchar('\n'); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +void connlost(void *context, char *cause) +{ + printf("\nConnection lost\n"); + printf(" cause: %s\n", cause); +} + +int main(int argc, char* argv[]) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + int rc; + int ch; + + MQTTClient_create(&client, ADDRESS, CLIENTID, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); + + if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + printf("Failed to connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" + "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); + MQTTClient_subscribe(client, TOPIC, QOS); + + do + { + ch = getchar(); + } while(ch!='Q' && ch != 'q'); + + MQTTClient_unsubscribe(client, TOPIC); + MQTTClient_disconnect(client, 10000); + MQTTClient_destroy(&client); + return rc; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/paho_c_pub.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/paho_c_pub.c b/thirdparty/paho.mqtt.c/src/samples/paho_c_pub.c new file mode 100644 index 0000000..1a4040d --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/paho_c_pub.c @@ -0,0 +1,379 @@ +/******************************************************************************* + * Copyright (c) 2012, 2016 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + * Guilherme Maciel Ferreira - add keep alive option + *******************************************************************************/ + + /* + stdin publisher + + compulsory parameters: + + --topic topic to publish on + + defaulted parameters: + + --host localhost + --port 1883 + --qos 0 + --delimiters \n + --clientid stdin-publisher-async + --maxdatalen 100 + --keepalive 10 + + --userid none + --password none + +*/ + +#include "MQTTAsync.h" + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> + +#if defined(WIN32) +#include <windows.h> +#define sleep Sleep +#else +#include <unistd.h> +#include <sys/time.h> +#include <unistd.h> +#endif + +#include <OsWrapper.h> + +volatile int toStop = 0; + + +struct +{ + char* clientid; + char* delimiter; + int maxdatalen; + int qos; + int retained; + char* username; + char* password; + char* host; + char* port; + int verbose; + int keepalive; +} opts = +{ + "stdin-publisher-async", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0, 10 +}; + + +void usage(void) +{ + printf("MQTT stdin publisher\n"); + printf("Usage: stdinpub topicname <options>, where options are:\n"); + printf(" --host <hostname> (default is %s)\n", opts.host); + printf(" --port <port> (default is %s)\n", opts.port); + printf(" --qos <qos> (default is %d)\n", opts.qos); + printf(" --retained (default is %s)\n", opts.retained ? "on" : "off"); + printf(" --delimiter <delim> (default is \\n)\n"); + printf(" --clientid <clientid> (default is %s)\n", opts.clientid); + printf(" --maxdatalen <bytes> (default is %d)\n", opts.maxdatalen); + printf(" --username none\n"); + printf(" --password none\n"); + printf(" --keepalive <seconds> (default is 10 seconds)\n"); + exit(EXIT_FAILURE); +} + + + +void cfinish(int sig) +{ + signal(SIGINT, NULL); + toStop = 1; +} + +void getopts(int argc, char** argv); + +int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) +{ + /* not expecting any messages */ + return 1; +} + + +static int disconnected = 0; + +void onDisconnect(void* context, MQTTAsync_successData* response) +{ + disconnected = 1; +} + + +static int connected = 0; +void myconnect(MQTTAsync* client); + +void onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response ? response->code : -1); + connected = -1; + + MQTTAsync client = (MQTTAsync)context; + myconnect(client); +} + + +void onConnect(void* context, MQTTAsync_successData* response) +{ + printf("Connected\n"); + connected = 1; +} + +void myconnect(MQTTAsync* client) +{ + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; + int rc = 0; + + printf("Connecting\n"); + conn_opts.keepAliveInterval = opts.keepalive; + conn_opts.cleansession = 1; + conn_opts.username = opts.username; + conn_opts.password = opts.password; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + ssl_opts.enableServerCertAuth = 0; + conn_opts.ssl = &ssl_opts; + conn_opts.automaticReconnect = 1; + connected = 0; + if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +static int published = 0; + +void onPublishFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Publish failed, rc %d\n", response ? -1 : response->code); + published = -1; +} + + +void onPublish(void* context, MQTTAsync_successData* response) +{ + published = 1; +} + + +void connectionLost(void* context, char* cause) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; + int rc = 0; + + printf("Connecting\n"); + conn_opts.keepAliveInterval = 10; + conn_opts.cleansession = 1; + conn_opts.username = opts.username; + conn_opts.password = opts.password; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + ssl_opts.enableServerCertAuth = 0; + conn_opts.ssl = &ssl_opts; + connected = 0; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +int main(int argc, char** argv) +{ + MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; + MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; + MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; + MQTTAsync client; + char* topic = NULL; + char* buffer = NULL; + int rc = 0; + char url[100]; + + if (argc < 2) + usage(); + + getopts(argc, argv); + + sprintf(url, "%s:%s", opts.host, opts.port); + if (opts.verbose) + printf("URL is %s\n", url); + + topic = argv[1]; + printf("Using topic %s\n", topic); + + create_opts.sendWhileDisconnected = 1; + rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); + + signal(SIGINT, cfinish); + signal(SIGTERM, cfinish); + + rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL); + + myconnect(&client); + + buffer = malloc(opts.maxdatalen); + + while (!toStop) + { + int data_len = 0; + int delim_len = 0; + + delim_len = (int)strlen(opts.delimiter); + do + { + buffer[data_len++] = getchar(); + if (data_len > delim_len) + { + /* printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); */ + if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0) + break; + } + } while (data_len < opts.maxdatalen); + + if (opts.verbose) + printf("Publishing data of length %d\n", data_len); + pub_opts.onSuccess = onPublish; + pub_opts.onFailure = onPublishFailure; + do + { + rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts); + } + while (rc != MQTTASYNC_SUCCESS); + } + + printf("Stopping\n"); + + free(buffer); + + disc_opts.onSuccess = onDisconnect; + if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + while (!disconnected) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + MQTTAsync_destroy(&client); + + return EXIT_SUCCESS; +} + +void getopts(int argc, char** argv) +{ + int count = 2; + + while (count < argc) + { + if (strcmp(argv[count], "--retained") == 0) + opts.retained = 1; + if (strcmp(argv[count], "--verbose") == 0) + opts.verbose = 1; + else if (strcmp(argv[count], "--qos") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "0") == 0) + opts.qos = 0; + else if (strcmp(argv[count], "1") == 0) + opts.qos = 1; + else if (strcmp(argv[count], "2") == 0) + opts.qos = 2; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--host") == 0) + { + if (++count < argc) + opts.host = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--port") == 0) + { + if (++count < argc) + opts.port = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--clientid") == 0) + { + if (++count < argc) + opts.clientid = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--username") == 0) + { + if (++count < argc) + opts.username = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--password") == 0) + { + if (++count < argc) + opts.password = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--maxdatalen") == 0) + { + if (++count < argc) + opts.maxdatalen = atoi(argv[count]); + else + usage(); + } + else if (strcmp(argv[count], "--delimiter") == 0) + { + if (++count < argc) + opts.delimiter = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--keepalive") == 0) + { + if (++count < argc) + opts.keepalive = atoi(argv[count]); + else + usage(); + } + count++; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/paho_c_sub.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/paho_c_sub.c b/thirdparty/paho.mqtt.c/src/samples/paho_c_sub.c new file mode 100644 index 0000000..bbb5edd --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/paho_c_sub.c @@ -0,0 +1,359 @@ +/******************************************************************************* + * Copyright (c) 2012, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + * Ian Craggs - fix for bug 413429 - connectionLost not called + * Guilherme Maciel Ferreira - add keep alive option + *******************************************************************************/ + +/* + + stdout subscriber for the asynchronous client + + compulsory parameters: + + --topic topic to subscribe to + + defaulted parameters: + + --host localhost + --port 1883 + --qos 2 + --delimiter \n + --clientid stdout-subscriber-async + --showtopics off + --keepalive 10 + + --userid none + --password none + +*/ + +#include "MQTTAsync.h" +#include "MQTTClientPersistence.h" + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> + + +#if defined(WIN32) +#include <windows.h> +#define sleep Sleep +#else +#include <sys/time.h> +#include <unistd.h> +#endif + +#include <OsWrapper.h> + +volatile int finished = 0; +char* topic = NULL; +int subscribed = 0; +int disconnected = 0; + + +void cfinish(int sig) +{ + signal(SIGINT, NULL); + finished = 1; +} + + +struct +{ + char* clientid; + int nodelimiter; + char delimiter; + int qos; + char* username; + char* password; + char* host; + char* port; + int showtopics; + int keepalive; +} opts = +{ + "stdout-subscriber-async", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0, 10 +}; + + +void usage(void) +{ + printf("MQTT stdout subscriber\n"); + printf("Usage: stdoutsub topicname <options>, where options are:\n"); + printf(" --host <hostname> (default is %s)\n", opts.host); + printf(" --port <port> (default is %s)\n", opts.port); + printf(" --qos <qos> (default is %d)\n", opts.qos); + printf(" --delimiter <delim> (default is no delimiter)\n"); + printf(" --clientid <clientid> (default is %s)\n", opts.clientid); + printf(" --username none\n"); + printf(" --password none\n"); + printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n"); + printf(" --keepalive <seconds> (default is 10 seconds)\n"); + exit(EXIT_FAILURE); +} + + +void getopts(int argc, char** argv) +{ + int count = 2; + + while (count < argc) + { + if (strcmp(argv[count], "--qos") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "0") == 0) + opts.qos = 0; + else if (strcmp(argv[count], "1") == 0) + opts.qos = 1; + else if (strcmp(argv[count], "2") == 0) + opts.qos = 2; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--host") == 0) + { + if (++count < argc) + opts.host = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--port") == 0) + { + if (++count < argc) + opts.port = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--clientid") == 0) + { + if (++count < argc) + opts.clientid = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--username") == 0) + { + if (++count < argc) + opts.username = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--password") == 0) + { + if (++count < argc) + opts.password = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--delimiter") == 0) + { + if (++count < argc) + { + if (strcmp("newline", argv[count]) == 0) + opts.delimiter = '\n'; + else + opts.delimiter = argv[count][0]; + opts.nodelimiter = 0; + } + else + usage(); + } + else if (strcmp(argv[count], "--showtopics") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "on") == 0) + opts.showtopics = 1; + else if (strcmp(argv[count], "off") == 0) + opts.showtopics = 0; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--keepalive") == 0) + { + if (++count < argc) + opts.keepalive = atoi(argv[count]); + else + usage(); + } + count++; + } + +} + + +int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) +{ + if (opts.showtopics) + printf("%s\t", topicName); + if (opts.nodelimiter) + printf("%.*s", message->payloadlen, (char*)message->payload); + else + printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter); + fflush(stdout); + MQTTAsync_freeMessage(&message); + MQTTAsync_free(topicName); + return 1; +} + + +void onDisconnect(void* context, MQTTAsync_successData* response) +{ + disconnected = 1; +} + + +void onSubscribe(void* context, MQTTAsync_successData* response) +{ + subscribed = 1; +} + + +void onSubscribeFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Subscribe failed, rc %d\n", response->code); + finished = 1; +} + + +void onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response ? response->code : -99); + finished = 1; +} + + +void onConnect(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; + int rc; + + if (opts.showtopics) + printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); + + ropts.onSuccess = onSubscribe; + ropts.onFailure = onSubscribeFailure; + ropts.context = client; + if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start subscribe, return code %d\n", rc); + finished = 1; + } +} + + +MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + + +void connectionLost(void *context, char *cause) +{ + MQTTAsync client = (MQTTAsync)context; + int rc; + + printf("connectionLost called\n"); + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start reconnect, return code %d\n", rc); + finished = 1; + } +} + + +int main(int argc, char** argv) +{ + MQTTAsync client; + MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; + int rc = 0; + char url[100]; + + if (argc < 2) + usage(); + + topic = argv[1]; + + if (strchr(topic, '#') || strchr(topic, '+')) + opts.showtopics = 1; + if (opts.showtopics) + printf("topic is %s\n", topic); + + getopts(argc, argv); + sprintf(url, "%s:%s", opts.host, opts.port); + + rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); + + MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL); + + signal(SIGINT, cfinish); + signal(SIGTERM, cfinish); + + conn_opts.keepAliveInterval = opts.keepalive; + conn_opts.cleansession = 1; + conn_opts.username = opts.username; + conn_opts.password = opts.password; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + while (!subscribed) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + if (finished) + goto exit; + + while (!finished) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + + disc_opts.onSuccess = onDisconnect; + if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + while (!disconnected) + #if defined(WIN32) + Sleep(100); + #else + usleep(10000L); + #endif + +exit: + MQTTAsync_destroy(&client); + + return EXIT_SUCCESS; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/paho_cs_pub.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/paho_cs_pub.c b/thirdparty/paho.mqtt.c/src/samples/paho_cs_pub.c new file mode 100644 index 0000000..04ce688 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/paho_cs_pub.c @@ -0,0 +1,276 @@ +/******************************************************************************* + * Copyright (c) 2012, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + *******************************************************************************/ + + /* + stdin publisher + + compulsory parameters: + + --topic topic to publish on + + defaulted parameters: + + --host localhost + --port 1883 + --qos 0 + --delimiters \n + --clientid stdin_publisher + --maxdatalen 100 + + --userid none + --password none + +*/ + +#include "MQTTClient.h" +#include "MQTTClientPersistence.h" + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> + +#if defined(WIN32) +#define sleep Sleep +#else +#include <sys/time.h> +#endif + + +volatile int toStop = 0; + + +void usage(void) +{ + printf("MQTT stdin publisher\n"); + printf("Usage: stdinpub topicname <options>, where options are:\n"); + printf(" --host <hostname> (default is localhost)\n"); + printf(" --port <port> (default is 1883)\n"); + printf(" --qos <qos> (default is 0)\n"); + printf(" --retained (default is off)\n"); + printf(" --delimiter <delim> (default is \\n)"); + printf(" --clientid <clientid> (default is hostname+timestamp)"); + printf(" --maxdatalen 100\n"); + printf(" --username none\n"); + printf(" --password none\n"); + exit(EXIT_FAILURE); +} + + +void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts) +{ + printf("Connecting\n"); + if (MQTTClient_connect(*client, opts) != 0) + { + printf("Failed to connect\n"); + exit(EXIT_FAILURE); + } + printf("Connected\n"); +} + + +void cfinish(int sig) +{ + signal(SIGINT, NULL); + toStop = 1; +} + + +struct +{ + char* clientid; + char* delimiter; + int maxdatalen; + int qos; + int retained; + char* username; + char* password; + char* host; + char* port; + int verbose; +} opts = +{ + "publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0 +}; + +void getopts(int argc, char** argv); + +int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m) +{ + /* not expecting any messages */ + return 1; +} + +int main(int argc, char** argv) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; + char* topic = NULL; + char* buffer = NULL; + int rc = 0; + char url[100]; + + if (argc < 2) + usage(); + + getopts(argc, argv); + + sprintf(url, "%s:%s", opts.host, opts.port); + if (opts.verbose) + printf("URL is %s\n", url); + + topic = argv[1]; + printf("Using topic %s\n", topic); + + rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); + + signal(SIGINT, cfinish); + signal(SIGTERM, cfinish); + + rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL); + + conn_opts.keepAliveInterval = 10; + conn_opts.reliable = 0; + conn_opts.cleansession = 1; + conn_opts.username = opts.username; + conn_opts.password = opts.password; + ssl_opts.enableServerCertAuth = 0; + conn_opts.ssl = &ssl_opts; + + myconnect(&client, &conn_opts); + + buffer = malloc(opts.maxdatalen); + + while (!toStop) + { + int data_len = 0; + int delim_len = 0; + + delim_len = (int)strlen(opts.delimiter); + do + { + buffer[data_len++] = getchar(); + if (data_len > delim_len) + { + /* printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); */ + if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0) + break; + } + } while (data_len < opts.maxdatalen); + + if (opts.verbose) + printf("Publishing data of length %d\n", data_len); + rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL); + if (rc != 0) + { + myconnect(&client, &conn_opts); + rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL); + } + if (opts.qos > 0) + MQTTClient_yield(); + } + + printf("Stopping\n"); + + free(buffer); + + MQTTClient_disconnect(client, 0); + + MQTTClient_destroy(&client); + + return EXIT_SUCCESS; +} + +void getopts(int argc, char** argv) +{ + int count = 2; + + while (count < argc) + { + if (strcmp(argv[count], "--retained") == 0) + opts.retained = 1; + if (strcmp(argv[count], "--verbose") == 0) + opts.verbose = 1; + else if (strcmp(argv[count], "--qos") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "0") == 0) + opts.qos = 0; + else if (strcmp(argv[count], "1") == 0) + opts.qos = 1; + else if (strcmp(argv[count], "2") == 0) + opts.qos = 2; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--host") == 0) + { + if (++count < argc) + opts.host = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--port") == 0) + { + if (++count < argc) + opts.port = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--clientid") == 0) + { + if (++count < argc) + opts.clientid = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--username") == 0) + { + if (++count < argc) + opts.username = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--password") == 0) + { + if (++count < argc) + opts.password = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--maxdatalen") == 0) + { + if (++count < argc) + opts.maxdatalen = atoi(argv[count]); + else + usage(); + } + else if (strcmp(argv[count], "--delimiter") == 0) + { + if (++count < argc) + opts.delimiter = argv[count]; + else + usage(); + } + count++; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/samples/paho_cs_sub.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/samples/paho_cs_sub.c b/thirdparty/paho.mqtt.c/src/samples/paho_cs_sub.c new file mode 100644 index 0000000..52fe9bc --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/samples/paho_cs_sub.c @@ -0,0 +1,270 @@ +/******************************************************************************* + * Copyright (c) 2012, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial contribution + * Ian Craggs - change delimiter option from char to string + * Guilherme Maciel Ferreira - add keep alive option + *******************************************************************************/ + +/* + + stdout subscriber + + compulsory parameters: + + --topic topic to subscribe to + + defaulted parameters: + + --host localhost + --port 1883 + --qos 2 + --delimiter \n + --clientid stdout-subscriber + --showtopics off + --keepalive 10 + + --userid none + --password none + +*/ +#include "MQTTClient.h" +#include "MQTTClientPersistence.h" + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> + + +#if defined(WIN32) +#define sleep Sleep +#else +#include <sys/time.h> +#endif + + +volatile int toStop = 0; + + +struct opts_struct +{ + char* clientid; + int nodelimiter; + char* delimiter; + int qos; + char* username; + char* password; + char* host; + char* port; + int showtopics; + int keepalive; +} opts = +{ + "stdout-subscriber", 0, "\n", 2, NULL, NULL, "localhost", "1883", 0, 10 +}; + + +void usage(void) +{ + printf("MQTT stdout subscriber\n"); + printf("Usage: stdoutsub topicname <options>, where options are:\n"); + printf(" --host <hostname> (default is %s)\n", opts.host); + printf(" --port <port> (default is %s)\n", opts.port); + printf(" --qos <qos> (default is %d)\n", opts.qos); + printf(" --delimiter <delim> (default is \\n)\n"); + printf(" --clientid <clientid> (default is %s)\n", opts.clientid); + printf(" --username none\n"); + printf(" --password none\n"); + printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n"); + printf(" --keepalive <seconds> (default is %d seconds)\n", opts.keepalive); + exit(EXIT_FAILURE); +} + + +void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts) +{ + int rc = 0; + if ((rc = MQTTClient_connect(*client, opts)) != 0) + { + printf("Failed to connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +void cfinish(int sig) +{ + signal(SIGINT, NULL); + toStop = 1; +} + +void getopts(int argc, char** argv); + +int main(int argc, char** argv) +{ + MQTTClient client; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + char* topic = NULL; + int rc = 0; + char url[100]; + + if (argc < 2) + usage(); + + topic = argv[1]; + + if (strchr(topic, '#') || strchr(topic, '+')) + opts.showtopics = 1; + if (opts.showtopics) + printf("topic is %s\n", topic); + + getopts(argc, argv); + sprintf(url, "%s:%s", opts.host, opts.port); + + rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); + + signal(SIGINT, cfinish); + signal(SIGTERM, cfinish); + + conn_opts.keepAliveInterval = opts.keepalive; + conn_opts.reliable = 0; + conn_opts.cleansession = 1; + conn_opts.username = opts.username; + conn_opts.password = opts.password; + + myconnect(&client, &conn_opts); + + rc = MQTTClient_subscribe(client, topic, opts.qos); + + while (!toStop) + { + char* topicName = NULL; + int topicLen; + MQTTClient_message* message = NULL; + + rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000); + if (message) + { + if (opts.showtopics) + printf("%s\t", topicName); + if (opts.nodelimiter) + printf("%.*s", message->payloadlen, (char*)message->payload); + else + printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter); + fflush(stdout); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + } + if (rc != 0) + myconnect(&client, &conn_opts); + } + + printf("Stopping\n"); + + MQTTClient_disconnect(client, 0); + + MQTTClient_destroy(&client); + + return EXIT_SUCCESS; +} + +void getopts(int argc, char** argv) +{ + int count = 2; + + while (count < argc) + { + if (strcmp(argv[count], "--qos") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "0") == 0) + opts.qos = 0; + else if (strcmp(argv[count], "1") == 0) + opts.qos = 1; + else if (strcmp(argv[count], "2") == 0) + opts.qos = 2; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--host") == 0) + { + if (++count < argc) + opts.host = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--port") == 0) + { + if (++count < argc) + opts.port = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--clientid") == 0) + { + if (++count < argc) + opts.clientid = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--username") == 0) + { + if (++count < argc) + opts.username = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--password") == 0) + { + if (++count < argc) + opts.password = argv[count]; + else + usage(); + } + else if (strcmp(argv[count], "--delimiter") == 0) + { + if (++count < argc) + opts.delimiter = argv[count]; + else + opts.nodelimiter = 1; + } + else if (strcmp(argv[count], "--showtopics") == 0) + { + if (++count < argc) + { + if (strcmp(argv[count], "on") == 0) + opts.showtopics = 1; + else if (strcmp(argv[count], "off") == 0) + opts.showtopics = 0; + else + usage(); + } + else + usage(); + } + else if (strcmp(argv[count], "--keepalive") == 0) + { + if (++count < argc) + opts.keepalive = atoi(argv[count]); + else + usage(); + } + count++; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/utf-8.c ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/utf-8.c b/thirdparty/paho.mqtt.c/src/utf-8.c new file mode 100644 index 0000000..1530701 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/utf-8.c @@ -0,0 +1,230 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + + +/** + * @file + * \brief Functions for checking that strings contain UTF-8 characters only + * + * See page 104 of the Unicode Standard 5.0 for the list of well formed + * UTF-8 byte sequences. + * + */ +#include "utf-8.h" + +#include <stdlib.h> +#include <string.h> + +#include "StackTrace.h" + +/** + * Macro to determine the number of elements in a single-dimension array + */ +#if !defined(ARRAY_SIZE) +#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0])) +#endif + + +/** + * Structure to hold the valid ranges of UTF-8 characters, for each byte up to 4 + */ +struct +{ + int len; /**< number of elements in the following array (1 to 4) */ + struct + { + char lower; /**< lower limit of valid range */ + char upper; /**< upper limit of valid range */ + } bytes[4]; /**< up to 4 bytes can be used per character */ +} +valid_ranges[] = +{ + {1, { {00, 0x7F} } }, + {2, { {0xC2, 0xDF}, {0x80, 0xBF} } }, + {3, { {0xE0, 0xE0}, {0xA0, 0xBF}, {0x80, 0xBF} } }, + {3, { {0xE1, 0xEC}, {0x80, 0xBF}, {0x80, 0xBF} } }, + {3, { {0xED, 0xED}, {0x80, 0x9F}, {0x80, 0xBF} } }, + {3, { {0xEE, 0xEF}, {0x80, 0xBF}, {0x80, 0xBF} } }, + {4, { {0xF0, 0xF0}, {0x90, 0xBF}, {0x80, 0xBF}, {0x80, 0xBF} } }, + {4, { {0xF1, 0xF3}, {0x80, 0xBF}, {0x80, 0xBF}, {0x80, 0xBF} } }, + {4, { {0xF4, 0xF4}, {0x80, 0x8F}, {0x80, 0xBF}, {0x80, 0xBF} } }, +}; + + +static const char* UTF8_char_validate(int len, const char* data); + + +/** + * Validate a single UTF-8 character + * @param len the length of the string in "data" + * @param data the bytes to check for a valid UTF-8 char + * @return pointer to the start of the next UTF-8 character in "data" + */ +static const char* UTF8_char_validate(int len, const char* data) +{ + int good = 0; + int charlen = 2; + int i, j; + const char *rc = NULL; + + FUNC_ENTRY; + /* first work out how many bytes this char is encoded in */ + if ((data[0] & 128) == 0) + charlen = 1; + else if ((data[0] & 0xF0) == 0xF0) + charlen = 4; + else if ((data[0] & 0xE0) == 0xE0) + charlen = 3; + + if (charlen > len) + goto exit; /* not enough characters in the string we were given */ + + for (i = 0; i < ARRAY_SIZE(valid_ranges); ++i) + { /* just has to match one of these rows */ + if (valid_ranges[i].len == charlen) + { + good = 1; + for (j = 0; j < charlen; ++j) + { + if (data[j] < valid_ranges[i].bytes[j].lower || + data[j] > valid_ranges[i].bytes[j].upper) + { + good = 0; /* failed the check */ + break; + } + } + if (good) + break; + } + } + + if (good) + rc = data + charlen; + exit: + FUNC_EXIT; + return rc; +} + + +/** + * Validate a length-delimited string has only UTF-8 characters + * @param len the length of the string in "data" + * @param data the bytes to check for valid UTF-8 characters + * @return 1 (true) if the string has only UTF-8 characters, 0 (false) otherwise + */ +int UTF8_validate(int len, const char* data) +{ + const char* curdata = NULL; + int rc = 0; + + FUNC_ENTRY; + if (len == 0) + { + rc = 1; + goto exit; + } + curdata = UTF8_char_validate(len, data); + while (curdata && (curdata < data + len)) + curdata = UTF8_char_validate(len, curdata); + + rc = curdata != NULL; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Validate a null-terminated string has only UTF-8 characters + * @param string the string to check for valid UTF-8 characters + * @return 1 (true) if the string has only UTF-8 characters, 0 (false) otherwise + */ +int UTF8_validateString(const char* string) +{ + int rc = 0; + + FUNC_ENTRY; + rc = UTF8_validate((int)strlen(string), string); + FUNC_EXIT_RC(rc); + return rc; +} + + + +#if defined(UNIT_TESTS) +#include <stdio.h> + +typedef struct +{ + int len; + char data[20]; +} tests; + +tests valid_strings[] = +{ + {3, "hjk" }, + {7, {0x41, 0xE2, 0x89, 0xA2, 0xCE, 0x91, 0x2E} }, + {3, {'f', 0xC9, 0xB1 } }, + {9, {0xED, 0x95, 0x9C, 0xEA, 0xB5, 0xAD, 0xEC, 0x96, 0xB4} }, + {9, {0xE6, 0x97, 0xA5, 0xE6, 0x9C, 0xAC, 0xE8, 0xAA, 0x9E} }, + {4, {0x2F, 0x2E, 0x2E, 0x2F} }, + {7, {0xEF, 0xBB, 0xBF, 0xF0, 0xA3, 0x8E, 0xB4} }, +}; + +tests invalid_strings[] = +{ + {2, {0xC0, 0x80} }, + {5, {0x2F, 0xC0, 0xAE, 0x2E, 0x2F} }, + {6, {0xED, 0xA1, 0x8C, 0xED, 0xBE, 0xB4} }, + {1, {0xF4} }, +}; + +int main (int argc, char *argv[]) +{ + int i, failed = 0; + + for (i = 0; i < ARRAY_SIZE(valid_strings); ++i) + { + if (!UTF8_validate(valid_strings[i].len, valid_strings[i].data)) + { + printf("valid test %d failed\n", i); + failed = 1; + } + else + printf("valid test %d passed\n", i); + } + + for (i = 0; i < ARRAY_SIZE(invalid_strings); ++i) + { + if (UTF8_validate(invalid_strings[i].len, invalid_strings[i].data)) + { + printf("invalid test %d failed\n", i); + failed = 1; + } + else + printf("invalid test %d passed\n", i); + } + + if (failed) + printf("Failed\n"); + else + printf("Passed\n"); + + return 0; +} /* End of main function*/ + +#endif + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/src/utf-8.h ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/src/utf-8.h b/thirdparty/paho.mqtt.c/src/utf-8.h new file mode 100644 index 0000000..8bce4b3 --- /dev/null +++ b/thirdparty/paho.mqtt.c/src/utf-8.h @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright (c) 2009, 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#if !defined(UTF8_H) +#define UTF8_H + +int UTF8_validate(int len, const char *data); +int UTF8_validateString(const char* string); + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/travis-build.sh ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/travis-build.sh b/thirdparty/paho.mqtt.c/travis-build.sh new file mode 100755 index 0000000..5356f8b --- /dev/null +++ b/thirdparty/paho.mqtt.c/travis-build.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -e + +rm -rf build.paho +mkdir build.paho +cd build.paho +echo "travis build dir $TRAVIS_BUILD_DIR pwd $PWD" +cmake -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE .. +make +python ../test/mqttsas2.py & +ctest -VV --timeout 600 +kill %1 +killall mosquitto + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/travis-env-vars ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/travis-env-vars b/thirdparty/paho.mqtt.c/travis-env-vars new file mode 100644 index 0000000..2551ccb --- /dev/null +++ b/thirdparty/paho.mqtt.c/travis-env-vars @@ -0,0 +1,2 @@ +export TRAVIS_OS_NAME=linux +export TRAVIS_BUILD_DIR=$PWD http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/travis-install.sh ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/travis-install.sh b/thirdparty/paho.mqtt.c/travis-install.sh new file mode 100755 index 0000000..0405da6 --- /dev/null +++ b/thirdparty/paho.mqtt.c/travis-install.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ "$TRAVIS_OS_NAME" == "linux" ]; then + pwd + sudo service mosquitto stop + # Stop any mosquitto instance which may be still running from previous runs + killall mosquitto + mosquitto -h + mosquitto -c test/tls-testing/mosquitto.conf & +fi + +if [ "$TRAVIS_OS_NAME" == "osx" ]; then + pwd + brew update + brew install openssl mosquitto + brew services stop mosquitto + /usr/local/sbin/mosquitto -c test/tls-testing/mosquitto.conf & +fi http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8703b5c/thirdparty/paho.mqtt.c/travis-macos-vars ---------------------------------------------------------------------- diff --git a/thirdparty/paho.mqtt.c/travis-macos-vars b/thirdparty/paho.mqtt.c/travis-macos-vars new file mode 100644 index 0000000..bbdbccb --- /dev/null +++ b/thirdparty/paho.mqtt.c/travis-macos-vars @@ -0,0 +1,2 @@ +export TRAVIS_OS_NAME=osx +export TRAVIS_BUILD_DIR=$PWD
